PX4Flow simulated "ideal" optical flow from ground truth velocities from simulator

Hey all,

I am working with some basic control loops simulating inside of Isaac-Sim with the Pegasus extension to make the drone go in a square pattern. I have need for controlling the drone using just optical flow data instead of GPS but currently Pegasus does not have a simulated optical flow sensor out of the box.

My solution was to try and create an “ideal” optical flow sensor and range finder that sends optical flow and range finder data based on ground truth velocities from the simulator. In essence creating “perfect” optical flow. However, I am having trouble with this conversion and while the drone can arm, it quick loses track of where it is and how fast it is going, probably because the conversion from ground truth velocity to integrated y and integrated x is wrong. I’ll admit that I am new to both PX4 and optical flow sensor so I find the exact data I am supposed to be sending over plus how it is used on the PX4 side a bit confusing. The drone works well inside the simulator using a GPS, but it seems that relying on optical flow so far causes it to lose track of itself.

Here is the PX4 Flight Review log: https://review.px4.io/plot_app?log=7733851d-f633-446d-a055-6344099ead8e

Here is the ROS2 node for the ideal optical flow node:

#!/usr/bin/env python3

import csv
from datetime import datetime
import time
import numpy as np
import threading
import rclpy
from rclpy.node import Node
from nav_msgs.msg import Odometry
from sensor_msgs.msg import Range
from mavros_msgs.msg import State, OpticalFlowRad
from tf_transformations import quaternion_from_euler, euler_from_quaternion

from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy

class OpticalFlowSim(Node):

    def __init__(self):
        super().__init__('optical_flow_sen_sim')

        self.declare_parameter('publish_rate', 30.0)

        qos_profile = QoSProfile(
            reliability=ReliabilityPolicy.BEST_EFFORT,
            history=HistoryPolicy.KEEP_LAST,
            depth=1
        ) 

        self.ground_truth_state = {
            'position': np.zeros(3),
            'linear_vel': np.zeros(3),
            'angular_vel': np.zeros(3)
        }

        self.of_pub = self.create_publisher(OpticalFlowRad, '/mavros/px4flow/raw/send', 10)

        self.range_pub = self.create_publisher(Range, '/mavros/px4flow/ground_distance', 10)

        self.time = 0

        self.gt_odom_sub = self.create_subscription(
            Odometry,
            '/body/odom',
             self.ground_truth_odometry_callback,
             qos_profile,
        )

        self.state_sub = self.create_subscription(
            State,
            '/mavros/state',
            self.state_callback,
            qos_profile
        )

        self.get_logger().info('ROS2 MAVROS subscribers initialized')

        self.state_lock = threading.Lock()
        self.ground_truth_lock = threading.Lock()

        self.stop_sim = False
        self.data_received = False

        self.start_time = self.get_time()

        self.dt = 1.0 / self.get_parameter('publish_rate').value
        self.integration_time_us = int(self.dt * 1e6)
        self.timer = self.create_timer(self.dt, self.publish)

    def get_time(self):
        current_time = self.get_clock().now()
        time_seconds = current_time.nanoseconds * 1e-9
        return time_seconds

    def state_callback(self, msg):
        if msg.connected and not self.data_received:
            self.get_logger().info('MAVROS connected to PX4!')

    def ground_truth_odometry_callback(self, msg):
        with self.state_lock:
            self.ground_truth_state['position'] = np.array([
                msg.pose.pose.position.x,
                msg.pose.pose.position.y,
                msg.pose.pose.position.z
            ])

            self.ground_truth_state['linear_vel'] = np.array([
                msg.twist.twist.linear.x,
                msg.twist.twist.linear.y,
                msg.twist.twist.linear.z
            ])

            self.ground_truth_state['angular_vel'] = np.array([
                msg.twist.twist.angular.x,
                msg.twist.twist.angular.y,
                msg.twist.twist.angular.z
            ])
            
            if not self.data_received:
                self.data_received = True
                print("Receiving MAVROS odometry data!")
        
    def get_ground_truth_state(self):
        with self.state_lock:
            return {
                'position': self.ground_truth_state['position'].copy(),
                'linear_vel': self.ground_truth_state['linear_vel'].copy(),
                'angular_vel': self.ground_truth_state['angular_vel'].copy(),
            }
    
    def publish(self, height_offset=0.1):
        state = self.get_ground_truth_state()

        height = state['position'][2] + height_offset
        linear_vel = state['linear_vel']
        angular_vel = state['angular_vel']

        quality = 255

        total_flow_x = angular_vel[0] + linear_vel[1] / height
        total_flow_y = angular_vel[1] + linear_vel[0] / height

        integrated_x = total_flow_x
        integrated_y = total_flow_y
        integrated_xgyro = angular_vel[0] * self.dt
        integrated_ygyro = angular_vel[1] * self.dt
        integrated_zgyro = angular_vel[2] * self.dt

        flow_msg = OpticalFlowRad()
        flow_msg.header.stamp = self.get_clock().now().to_msg()
        
        flow_msg.integration_time_us = self.integration_time_us
        flow_msg.integrated_x = integrated_x
        flow_msg.integrated_y = integrated_y
        flow_msg.integrated_xgyro = integrated_xgyro
        flow_msg.integrated_ygyro = integrated_ygyro
        flow_msg.integrated_zgyro = integrated_zgyro
        flow_msg.temperature = 20  # Dummy temperature
        flow_msg.quality = quality
        flow_msg.time_delta_distance_us = self.integration_time_us
        flow_msg.distance = height

        self.of_pub.publish(flow_msg)

        range_msg = Range()
        range_msg.header.stamp = self.get_clock().now().to_msg()
        range_msg.radiation_type = Range.INFRARED
        range_msg.field_of_view = 0.0523599  # 3 degrees
        range_msg.min_range = 0.2
        range_msg.max_range = 20.0
        range_msg.range = height
        
        self.range_pub.publish(range_msg)

        self.get_logger().info(f"integrated_x: {integrated_x} ; integrated_y: {integrated_y} ; integrated_xgyro: {integrated_xgyro} ; integrated_ygyro: {integrated_ygyro} ; integrated_zgyro: {integrated_zgyro} ; height: {height} ; linear vel: {linear_vel} ; angular vel: {angular_vel} ; total_flow_x: {total_flow_x} ; total_flow_y: {total_flow_y}")

    def shutdown(self):
        self.stop_sim = True
        self.destroy_node()
        rclpy.shutdown()

def main(args=None):
    rclpy.init(args=args)
    node = OpticalFlowSim()
    
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.shutdown()

if __name__ == '__main__':
    main()

Updating with my current code. Same issues and still does not work.

#!/usr/bin/env python3

import csv
from datetime import datetime
import time
import numpy as np
import threading
import rclpy
from rclpy.node import Node
from nav_msgs.msg import Odometry
from sensor_msgs.msg import Range
from mavros_msgs.msg import State, OpticalFlowRad
from tf_transformations import quaternion_from_euler, euler_from_quaternion

from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy

class OpticalFlowSim(Node):

    def __init__(self):
        super().__init__('optical_flow_sen_sim')

        self.declare_parameter('publish_rate', 10.0)

        qos_profile = QoSProfile(
            reliability=ReliabilityPolicy.BEST_EFFORT,
            history=HistoryPolicy.KEEP_LAST,
            depth=1
        ) 

        self.ground_truth_state = {
            'position': np.zeros(3),
            'linear_vel': np.zeros(3),
            'angular_vel': np.zeros(3)
        }

        self.integrated_flow_x = 0.0
        self.integrated_flow_y = 0.0
        self.integrated_gyro_x = 0.0
        self.integrated_gyro_y = 0.0
        self.integrated_gyro_z = 0.0
        self.height = 0.0
        self.last_time = None
        self.integration_start_time = None

        self.of_pub = self.create_publisher(OpticalFlowRad, '/mavros/px4flow/raw/send', 10)

        self.time = 0

        self.gt_odom_sub = self.create_subscription(
            Odometry,
            '/body/odom',
             self.ground_truth_odometry_callback,
             qos_profile,
        )

        self.state_sub = self.create_subscription(
            State,
            '/mavros/state',
            self.state_callback,
            qos_profile
        )

        self.get_logger().info('ROS2 MAVROS subscribers initialized')

        self.state_lock = threading.Lock()
        self.ground_truth_lock = threading.Lock()

        self.stop_sim = False
        self.data_received = False

        self.start_time = self.get_time()

        self.dt = 1.0 / self.get_parameter('publish_rate').value
        self.timer = self.create_timer(self.dt, self.publish)

    def get_time(self):
        current_time = self.get_clock().now()
        time_seconds = current_time.nanoseconds * 1e-9
        return time_seconds

    def state_callback(self, msg):
        if msg.connected and not self.data_received:
            self.get_logger().info('MAVROS connected to PX4!')

    def ground_truth_odometry_callback(self, msg):

        with self.state_lock:
            current_time = self.get_time()
            if self.last_time is not None:
                dt = current_time - self.last_time

                # Get current state (don't accumulate!)
                position = np.array([msg.pose.pose.position.x,
                                msg.pose.pose.position.y,
                                msg.pose.pose.position.z])

                linear_vel = np.array([msg.twist.twist.linear.x,
                                msg.twist.twist.linear.y,
                                msg.twist.twist.linear.z])

                angular_vel = np.array([msg.twist.twist.angular.x,
                                    msg.twist.twist.angular.y,
                                    msg.twist.twist.angular.z])

                height = position[2] + 0.1
                flow_x = angular_vel[0] + linear_vel[1] / height
                flow_y = angular_vel[1] - linear_vel[0] / height

                self.integrated_flow_x += flow_x * dt
                self.integrated_flow_y += flow_y * dt
                self.integrated_gyro_x += angular_vel[0] * dt
                self.integrated_gyro_y += angular_vel[1] * dt
                self.integrated_gyro_z += angular_vel[2] * dt
                self.height = height

                if not self.data_received:
                    self.data_received = True
                    print("Receiving MAVROS odometry data!")

            self.last_time = current_time

            if self.integration_start_time is None:
                self.integration_start_time = current_time
        
    def publish(self):
        with self.state_lock:
            if self.integration_start_time is None:
                  return

            current_time = self.get_time()
            integration_time = current_time - self.integration_start_time

            if integration_time <= 0:
                return

            quality = 255

            flow_msg = OpticalFlowRad()
            flow_msg.header.stamp = self.get_clock().now().to_msg()

            flow_msg.integration_time_us = int(integration_time * 1e6)
            flow_msg.integrated_x = self.integrated_flow_x
            flow_msg.integrated_y = self.integrated_flow_y
            flow_msg.integrated_xgyro = self.integrated_gyro_x
            flow_msg.integrated_ygyro = self.integrated_gyro_y
            flow_msg.integrated_zgyro = self.integrated_gyro_z
            flow_msg.temperature = 20  # Dummy temperature
            flow_msg.quality = quality
            flow_msg.distance = self.height

            self.integrated_flow_x = 0.0
            self.integrated_flow_y = 0.0
            self.integrated_gyro_x = 0.0
            self.integrated_gyro_y = 0.0
            self.integrated_gyro_z = 0.0
            self.integration_start_time = current_time

            self.of_pub.publish(flow_msg)

    def shutdown(self):
        self.stop_sim = True
        self.destroy_node()
        rclpy.shutdown()

def main(args=None):
    rclpy.init(args=args)
    node = OpticalFlowSim()
    
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.shutdown()

if __name__ == '__main__':
    main()

Fixed my own issue. My EKF2_HGT_REF parameter was at 2 (rangefinder) but needs to be set at 0 (barometer). I didn’t know that setting this parameter to rangefinder excluded the rangefinder from being fused into terrain estimates.

Not sure if the code above it correct, but it fails a lot less worse now!

This topic was automatically closed 7 days after the last reply. New replies are no longer allowed.