This guide demonstrates how to create a robot that can safely navigate its environment using depth perception. The example works with both a simulated robot (AirGenCar) and a physical robot (JetBot).

Overview

The safe navigation example combines:

  • Depth estimation using the MiDaS model
  • Grid-based environment analysis
  • Obstacle avoidance
  • Velocity-based navigation

Core Components

Depth Processing

We use the MiDaS model for monocular depth estimation:

import numpy as np

from grid.model.perception.depth.midas import MIDAS

depth = MIDAS()

def get_formatted_midas_image(rgb_image, depth_model):
    # Run depth estimation and scale the result to uint8 in [0, 255]
    depth_image = depth_model.run(rgb_image.data)
    formatted = (depth_image * 255 / np.max(depth_image)).astype("uint8")
    return formatted
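
As a quick smoke test, you can run the formatter on a synthetic frame. This is a minimal sketch; it assumes Image accepts a raw NumPy array (as it does in the full source below) and that the model runs on arbitrary RGB input:

import numpy as np
from grid.utils.types import Image

rgb = Image(np.random.randint(0, 256, (480, 640, 3), dtype=np.uint8))
depth_u8 = get_formatted_midas_image(rgb, depth)
print(depth_u8.dtype, depth_u8.min(), depth_u8.max())  # uint8; max is 255 by construction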

Grid Analysis

The system divides the depth image into a grid for analysis:

def divide_into_grid(depth_image, num_horizontal_patches, num_vertical_patches):
    patches = []
    # Integer division: leftover rows/columns at the edges are dropped
    patch_height = depth_image.shape[0] // num_vertical_patches
    patch_width = depth_image.shape[1] // num_horizontal_patches

    # Patches are collected in row-major order: index = v * num_horizontal_patches + h
    for v in range(num_vertical_patches):
        for h in range(num_horizontal_patches):
            patch = depth_image[
                v * patch_height:(v + 1) * patch_height,
                h * patch_width:(h + 1) * patch_width
            ]
            patches.append(patch)
    return patches
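
As a sanity check, a 3x1 grid over a 480x640 image yields three 480x213 patches in left-to-right order (the last column of pixels is lost to integer division):

import numpy as np

dummy_depth = np.zeros((480, 640), dtype=np.uint8)
patches = divide_into_grid(dummy_depth, 3, 1)
print(len(patches), patches[0].shape)  # 3 (480, 213)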

Safety Assessment

These functions evaluate the safety of each candidate direction; determine_best_patch (defined in the full source below) then picks the patch with the highest mean depth:

def compute_safety_metric(patches):
    """Calculate the mean depth value for each patch"""
    return [np.mean(patch) for patch in patches]

def check_blocked(patches, means, num_h, num_v, obs_threshold_ratio=0.2, dist_threshold=0.2):
    """Determine if the path is blocked and find the best alternative"""
    # Check the center patch(es) for immediate obstacles
    middle = [len(patches) // 2]
    if len(patches) % 2 == 0:
        middle.append(len(patches) // 2 - 1)

    for i in middle:
        # Normalize to [0, 1]; pixels at or below 0.1 count as "near"
        patch = patches[i].astype(np.float32) / 255.0
        if np.mean(patch <= 0.1) > obs_threshold_ratio:
            return True, -1

    # If every patch reads as too close, the whole view is blocked
    if all(m / 255.0 < dist_threshold for m in means):
        return True, -1

    return False, determine_best_patch(means, num_h, num_v)
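
A small synthetic check helps confirm the thresholds behave as intended. This sketch relies on determine_best_patch from the full source below; the patch values are made up for illustration (uint8 depth, higher is further):

import numpy as np

# Open left, near wall in the center, middling right
patches = [np.full((100, 100), v, dtype=np.uint8) for v in (200, 10, 120)]
means = compute_safety_metric(patches)
print(check_blocked(patches, means, 3, 1))  # (True, -1): center is almost all near pixels

patches[1] = np.full((100, 100), 180, dtype=np.uint8)  # open up the center
means = compute_safety_metric(patches)
print(check_blocked(patches, means, 3, 1))  # not blocked; best patch (0, 0) steers toward the open left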

Movement Control

Steering Calculation

Converts patch selection to movement commands:

def map_patch_to_steering(patch_index, num_h, num_v):
    # Column offset from the image center sets yaw; row offset sets pitch
    yaw = (patch_index[1] - num_h // 2) * (180 / num_h)
    pitch = (patch_index[0] - num_v // 2) * (180 / num_v)
    return np.clip(yaw, -30, 30), pitch
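
For the 3x1 grid used in this example, the three columns map to -60, 0, and +60 degrees of yaw before clipping:

for col in range(3):
    yaw, pitch = map_patch_to_steering((0, col), 3, 1)
    print(col, yaw, pitch)  # 0 -> -30.0 (clipped from -60), 1 -> 0.0, 2 -> 30.0 (clipped from 60)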

Main Control Loop

The main loop continuously:

  1. Captures RGB images
  2. Generates depth maps
  3. Analyzes environment safety
  4. Controls robot movement

def safenav_main(agent, rgb_image, depth_model):
    # Process depth image
    depth_image = get_formatted_midas_image(rgb_image, depth_model)
    patches = divide_into_grid(depth_image, 3, 1)
    
    # Analyze safety
    means = compute_safety_metric(patches)
    blocked, best_patch = check_blocked(patches, means, 3, 1)
    
    if blocked:
        # Rotate to find new path
        agent.rotate(1, 0.3)
        return
        
    # Navigate towards best path
    yaw, _ = map_patch_to_steering(best_patch, 3, 1)
    vel = Velocity(0.4, np.deg2rad(yaw), 0)
    agent.moveByVelocity(vel)

Robot Setup

Simulated Robot (AirGenCar)

For simulation environments:

from grid.robot.wheeled.airgen_car import AirGenCar
agent = AirGenCar()

Real Robot (JetBot)

For physical robot deployment:

from grid.robot.wheeled.jetbot import JetBot
agent = JetBot("192.168.254.198")  # Replace with your JetBot's IP

Running the Example

  1. First, ensure you have GRID installed and set up properly.

  2. Choose your robot type:

def start(sim_mode=False):
    agent = AirGenCar() if sim_mode else JetBot("192.168.254.198")
    main_navigation_loop(agent)

  3. Run the navigation loop:

if __name__ == '__main__':
    start(sim_mode=True)

The robot will start navigating while avoiding obstacles using depth perception.

Make sure to monitor the robot during initial testing and be ready to stop it if needed. The safety thresholds may need adjustment based on your specific environment.
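
If the robot stops too eagerly or too late, the two thresholds in check_blocked are the first knobs to turn. The numbers below are illustrative starting points for a cluttered space, not recommendations:

# Inside safenav_main: trip the obstacle check on a smaller blocked fraction
# and treat a wider range of depths as "too close"
blocked, best_patch_index = check_blocked(patches_img, means, grid_size, 1,
                                          obs_threshold_ratio=0.1, dist_threshold=0.3)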

Full Source Code

Here is the complete source code for the safe navigation example:

from grid.model.perception.depth.midas import MIDAS
from grid.utils.types import Image, Velocity
from grid.utils.logger import log
from grid.robot.wheeled.airgen_car import AirGenCar
from grid.robot.wheeled.jetbot import JetBot

import numpy as np


def get_formatted_midas_image(rgb_image: Image, depth_model):
    """Convert an RGB image into a formatted depth image.

    Args:
        rgb_image (grid.utils.types.Image): RGB image to process
        depth_model: depth model to run on the image

    Returns:
        np.ndarray: depth image scaled to [0, 255] (uint8), where higher values are further away
    """
    depth_image = depth_model.run(rgb_image.data)
    formatted = (depth_image * 255 / np.max(depth_image)).astype("uint8")
    return formatted

def divide_into_grid(depth_image: np.ndarray, num_horizontal_patches: int, num_vertical_patches: int):
    """Split a depth image into num_horizontal_patches * num_vertical_patches patches.

    Args:
        depth_image (np.ndarray): formatted depth image
        num_horizontal_patches (int): number of horizontal patches
        num_vertical_patches (int): number of vertical patches

    Returns:
        list: patches of the image, in row-major order
    """
    patches = []
    patch_height = depth_image.shape[0] // num_vertical_patches
    patch_width = depth_image.shape[1] // num_horizontal_patches
    for v in range(num_vertical_patches):
        for h in range(num_horizontal_patches):
            patch = depth_image[v * patch_height:(v + 1) * patch_height,
                                h * patch_width:(h + 1) * patch_width]
            patches.append(patch)
    return patches

def compute_safety_metric(patches: list):
    """Compute a safety metric for each patch.

    Args:
        patches (list): list of patches

    Returns:
        list: mean depth value per patch; higher means more open space ahead
    """
    return [np.mean(patch) for patch in patches]

def determine_best_patch(safety_metrics, num_horizontal_patches, num_vertical_patches):
    """Accept a list of safety metrics and return the index of the best patch.

    Args:
        safety_metrics: list of safety metrics
        num_horizontal_patches: number of horizontal patches
        num_vertical_patches: number of vertical patches

    Returns:
        tuple: (row, col) index of the best patch
    """
    # Reshape metrics array into grid form
    metrics_grid = np.array(safety_metrics).reshape((num_vertical_patches, num_horizontal_patches))
    # Higher mean depth means more open space, so the best patch is the argmax
    best_patch_index = np.unravel_index(np.argmax(metrics_grid), metrics_grid.shape)
    return best_patch_index

def check_blocked(patches: list, means: list, num_horizontal_patches: int, num_vertical_patches: int,
                  obs_threshold_ratio: float=.2, dist_threshold: float=.2):
    """Check if the robot's path is blocked by analyzing depth image patches.

    This function examines the center patches of the depth image to determine if there are obstacles
    directly in front of the robot. It also checks if all patches indicate objects are too close.

    Args:
        patches (list): List of depth image patches
        means (list): List of mean depth values for each patch
        num_horizontal_patches (int): Number of horizontal patches the image is divided into
        num_vertical_patches (int): Number of vertical patches the image is divided into
        obs_threshold_ratio (float, optional): Threshold ratio for considering a patch blocked. Defaults to 0.2
        dist_threshold (float, optional): Threshold distance for considering all patches too close. Defaults to 0.2

    Returns:
        tuple: (is_blocked, best_patch_index)
            - is_blocked (bool): True if path is blocked, False otherwise
            - best_patch_index: (row, col) index of the best patch to move towards, or -1 if blocked
    """
    middle_indices = [len(patches) // 2]
    if len(patches) % 2 == 0:
        middle_indices.append(len(patches) // 2 - 1)
    for i in middle_indices:
        # Normalize to [0, 1]; pixels at or below 0.1 count as "near"
        patch = patches[i].astype(np.float32) / 255.0
        blocked_ratio = np.mean(patch <= 0.1)
        print(f"Center patch has {blocked_ratio:.3f} blocked ratio")
        if blocked_ratio > obs_threshold_ratio:
            return True, -1
    np_means = np.array(means) / 255.0
    print(f"means: {np_means}")
    # If every patch reads as too close, the whole view is blocked
    if np.all(np_means < dist_threshold):
        return True, -1
    return False, determine_best_patch(means, num_horizontal_patches, num_vertical_patches)

def map_patch_to_steering(patch_index, num_horizontal_patches, num_vertical_patches):
    """Map a patch index to a steering command.

    Args:
        patch_index: (row, col) index of the best patch
        num_horizontal_patches: number of horizontal patches
        num_vertical_patches: number of vertical patches

    Returns:
        tuple: yaw and pitch values in degrees
    """
    # Offset from the image center, scaled across the field of view
    yaw = (patch_index[1] - num_horizontal_patches // 2) * (180 / num_horizontal_patches)
    pitch = (patch_index[0] - num_vertical_patches // 2) * (180 / num_vertical_patches)
    print(f"yaw: {yaw}, pitch: {pitch}")
    return yaw, pitch

def safenav_main(agent, rgb_image, depth_model):
    grid_size = 3
    depth_image = get_formatted_midas_image(rgb_image, depth_model)

    log("grid/safenav/depth_image", Image(depth_image))
    patches_img = divide_into_grid(depth_image, grid_size, 1)

    # Compute safety metrics
    means = compute_safety_metric(patches_img)
    blocked, best_patch_index = check_blocked(patches_img, means, grid_size, 1)
    if blocked:
        # Rotate in place to search for a clear path
        agent.rotate(1, .3)
        return

    # Map the best patch to steering; pitch is unused for a ground robot
    yaw, _pitch = map_patch_to_steering(best_patch_index, grid_size, 1)
    # Clip the yaw to keep turns gentle
    yaw = np.clip(yaw, -30, 30)

    yaw_rad = np.deg2rad(yaw)
    vel = Velocity(0.4, yaw_rad, 0)
    agent.moveByVelocity(vel)

def main_navigation_loop(agent):
    depth = MIDAS()
    while True:
        rgb_image = agent.getImage(camera_name="front_center", image_type="rgb")
        if rgb_image is None:
            continue
        log("grid/rgb_image", rgb_image)
        safenav_main(agent, rgb_image, depth)


def start(sim_mode: bool=False):
    if sim_mode:
        agent = AirGenCar()
    else:
        agent = JetBot("192.168.254.198")
    main_navigation_loop(agent)

if __name__ == '__main__':
    start(sim_mode=True)