from grid.model.perception.depth.midas import MIDAS
from grid.utils.types import Image, Position, Velocity
from grid.utils.logger import log
from grid.robot.wheeled.airgen_car import AirGenCar
from grid.robot.wheeled.jetbot import JetBot
import matplotlib
import numpy as np
import cv2
def get_formatted_midas_image(rgb_image: Image, depth_model):
    """Run the depth model on an RGB image and rescale the result to 8 bits.

    Args:
        rgb_image (grid.utils.types.Image): rgb image to process; its ``data``
            attribute is what gets passed to the model
        depth_model: depth model exposing ``run(image_data)``

    Return:
        np.ndarray: uint8 image in which the largest model output maps to 255.
            NOTE(review): the previous docstring stated "higher is further";
            that depends on the depth model's output convention, which is not
            visible here -- confirm against the model.
    """
    depth_image = depth_model.run(rgb_image.data)
    peak = np.max(depth_image)
    # A zero (or non-finite) peak would turn the division below into NaN/inf,
    # and casting those to uint8 is undefined. Return an all-zero image instead.
    if not (np.isfinite(peak) and peak > 0):
        return np.zeros_like(depth_image, dtype="uint8")
    return (depth_image * 255 / peak).astype("uint8")
def divide_into_grid(depth_image: np.ndarray, num_horizontal_patches: int, num_vertical_patches: int):
    """Split a depth image into a row-major list of equally sized patches.

    Patch dimensions come from floor division, so any rows or columns left
    over when the image size is not a multiple of the patch count are dropped.

    Args:
        depth_image (np.ndarray): depth image to split (at least 2-D)
        num_horizontal_patches (int): number of patches along the width
        num_vertical_patches (int): number of patches along the height

    Return:
        list: ``num_vertical_patches * num_horizontal_patches`` slices of the
            image, ordered top row first and left to right within each row
    """
    cell_h = depth_image.shape[0] // num_vertical_patches
    cell_w = depth_image.shape[1] // num_horizontal_patches

    cells = []
    # np.ndindex walks (row, col) pairs in row-major order.
    for row, col in np.ndindex(num_vertical_patches, num_horizontal_patches):
        top = row * cell_h
        left = col * cell_w
        cells.append(depth_image[top:top + cell_h, left:left + cell_w])
    return cells
def compute_safety_metric(patches: list):
    """Compute the safety metric (mean pixel value) of every patch.

    Args:
        patches (list): list of image patches

    Return:
        list: one mean value per patch, in the same order as ``patches``
            (a relative depth score, not a metric distance)
    """
    return list(map(np.mean, patches))
def determine_best_patch(safety_metrics, num_horizontal_patches, num_vertical_patches):
    """Locate the patch with the smallest safety metric.

    Args:
        safety_metrics: flat, row-major list of safety metrics
        num_horizontal_patches: number of horizontal patches (grid columns)
        num_vertical_patches: number of vertical patches (grid rows)

    Return:
        tuple: ``(row, column)`` grid position of the patch with the lowest
            metric; the first such patch in row-major order wins ties
    """
    grid_shape = (num_vertical_patches, num_horizontal_patches)
    grid = np.array(safety_metrics).reshape(grid_shape)
    flat_position = grid.argmin()
    return np.unravel_index(flat_position, grid.shape)
def check_blocked(patches: list, means: list, num_horizontal_patches: int, num_vertical_patches: int,
                  obs_threshold_ratio: float=.2, dist_threshold: float=.2):
    """Check if the robot's path is blocked by analyzing depth image patches.

    The middle patch of the flat patch list (both middle patches when the
    list has an even length) is examined first: if the fraction of its pixels
    lying in the range (0, 0.1] exceeds ``obs_threshold_ratio`` the path is
    reported as blocked. Otherwise the path is blocked only when every patch
    mean is below ``dist_threshold``.

    NOTE(review): the 0.1 pixel cutoff and the 0.2 default thresholds look
    designed for depth values normalized to [0, 1]; confirm the caller's
    value range matches.

    Args:
        patches (list): List of depth image patches
        means (list): List of mean depth values for each patch
        num_horizontal_patches (int): Number of horizontal patches the image is divided into
        num_vertical_patches (int): Number of vertical patches the image is divided into
        obs_threshold_ratio (float, optional): Threshold ratio for considering a patch blocked. Defaults to 0.2
        dist_threshold (float, optional): Threshold distance for considering all patches too close. Defaults to 0.2

    Returns:
        tuple: (is_blocked, best_patch_index)
            - is_blocked (bool): True if path is blocked, False otherwise
            - best_patch_index: ``(row, column)`` tuple from
              ``determine_best_patch`` when the path is free, -1 if blocked
    """
    near_limit = .1  # pixels in (0, near_limit] count as obstacle readings

    patch_count = len(patches)
    center_indices = [patch_count // 2]
    if patch_count % 2 == 0:
        center_indices.append(patch_count // 2 - 1)

    for i in center_indices:
        patch = np.asarray(patches[i])
        # A boolean mask gives the obstacle ratio without copying or mutating
        # the patch data.
        blocked_ratio = np.mean((patch > 0) & (patch <= near_limit))
        print(f"Center patch has {blocked_ratio} blocked ratio")
        if blocked_ratio > obs_threshold_ratio:
            return True, -1

    np_means = np.array(means)
    print(f"means: {np_means}")
    # Compare each mean to the threshold, then reduce. The earlier form
    # `np_means.all() < dist_threshold` compared a single bool to the threshold.
    if (np_means < dist_threshold).all():
        return True, -1
    return False, determine_best_patch(means, num_horizontal_patches, num_vertical_patches)
def map_patch_to_steering(patch_index, num_horizontal_patches, num_vertical_patches):
    """Map a patch grid position to yaw and pitch angles in degrees.

    The patches are spread evenly over a 180-degree span on each axis, and
    the returned angle is the offset of the patch center from the middle of
    that span. The grid center is ``(n - 1) / 2`` so that even patch counts
    produce symmetric angles (e.g. -45/+45 for two patches); for odd counts
    this equals ``n // 2``.

    Args:
        patch_index: ``(row, column)`` position of the best patch
        num_horizontal_patches: number of horizontal patches
        num_vertical_patches: number of vertical patches

    Return:
        tuple: ``(yaw, pitch)`` in degrees
    """
    row, column = patch_index[0], patch_index[1]
    yaw = (column - (num_horizontal_patches - 1) / 2) * (180 / num_horizontal_patches)
    pitch = (row - (num_vertical_patches - 1) / 2) * (180 / num_vertical_patches)
    print(yaw, pitch)
    return yaw, pitch
def safenav_main(agent, rgb_image, depth_model):
    """Run one perceive-and-steer step from a single RGB frame.

    The frame is converted to a depth image, logged, and split into a 3x1
    grid of patches. If ``check_blocked`` reports a blocked path the agent is
    told to rotate; otherwise the yaw towards the chosen patch is clipped to
    [-30, 30] degrees, converted to radians and sent as a velocity command.

    Args:
        agent: robot exposing ``rotate`` and ``moveByVelocity``
        rgb_image: RGB frame passed on to ``get_formatted_midas_image``
        depth_model: depth model passed on to ``get_formatted_midas_image``
    """
    columns, rows = 3, 1

    depth_map = get_formatted_midas_image(rgb_image, depth_model)
    log("grid/safenav/depth_image", Image(depth_map))

    strips = divide_into_grid(depth_map, columns, rows)
    strip_means = compute_safety_metric(strips)
    is_blocked, target = check_blocked(strips, strip_means, columns, rows)
    if is_blocked:
        # NOTE(review): meaning of rotate's arguments is defined by the agent
        # class -- confirm there.
        agent.rotate(1, .3)
        return

    # Only the yaw component is used for steering.
    yaw_deg, _pitch = map_patch_to_steering(target, columns, rows)
    min_yaw, max_yaw = -30, 30
    yaw_rad = np.deg2rad(np.clip(yaw_deg, min_yaw, max_yaw))
    agent.moveByVelocity(Velocity(0.4, yaw_rad, 0))
def main_navigation_loop(agent):
    """Continuously grab front-camera frames and run a navigation step on each.

    A single MIDAS depth model is created up front and reused for every
    frame. Frames that come back as ``None`` are skipped. The loop never
    returns.

    Args:
        agent: robot exposing ``getImage`` plus the methods used by
            ``safenav_main``
    """
    depth_model = MIDAS()
    while True:
        frame = agent.getImage(camera_name="front_center", image_type="rgb")
        if frame is not None:
            log("grid/rgb_image", frame)
            safenav_main(agent, frame, depth_model)
def start(sim_mode: bool=False):
    """Create the robot agent and hand it to the navigation loop.

    Args:
        sim_mode (bool, optional): when True an ``AirGenCar`` is created,
            otherwise a ``JetBot`` at the hard-coded address
            192.168.254.198. Defaults to False
    """
    agent = AirGenCar() if sim_mode else JetBot("192.168.254.198")
    main_navigation_loop(agent)
# Script entry point: when run directly, start navigation with sim_mode
# enabled (which makes start() create the AirGenCar agent).
if __name__ == '__main__':
    start(sim_mode=True)