# initialise the robot
import airgen
from airgen.utils.collect import data_collector
from airgen import WeatherParameter, Vector3r
from typing import List, Tuple, Dict, Any, Optional, Callable
import rerun as rr
import random, h5py, numpy as np

from grid.robot.wheeled.airgen_car import AirGenCar
client = AirGenCar().client

import torch
from grid.model.perception.tracking.cotracker import CoTracker
queries = torch.tensor([
    [0., 600., 350.],
    [0., 600., 250.],
    [10., 600., 500.],
    [20., 750., 600.],
    [30., 900., 200.]
])

model = CoTracker(queries = queries, save_results = False, use_local = True)

import airgen
search_radius = 25 # distance in meters
path = client.simPlanPathToRandomFreePoint(search_radius, smooth_path=True, draw_path=True) # generates the trajectory of points

points = []
for point in path:
    points.append(airgen.Vector3r(point['x_val'], point['y_val'], point['z_val']))

def readSensors(client: airgen.VehicleClient) -> dict:
    sensor_data = {}
    sensor_data['rgb'] = client.getImages("front_center",[airgen.ImageType.Scene])[0]
    return sensor_data

@data_collector(readSensors, time_delta=1)
def move_task(
    client: airgen.MultirotorClient, position: Tuple[float], **kwargs
) -> None | Tuple[None, List[dict]]:
    client.moveOnPath(points, velocity=5.0)


_, sensor_data = move_task(client, (0, 0, -10), _collect_data=True)
for i, data in enumerate(sensor_data):
    rgb, _ = data["rgb"]
    model.process_frame(rgb)

The CoTracker class implements a wrapper for the CoTracker model, which tracks points in video frames in an online manner (without having to look at the entire video).

class CoTracker()
queries
torch.Tensor
required

The queries to track points in the video.

save_results
boolean
default:
true

Whether to save the results. Defaults to False.

use_local
boolean
default:
true

If True, inference call is run on the local VM, else offloaded onto GRID-Cortex. Defaults to True.

def run()

Process a single video frame and updates tracking information.

This method appends the given frame to the window of frames and processes the frames at intervals defined by the CoTracker model’s step size. If the current frame count is a multiple of the step size, it updates the predicted tracks and visibility.

rgbimage
np.ndarray
required

Processes input video and tracks points.

Returns
Tuple[Optional[torch.Tensor], Optional[torch.Tensor]]

A tuple containing the predicted tracks and visibility tensors. If the frame count is not a multiple of the step size, both values will be None.

def finalize(self)

Finalize processing of remaining frames and return the final predicted tracks and visibility.

If there are any remaining frames that haven’t been processed in the window, they will be processed during this call. The results will be logged and optionally saved

Returns
Tuple[Optional[torch.Tensor], Optional[torch.Tensor]]

Final predicted tracks and visibility.

# initialise the robot
import airgen
from airgen.utils.collect import data_collector
from airgen import WeatherParameter, Vector3r
from typing import List, Tuple, Dict, Any, Optional, Callable
import rerun as rr
import random, h5py, numpy as np

from grid.robot.wheeled.airgen_car import AirGenCar
client = AirGenCar().client

import torch
from grid.model.perception.tracking.cotracker import CoTracker
queries = torch.tensor([
    [0., 600., 350.],
    [0., 600., 250.],
    [10., 600., 500.],
    [20., 750., 600.],
    [30., 900., 200.]
])

model = CoTracker(queries = queries, save_results = False, use_local = True)

import airgen
search_radius = 25 # distance in meters
path = client.simPlanPathToRandomFreePoint(search_radius, smooth_path=True, draw_path=True) # generates the trajectory of points

points = []
for point in path:
    points.append(airgen.Vector3r(point['x_val'], point['y_val'], point['z_val']))

def readSensors(client: airgen.VehicleClient) -> dict:
    sensor_data = {}
    sensor_data['rgb'] = client.getImages("front_center",[airgen.ImageType.Scene])[0]
    return sensor_data

@data_collector(readSensors, time_delta=1)
def move_task(
    client: airgen.MultirotorClient, position: Tuple[float], **kwargs
) -> None | Tuple[None, List[dict]]:
    client.moveOnPath(points, velocity=5.0)


_, sensor_data = move_task(client, (0, 0, -10), _collect_data=True)
for i, data in enumerate(sensor_data):
    rgb, _ = data["rgb"]
    model.process_frame(rgb)

This code is licensed under the CC-BY-NC 4.0 License.

Was this page helpful?