Hey all,
I am working with some basic control loops simulating inside of Isaac-Sim with the Pegasus extension to make the drone go in a square pattern. I have need for controlling the drone using just optical flow data instead of GPS but currently Pegasus does not have a simulated optical flow sensor out of the box.
My solution was to try and create an “ideal” optical flow sensor and range finder that sends optical flow and range finder data based on ground truth velocities from the simulator. In essence creating “perfect” optical flow. However, I am having trouble with this conversion and while the drone can arm, it quick loses track of where it is and how fast it is going, probably because the conversion from ground truth velocity to integrated y and integrated x is wrong. I’ll admit that I am new to both PX4 and optical flow sensor so I find the exact data I am supposed to be sending over plus how it is used on the PX4 side a bit confusing. The drone works well inside the simulator using a GPS, but it seems that relying on optical flow so far causes it to lose track of itself.
Here is the PX4 Flight Review log: https://review.px4.io/plot_app?log=7733851d-f633-446d-a055-6344099ead8e
Here is the ROS2 node for the ideal optical flow node:
#!/usr/bin/env python3
import csv
from datetime import datetime
import time
import numpy as np
import threading
import rclpy
from rclpy.node import Node
from nav_msgs.msg import Odometry
from sensor_msgs.msg import Range
from mavros_msgs.msg import State, OpticalFlowRad
from tf_transformations import quaternion_from_euler, euler_from_quaternion
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
class OpticalFlowSim(Node):
def __init__(self):
super().__init__('optical_flow_sen_sim')
self.declare_parameter('publish_rate', 30.0)
qos_profile = QoSProfile(
reliability=ReliabilityPolicy.BEST_EFFORT,
history=HistoryPolicy.KEEP_LAST,
depth=1
)
self.ground_truth_state = {
'position': np.zeros(3),
'linear_vel': np.zeros(3),
'angular_vel': np.zeros(3)
}
self.of_pub = self.create_publisher(OpticalFlowRad, '/mavros/px4flow/raw/send', 10)
self.range_pub = self.create_publisher(Range, '/mavros/px4flow/ground_distance', 10)
self.time = 0
self.gt_odom_sub = self.create_subscription(
Odometry,
'/body/odom',
self.ground_truth_odometry_callback,
qos_profile,
)
self.state_sub = self.create_subscription(
State,
'/mavros/state',
self.state_callback,
qos_profile
)
self.get_logger().info('ROS2 MAVROS subscribers initialized')
self.state_lock = threading.Lock()
self.ground_truth_lock = threading.Lock()
self.stop_sim = False
self.data_received = False
self.start_time = self.get_time()
self.dt = 1.0 / self.get_parameter('publish_rate').value
self.integration_time_us = int(self.dt * 1e6)
self.timer = self.create_timer(self.dt, self.publish)
def get_time(self):
current_time = self.get_clock().now()
time_seconds = current_time.nanoseconds * 1e-9
return time_seconds
def state_callback(self, msg):
if msg.connected and not self.data_received:
self.get_logger().info('MAVROS connected to PX4!')
def ground_truth_odometry_callback(self, msg):
with self.state_lock:
self.ground_truth_state['position'] = np.array([
msg.pose.pose.position.x,
msg.pose.pose.position.y,
msg.pose.pose.position.z
])
self.ground_truth_state['linear_vel'] = np.array([
msg.twist.twist.linear.x,
msg.twist.twist.linear.y,
msg.twist.twist.linear.z
])
self.ground_truth_state['angular_vel'] = np.array([
msg.twist.twist.angular.x,
msg.twist.twist.angular.y,
msg.twist.twist.angular.z
])
if not self.data_received:
self.data_received = True
print("Receiving MAVROS odometry data!")
def get_ground_truth_state(self):
with self.state_lock:
return {
'position': self.ground_truth_state['position'].copy(),
'linear_vel': self.ground_truth_state['linear_vel'].copy(),
'angular_vel': self.ground_truth_state['angular_vel'].copy(),
}
def publish(self, height_offset=0.1):
state = self.get_ground_truth_state()
height = state['position'][2] + height_offset
linear_vel = state['linear_vel']
angular_vel = state['angular_vel']
quality = 255
total_flow_x = angular_vel[0] + linear_vel[1] / height
total_flow_y = angular_vel[1] + linear_vel[0] / height
integrated_x = total_flow_x
integrated_y = total_flow_y
integrated_xgyro = angular_vel[0] * self.dt
integrated_ygyro = angular_vel[1] * self.dt
integrated_zgyro = angular_vel[2] * self.dt
flow_msg = OpticalFlowRad()
flow_msg.header.stamp = self.get_clock().now().to_msg()
flow_msg.integration_time_us = self.integration_time_us
flow_msg.integrated_x = integrated_x
flow_msg.integrated_y = integrated_y
flow_msg.integrated_xgyro = integrated_xgyro
flow_msg.integrated_ygyro = integrated_ygyro
flow_msg.integrated_zgyro = integrated_zgyro
flow_msg.temperature = 20 # Dummy temperature
flow_msg.quality = quality
flow_msg.time_delta_distance_us = self.integration_time_us
flow_msg.distance = height
self.of_pub.publish(flow_msg)
range_msg = Range()
range_msg.header.stamp = self.get_clock().now().to_msg()
range_msg.radiation_type = Range.INFRARED
range_msg.field_of_view = 0.0523599 # 3 degrees
range_msg.min_range = 0.2
range_msg.max_range = 20.0
range_msg.range = height
self.range_pub.publish(range_msg)
self.get_logger().info(f"integrated_x: {integrated_x} ; integrated_y: {integrated_y} ; integrated_xgyro: {integrated_xgyro} ; integrated_ygyro: {integrated_ygyro} ; integrated_zgyro: {integrated_zgyro} ; height: {height} ; linear vel: {linear_vel} ; angular vel: {angular_vel} ; total_flow_x: {total_flow_x} ; total_flow_y: {total_flow_y}")
def shutdown(self):
self.stop_sim = True
self.destroy_node()
rclpy.shutdown()
def main(args=None):
rclpy.init(args=args)
node = OpticalFlowSim()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.shutdown()
if __name__ == '__main__':
main()