Skip to content

pipedude/ros2-ai-robot-guide

Repository files navigation

⭐ Preface

A practical learning guide to ROS 2 using an AI robot example with an architecture built around LLM + VLM + sensors + actuator nodes. The main goal is to demonstrate core ROS 2 patterns through real examples that can be run even without real hardware.

Who this guide is for

  • For people learning ROS 2 from scratch who want to quickly understand nodes, topics, services, and the overall architecture.
  • For people building an AI robot who want to validate the logic locally first and later move it to a real platform.

This guide grew out of a real ROS 2 mobile robot project where the role of the “brain” is handled by an AI agent built on the OpenAI API: a combination of LLM + VLM, the Realtime API, and the regular API for logic, vision, speech, and decision-making.

The target hardware platform for the project is:

  • Chassis and motion — Wheeltec Ackermann, Pololu Dual driver on VNH5019, Raspberry Pi Pico 2 controller.
  • Compute center — Raspberry Pi 5 (8 GB RAM) with Active Cooler.
  • Storage — SanDisk Extreme microSDXC 128 GB.
  • Vision and head — Orbbec Gemini 335 3D camera, Hiwonder 2 DoF pan-tilt bracket, LSC-16 servo controller.
  • Interfaces and safety — front and rear ToF sonars, USB microphone, Adafruit MAX98357A I2S amplifier, and speaker.

This repository is not meant to demonstrate hardware, but to provide a clear and reproducible way to study ROS 2. That is why the focus here is on architecture and educational stub nodes: camera, audio, sonar, AI modules, and basic ROS 2 scenarios that can be run locally. First — understanding the architecture in a safe sandbox, then — moving it to a real robot.

What is already in the repository

  • mock_sonar.py — a virtual ToF sonar that publishes distance to /sensor/sonar_front.
  • safety_stop.py — a simple safety node that reacts to a nearby obstacle.
  • head_controller.py — a pan-tilt head control service via set_head_position.
  • mock_camera.py — a virtual camera that publishes sensor_msgs/Image to /camera/color/image_raw.
  • vlm_agent_stub.py — a stub VLM node that takes the latest frames and simulates sending them to a vision model.
  • mock_audio.py — microphone and speaker emulation through ROS 2 topics.
  • ai_driver.py — a translator from simple text AI commands to AckermannDriveStamped.
  • electronic_differential.py — calculates left and right wheel speeds for an Ackermann chassis.
  • ai_brain.py — an educational central AI node that orchestrates sensors, speech, services, and movement commands.
  • llm_parser.py — a parser for structured JSON responses from an LLM with validation via Pydantic.

What you can study and run without real hardware

  • A basic ROS 2 node graph — who publishes, who subscribes, who calls services.
  • Sensors in simulation — camera, sonar, audio input, and audio output.
  • AI orchestration — how the central node receives data and initiates actions.
  • Ackermann kinematics — how an abstract movement command becomes chassis parameters.
  • Preparation for LLM Function Calling — how to safely accept structured commands from the model.

Important note

This repository shows the educational and architectural foundation. Some nodes here are intentionally implemented as mocks, stubs, and demonstration scenarios, not as production-ready integrations with real drivers, the OpenAI API, and hardware. This is intentional so that the entry path into ROS 2 remains simple, visual, and reproducible.

Many people think ROS (Robot Operating System) is an operating system like Windows or Ubuntu. In reality, it is a set of libraries and tools (framework/middleware) that helps developers build robot applications.

The main task of ROS 2 is to make different pieces of hardware (LiDARs, cameras, motors) and software (vision algorithms, navigation) exchange data quickly, reliably, and through a unified standard.

Below are the key terms you need to know before writing code.


🤖 Introduction to ROS 2: Core Concepts and Architecture (Beginner Cheat Sheet)

🏗️ 1. Nodes

Analogy: Employees in a large company or organs in the human body.

A Node is the minimum program unit in ROS 2 that performs one specific task. A proper robot architecture consists of many small nodes, not one huge program.

  • Example 1: The camera_node only receives an image from the camera and passes it along.
  • Example 2: The wheel_motor_node only spins a wheel.
  • Example 3: The brain_node makes decisions.

If one node crashes, for example if the camera driver hangs, the others keep working.


📡 2. Topics and Messages

Analogy: A radio station and radio listeners.

Nodes need to communicate. The main communication mechanism in ROS 2 is Topics. This is one-way continuous data transfer (Publish / Subscribe).

  • Publisher: A node that sends data to a topic.
  • Subscriber: A node that reads data from a topic.
  • Message: A strict data format sent through a topic, for example a number, a string, a laser data array, or an image.

Example: A camera node (Publisher) continuously broadcasts video to the /video_stream topic. A face recognition node (Subscriber) and a video recording node (Subscriber) both read that topic at the same time. The camera does not care who listens to it or whether anyone listens at all.


🛎️ 3. Services

Analogy: Calling a waiter in a restaurant.

Topics stream data continuously, for example 30 times per second. But what if we only need to ask a question and receive an answer once? That is what Services are for.

Services work using the Request -> Response principle. This is a synchronous operation: until the client gets a response from the server, it waits.

  • Example: A navigation node asks a map node: “Give me the current room map” (Request). The map node sends back the file (Response).

🎬 4. Actions

Analogy: Ordering pizza delivery. You place the order, receive periodic status updates like “Preparing” and “Courier on the way”, and in the end the courier delivers the pizza.

Actions are needed for long-running tasks. Unlike Services, they do not block the system while the task is running, and they can send intermediate progress (Feedback). An Action can be canceled at any time.

  • Example: You send the robot the command: “Drive to the next room.” The robot starts moving, sends Feedback once per second like “5 meters left... 4 meters...”, and at the end sends Result: “I’m here!”.

📦 5. Packages and Workspace

All code in ROS 2 is strictly organized into folders.

  • Workspace: The main folder of your project, meaning the repository root. Inside it there is usually a src folder containing the source code.
  • Package: A folder inside src that contains related nodes, libraries, and settings. A package is what developers share on GitHub. Packages can be written in C++ or Python.
  • Colcon: The build tool. It takes source code from the src folder and turns it into working programs (builds/compiles it).

🛠️ Cheat Sheet: Basic Terminal Commands (CLI)

ROS 2 has a very powerful command-line interface. Here are the commands you will use every day, assuming you run them in a terminal with the ROS 2 environment activated:

Nodes

  • ros2 node list — show all running nodes.
  • ros2 node info /node_name — show detailed information about a node, including which topics it reads and writes.

Topics

  • ros2 topic list — show all active topics in the system.
  • ros2 topic echo /topic_name — print the data currently flowing through the topic to the terminal. Very useful for debugging.
  • ros2 topic hz /topic_name — check the frequency, that is, how many messages per second arrive in the topic.

Running packages

  • ros2 run <package_name> <executable_name> — run a specific node from a package, for example ros2 run demo_nodes_cpp talker.
  • ros2 launch <package_name> <launch_file_name>.py — run several nodes at once with predefined settings. This is used to launch the entire robot.

💡 Summary

  1. Write nodes that do one thing, but do it well.
  2. Connect them with topics for continuous data flow.
  3. Use services for quick one-time requests.
  4. Use actions for long physical robot actions.
  5. Build everything with colcon and share your packages.

🚀 Installing ROS 2 Jazzy on Ubuntu 24.04 (via Pixi)

This method solves the main ROS 2 problem on modern Linux systems — pain with graphics drivers (Wayland, Intel Iris) when using Docker.

Why Pixi?

  • 100% isolation: ROS 2 is installed locally inside your project folder (.pixi). Your main system stays perfectly clean. No dependency conflicts.
  • Native graphics: RViz2 and Gazebo run instantly using Ubuntu’s native drivers. No need to forward xhost, DISPLAY, or struggle with Docker flags.
  • One-click removal: Want to remove ROS? Just delete the project folder.

🛠 Step 1: Install the Pixi package manager

Open a terminal and run the installation (if Pixi is not installed yet):

curl -fsSL https://pixi.sh/install.sh | bash

⚠️ Important: After installation, close the terminal and open a new one so the pixi command becomes available.


📁 Step 2: Create the project and configure channels (the main 2026 trick)

Standard conda channels may not contain fresh Jazzy builds. We will connect the direct prefix.dev server.

If you cloned this repository from GitHub, just go to the project folder and initialize the environment:

git clone https://github.com/pipedude/ros2-ai-robot-guide.git
cd ros2-ai-robot-guide
pixi init

Add the correct package sources (order matters):

pixi project channel add https://prefix.dev/robostack-jazzy
pixi project channel add conda-forge

📥 Step 3: Install ROS 2

Install the full ROS 2 Jazzy version (Desktop) and the colcon tool for building our future code. From the project root, run:

pixi add ros-jazzy-desktop colcon-common-extensions

(The download will take a few minutes. Around 1.5–2 GB of data will be downloaded: Python, ROS, Qt libraries, and simulators.)


🎥 Step 4: Test graphics (RViz2)

Let’s check whether the environment detected your graphics card:

pixi run rviz2

If a window with a 3D grid opens (even if there is a No tf data warning) — congratulations, the graphics subsystem is working perfectly. Close the window, or press Ctrl+C in the terminal, to continue.


🤖 Step 5: Write your first node (Hello World)

Now let’s write code. To avoid typing pixi run before every command, let’s “enter” our isolated environment:

pixi shell

(A prefix with your project name will appear on the left side of the terminal.)

If you work in Windsurf, it is convenient to immediately open the current project folder from the terminal:

windsurf .

The dot at the end means: open the current project folder.

1. Create the workspace and package:

mkdir src && cd src
ros2 pkg create --build-type ament_python my_first_package --dependencies rclpy std_msgs

2. Write the node code: Go to src/my_first_package/my_first_package/ and create hello_node.py there. Paste this code:

import rclpy
from rclpy.node import Node

class HelloNode(Node):
    def __init__(self):
        super().__init__('hello_node')
        self.get_logger().info("Hello, ROS 2! The robot has woken up!")
        self.count = 1
        # Timer: call the function every second
        self.create_timer(1.0, self.timer_callback)

    def timer_callback(self):
        self.get_logger().info(f"I have been working for {self.count} seconds already. All systems normal!")
        self.count += 1


def main(args=None):
    rclpy.init(args=args)
    node = HelloNode()
    rclpy.spin(node)
    rclpy.shutdown()

if __name__ == '__main__':
    main()

3. Register the script: Open setup.py (it is located in src/my_first_package/) and find the entry_points block at the bottom. Change it like this:

    entry_points={
        'console_scripts':[
            'hello = my_first_package.hello_node:main'
        ],
    },

🚀 Step 6: Build and run

If you are currently in the src folder, go one level up — back to the project root — and build the code:

cd ..
colcon build

Load the newly built files into the current session:

source install/setup.bash

Run our robot:

ros2 run my_first_package hello

Expected terminal output:

[INFO] [hello_node]: Hello, ROS 2! The robot has woken up!
[INFO] [hello_node]: I have been working for 1 seconds already. All systems normal!
[INFO] [hello_node]: I have been working for 2 seconds already. All systems normal!

🎉 Excellent. Now you have a powerful, reliable, and completely safe environment for your OS to study robotics.


🧭 Learning Plan: "ROS 2 for an AI Robot (Ubuntu 24.04 + Python)"

Below is the current learning path for the repository in its present state. It is tied to the nodes and scenarios that already exist in the project or are directly prepared within this guide.

Module 1: Basic skeleton (Environment and Nodes)

  • Lesson 1 (Starting today): Initialize ROS 2 through Pixi, create a workspace, and write your first Python nodes in Windsurf.
  • Lesson 2: Topics. Create a virtual ToF sonar (Publisher) and a safety node (Subscriber) that will shout “Stop!” if an obstacle is close.
  • Lesson 3: Services and Actions. Write a service for the pan-tilt “head” (turn the camera to a target angle) and an action for long-running tasks (“drive to a point”).

Module 2: Kinematics and Control (Ackermann chassis)

  • Lesson 4: Control messages. Study ackermann_msgs. Learn how to convert commands from AI (forward, left, right) into a steering angle and speed for the Pico 2 + VNH5019 setup.
  • Lesson 5: TF2 (Transforms). Understand the robot coordinate systems: how the “eyes” (the Gemini 335 camera) are positioned relative to the wheel center.
  • Lesson 6: Visualization. Launch RViz2. Display our robot and “fake” sensor data virtually without real hardware.

Module 3: Senses (Vision and Hearing)

  • Lesson 7: Working with images. Subscribe to sensor_msgs/Image. Learn how to intercept frames in Python to send them to a VLM (OpenAI Vision).
  • Lesson 8: Working with sound in ROS 2. Learn how to pack a microphone audio stream into topics and how to play sound (LLM responses) through an Adafruit I2S amplifier.

Module 4: AI Orchestrator (Project core)

  • Lesson 9: Educational AI brain and orchestration. Break down ai_brain.py, which listens to audio and sensors, calls the head service, and publishes movement commands.
  • Lesson 10: Parsing commands from an LLM in ROS 2. Learn how to make an LLM produce structured JSON commands that llm_parser.py can translate into Ackermann movement and servo turns.

Important: The bridge to real hardware, Micro-ROS, running the Orbbec Gemini on Raspberry Pi 5, and hardware integration will be added later. They are intentionally not included in the current version of this guide.


🚀 Lesson 1. First steps: isolated Pixi environment and the first Node

In the Windsurf IDE (you can use VS Code too, but I like Windsurf), we will use pixi for dependency management. This saves us from dealing with painful system-level Python and ROS 2 dependencies.

Step 1: Project initialization Open a terminal in Windsurf and move to the project folder:

# If the repository is already cloned from GitHub
cd ros2-ai-robot-guide

# If you are starting from scratch without cloning
# mkdir my_project && cd my_project
pixi init

Now add the https://prefix.dev/robostack-jazzy and conda-forge channels (as in the current pixi.toml) and install the base ROS 2 Jazzy and Python packages:

pixi project channel add https://prefix.dev/robostack-jazzy
pixi project channel add conda-forge

# Install the base ROS 2 packages and build tools
pixi add ros-jazzy-desktop ros-jazzy-rclpy compilers cmake pkg-config

Step 2: Create the first script (virtual “Brain” and “Sonar”) Before we create a more complex ROS package structure (with colcon), let’s write a simple script to understand the essence of rclpy (ROS Client Library for Python).

Create the file mock_sonar.py in the project root:

import rclpy
from rclpy.node import Node
from std_msgs.msg import Float32
import random

class MockSonarNode(Node):
    def __init__(self):
        # Node name in the ROS 2 graph
        super().__init__('mock_sonar_front')

        # Create a Publisher: publish distance (Float32) to '/sensor/sonar_front'
        self.publisher_ = self.create_publisher(Float32, '/sensor/sonar_front', 10)

        # Timer that calls the function every 1 second
        timer_period = 1.0
        self.timer = self.create_timer(timer_period, self.timer_callback)
        self.get_logger().info('Virtual front ToF sonar started!')

    def timer_callback(self):
        msg = Float32()
        # Simulate distance from 0.2 to 3.0 meters
        msg.data = random.uniform(0.2, 3.0)
        self.publisher_.publish(msg)
        self.get_logger().info(f'Publishing distance: {msg.data:.2f} m')


def main(args=None):
    rclpy.init(args=args)
    node = MockSonarNode()
    try:
        rclpy.spin(node) # Spin until Ctrl+C is pressed
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()

if __name__ == '__main__':
    main()

Step 3: Run it Since we use Pixi, we need to run the script inside its environment:

pixi run python mock_sonar.py

You should see logs in the terminal showing your virtual sonar (future ToF) publishing data.

Your task:

  1. Follow these steps in Windsurf.
  2. Check whether the script works.
  3. Extra challenge: While the script is running, open a second terminal in Windsurf, activate the pixi shell (pixi shell), and run ros2 topic list, then ros2 topic echo /sensor/sonar_front.

What just happened “under the hood”? In ROS 2 there is no central “server” (as there was in ROS 1). When you launched your Python script, it created a Publisher node. When you ran ros2 topic echo, ROS 2 temporarily created an unnamed Subscriber node. Thanks to DDS (Data Distribution Service), they automatically “found” each other on the local network (in this case inside your OS) and started exchanging Float32 messages through the /sensor/sonar_front channel (topic).


📡 Lesson 2: Writing a Subscriber — the Safety Node

Since your robot will be controlled by an AI agent (LLM), we need to account for network delays and possible model hallucinations. If the LLM says “Drive forward” while there is a wall in front of the robot, the robot should not wait for the AI — it needs a basic reflex.

Now we will write a subscriber node that listens to our sonar and shouts “STOP!” if the distance is less than 0.5 meters.

Step 1: Create the subscriber script In your project root (the same place as mock_sonar.py), create a new file safety_stop.py:

import rclpy
from rclpy.node import Node
from std_msgs.msg import Float32

class SafetyNode(Node):
    def __init__(self):
        super().__init__('safety_node')

        # Create Subscriber.
        # Arguments: Message type, Topic name, Callback function, Queue size (QoS)
        self.subscription = self.create_subscription(
            Float32,
            '/sensor/sonar_front',
            self.sonar_callback,
            10
        )
        self.subscription  # prevent garbage collection of the variable

        # Set the critical distance in meters
        self.critical_distance = 0.5

        self.get_logger().info('Safety node started. Guarding the perimeter!')

    def sonar_callback(self, msg):
        # This function is called AUTOMATICALLY every time a new message arrives in the topic
        distance = msg.data

        if distance < self.critical_distance:
            # Use the WARN logging level (it will be highlighted)
            self.get_logger().warning(f'EMERGENCY STOP! Obstacle at distance {distance:.2f} m!')
        else:
            self.get_logger().info(f'Path is clear (obstacle at {distance:.2f} m).')


def main(args=None):
    rclpy.init(args=args)
    safety_node = SafetyNode()

    try:
        rclpy.spin(safety_node) # Wait for messages indefinitely
    except KeyboardInterrupt:
        pass
    finally:
        safety_node.destroy_node()
        rclpy.shutdown()

if __name__ == '__main__':
    main()

Step 2: Run the pair

  1. In the first terminal, run the sonar:
    pixi run python mock_sonar.py
  2. In the second terminal, run the safety node:
    pixi run python safety_stop.py

What should happen: As long as the distance is above 0.5 meters, the node logs that the path is clear. When the sonar publishes a smaller value, the safety node immediately prints a warning.

This is a simple but extremely important architectural layer for your future robot.


🛎️ Lesson 3: Services — controlling the robot “head”

How is this different from Topics?

  • A Topic is a radio station. The sonar (Publisher) endlessly broadcasts “Distance 1 meter!”, and the Safety node (Subscriber) listens. This is a continuous data stream without delivery guarantees.
  • A Service is a phone call (Request -> Response). The client says: “Turn the head 45 degrees to the right.” The server does it and replies: “Done, turned it.”

For your Hiwonder 2 DoF Pan Tilt bracket (where one servo turns the head left-right / Pan and the other up-down / Tilt), services are a perfect fit.

Your future AI agent (LLM) will not constantly stream coordinates. It will say once: “Look at the person on the left” -> call the Service -> wait for the response that the camera has turned -> then grab a frame from the Orbbec Gemini.

Step 1: Write the head control server

Since we have not yet created our own message type (that will require colcon, which we will reach later), we will use the standard ROS 2 service type AddTwoInts. Let’s “cheat” and pretend that:

  • Number a — is the Pan angle (left-right, from -90 to 90 degrees)
  • Number b — is the Tilt angle (up-down, from -90 to 90 degrees)

Create head_controller.py in the project root:

import rclpy
from rclpy.node import Node
# Import the standard service type (Request: a, b. Response: sum)
from example_interfaces.srv import AddTwoInts
import time

class HeadControllerService(Node):
    def __init__(self):
        super().__init__('head_controller')

        # Create the Service Server
        # Arguments: Service type, Service name, Handler function
        self.srv = self.create_service(AddTwoInts, 'set_head_position', self.move_head_callback)

        # Current angles (the robot is looking straight)
        self.current_pan = 0
        self.current_tilt = 0

        self.get_logger().info('Pan-Tilt head controller started. Waiting for commands (Services)...')

    def move_head_callback(self, request, response):
        # This function is called when someone “calls” this service
        target_pan = request.a
        target_tilt = request.b

        self.get_logger().info(f'Received command: turn the head to Pan={target_pan}°, Tilt={target_tilt}°')

        # Simulate the physical motion time of the LSC-16 servos (1 second)
        self.get_logger().info('Motors buzzing...')
        time.sleep(1.0)

        # Update the current coordinates
        self.current_pan = target_pan
        self.current_tilt = target_tilt

        # Build the response.
        # In AddTwoInts the response is stored in the "sum" field.
        # We will write 1 there to indicate success.
        response.sum = 1
        self.get_logger().info('Motion complete. Sending response.')

        return response


def main(args=None):
    rclpy.init(args=args)
    head_controller = HeadControllerService()

    try:
        rclpy.spin(head_controller)
    except KeyboardInterrupt:
        pass
    finally:
        head_controller.destroy_node()
        rclpy.shutdown()

if __name__ == '__main__':
    main()

Step 2: Run the server

In the first terminal, run our new node:

pixi run python head_controller.py

Step 3: Call the service from the terminal (Client)

Open a second terminal (make sure you are in the pixi shell environment or use the pixi run prefix). ROS 2 has a fantastic testing utility — we can become a “client” right from the command line.

Enter the command that tells the robot to look right (45°) and slightly up (15°):

pixi run ros2 service call /set_head_position example_interfaces/srv/AddTwoInts "{a: 45, b: 15}"

What should happen:

  1. In the second terminal you will see the request and, after one second, the response response: example_interfaces.srv.AddTwoInts_Response(sum=1).
  2. In the first terminal (where the script is running) you will see logs: command received, motor buzzing Motors buzzing..., and a success report.

Try calling the service a few times with different angles. This is exactly the action your LLM will perform through Python code when it decides to “look around.”

How this fits into your future AI use case: In the future your LLM, through the OpenAI API, will generate JSON like {"action": "look_around", "pan": 45, "tilt": 15}. Your central Python script will parse that JSON and call this exact ROS 2 service. Until the robot head physically turns, the LLM will not receive a frame from the Orbbec Gemini camera.

(Note: ROS 2 also has Actions. They are like Services, but for very long-running tasks that can be canceled — for example, “drive to another room.” We will study them a bit later when we get to navigation.)

Now we move on to the most interesting part.


🚗 Lesson 4: Chassis kinematics (Ackermann) and the AI command translator

Your Wheeltec chassis uses Ackermann steering. That means your robot is controlled like a real car: it has front steering wheels (which define the steering angle) and rear drive wheels (which define the speed). It cannot spin in place like a robot vacuum.

In ROS 2 there is a dedicated message standard for such robots — ackermann_msgs. In the future your Raspberry Pi Pico 2 controller will subscribe to these messages and convert them into PWM signals for the Pololu VNH5019 driver.

Let’s write a Translator node that listens to simple text commands from the AI (for example, forward, left) and converts them into physical chassis commands.

Step 1: Add the message package

Since ackermann_msgs is not part of the most minimal installation, we need to add it to our pixi environment. Open a terminal and run:

pixi add ros-jazzy-ackermann-msgs

Step 2: Write the Translator node

Create ai_driver.py in the project root:

import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from ackermann_msgs.msg import AckermannDriveStamped

class AIDriverNode(Node):
    def __init__(self):
        super().__init__('ai_driver_translator')

        # Subscriber for text commands (Topic: /ai_command)
        self.subscription = self.create_subscription(
            String, '/ai_command', self.command_callback, 10)

        # Publisher of physical commands for the chassis (Topic: /drive)
        self.publisher = self.create_publisher(
            AckermannDriveStamped, '/drive', 10)

        self.get_logger().info('AI -> Ackermann translator started. Waiting for text commands on /ai_command')

    def command_callback(self, msg):
        command = msg.data.lower()

        # Create an empty Ackermann message
        drive_msg = AckermannDriveStamped()

        # Logic for translating text into physics:
        # speed (m/s) - linear speed
        # steering_angle (radians) - steering wheel angle

        if command == 'forward':
            drive_msg.drive.speed = 0.5
            drive_msg.drive.steering_angle = 0.0
        elif command == 'left':
            drive_msg.drive.speed = 0.3
            drive_msg.drive.steering_angle = 0.5  # About 28 degrees left
        elif command == 'right':
            drive_msg.drive.speed = 0.3
            drive_msg.drive.steering_angle = -0.5 # About 28 degrees right
        elif command == 'backward':
            drive_msg.drive.speed = -0.3
            drive_msg.drive.steering_angle = 0.0
        elif command == 'stop':
            drive_msg.drive.speed = 0.0
            drive_msg.drive.steering_angle = 0.0
        else:
            self.get_logger().warning(f'AI issued an unknown command: {command}')
            return

        self.publisher.publish(drive_msg)
        self.get_logger().info(f'AI said "{command}". Driving: Speed={drive_msg.drive.speed} m/s, Steering={drive_msg.drive.steering_angle} rad')


def main(args=None):
    rclpy.init(args=args)
    node = AIDriverNode()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()

if __name__ == '__main__':
    main()

Step 3: Test the setup!

  1. In the first terminal run the translator:

    pixi run python ai_driver.py
  2. In the second terminal we will play the role of the AI. Send the command left to the /ai_command topic:

    pixi run ros2 topic pub --once /ai_command std_msgs/msg/String "{data: 'left'}"

    Try sending different commands: forward, backward, stop, jump (to test an unknown command).

  3. (Optional) In the third terminal you can see how these commands are converted into the motor-friendly format:

    pixi run ros2 topic echo /drive

Testing: Send a command through the terminal and look at the output. But there is an important nuance: our chassis uses two independent rear motors without a mechanical differential. How do we handle that?

Solution for two motors without a differential: A classic car uses a mechanical differential. If the rear axle is rigidly locked, the wheels will slip, the robot will “hop” in turns, and the motors plus the VNH5019 driver will overheat.

We will implement an Electronic Differential (E-Diff).

How do we reflect this in the ROS 2 architecture?

The ros-jazzy-ackermann-msgs package describes the ideal kinematic model. In the AckermannDrive message we pass the desired speed of the robot center and the desired steering angle of the front wheels.

The AI agent does not need to know how many motors you have. It simply says: “Drive forward at 0.5 m/s and turn the wheel by 0.3 radians.”

Then, between the /drive topic and the Raspberry Pi Pico 2 controller, we insert a special node — the Kinematics Solver.

Here is how it works mathematically: Let $L$ be the wheelbase, and $W$ the track width. If we turn left, the robot moves along an arc of radius $R = L / \tan(steering_angle)$.

  • Left wheel speed: $V_{left} = V \cdot (1 - \frac{W}{2R})$
  • Right wheel speed: $V_{right} = V \cdot (1 + \frac{W}{2R})$

Let’s write this node right now!

This is a great continuation of Lesson 4. We will write a script that listens to AckermannDrive from the previous step and calculates speeds for the left and right motor.

Create electronic_differential.py in the project folder:

import rclpy
from rclpy.node import Node
from ackermann_msgs.msg import AckermannDriveStamped
import math

class ElectronicDifferential(Node):
    def __init__(self):
        super().__init__('electronic_differential')

        # Subscribe to commands from AI (or navigation)
        self.subscription = self.create_subscription(
            AckermannDriveStamped, '/drive', self.drive_callback, 10)

        # Approximate dimensions of your Wheeltec chassis in meters (measure later)
        self.wheelbase = 0.25  # L: distance between front and rear axle
        self.track_width = 0.15 # W: distance between the left and right rear wheel

        self.get_logger().info('Electronic differential started. Waiting for /drive commands...')

    def drive_callback(self, msg):
        v = msg.drive.speed
        delta = msg.drive.steering_angle

        # If we drive straight or stand still (steering angle close to zero)
        if abs(delta) < 0.01:
            v_left = v
            v_right = v
        else:
            # Turning radius of the rear axle center
            R = self.wheelbase / math.tan(delta)

            # Calculate the speed for each wheel
            v_left = v * (1 - self.track_width / (2 * R))
            v_right = v * (1 + self.track_width / (2 * R))

        # In the future this node will send v_left, v_right, and delta
        # to the Raspberry Pi Pico 2 via Serial or I2C.
        # For now we just log it nicely:

        if v == 0:
            status = "STOPPED"
        elif delta > 0.01:
            status = "TURNING LEFT"
        elif delta < -0.01:
            status = "TURNING RIGHT"
        else:
            status = "MOVING STRAIGHT"

        self.get_logger().info(
            f'[{status}] left={v_left:.2f} m/s, right={v_right:.2f} m/s, steer={delta:.2f} rad'
        )


def main(args=None):
    rclpy.init(args=args)
    node = ElectronicDifferential()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()

if __name__ == '__main__':
    main()

How do you test this magic?

  1. Run ai_driver.py from the previous step in the first terminal.
  2. Run electronic_differential.py in the second terminal.
  3. In the third terminal, send the command left through the topic (as we did in the previous lesson):
    pixi run ros2 topic pub --once /ai_command std_msgs/msg/String "{data: 'left'}"

What you will see in the second terminal: When the command left arrives, ai_driver.py will say: “drive at speed 0.3 and turn by 0.5 rad.” The new node will catch that and calculate that the left motor must spin slower and the right motor faster.

This architectural layer decouples the high-level AI from the physics of specific hardware. When the Raspberry Pi Pico 2 arrives, this code can either move into the microcontroller or stay in Python on the Raspberry Pi 5, sending ready-to-use wheel speeds.

Try running it and sending a turning command. It is a great feeling when the math comes alive in the logs.

We finish the second module (Kinematics and Control) with a crucial topic — without it, computer vision (VLM) and navigation simply cannot work.


🧭 Lesson 5 and 6: The robot’s spatial imagination (TF2) and Visualization (RViz2)

Why does your AI robot need this? Imagine your Orbbec Gemini 335 3D camera sees an obstacle exactly 1 meter away. The VLM model says: “Obstacle at distance 1 meter.” But the camera is mounted on a bracket at the front of the robot, while the wheel rotation center (from which the robot computes its coordinates) is 20 centimeters behind it. If the robot drives forward by 1 meter, it will crash into the obstacle.

To prevent this, ROS 2 has TF2 (Transform Framework). This is a system that stores a “tree” of coordinate frames. We will have at least two points:

  • base_link — the center of the robot (between the rear wheels).
  • camera_link — the eye of the camera.

Let’s look at this with our own eyes in a virtual space, since we do not yet have the real hardware.

Step 1: Launch RViz2 (The Matrix for the robot)

RViz2 (ROS Visualization) is a very powerful 3D tool. It lets you see what the robot “thinks.”

Open a terminal in Windsurf and run:

pixi run rviz2

A black window with a grid will open. This is the empty mind of your robot.

  1. In the left panel (Displays), find the Fixed Frame field. Right now it says map. Change it to base_link (type it manually and press Enter).
  2. Click Add (bottom left) -> choose the By topic tab -> find /tf -> choose TF -> click OK. Nothing will appear yet because we have not created our frames. Do not close RViz2.

Step 2: Broadcast coordinates (Static Transform Publisher)

Since the bracket is rigidly attached to the chassis, the distance between the robot center and the head base does not change. For this, ROS 2 has a built-in node, and we do not even need to write Python code.

Open a second terminal in Windsurf and enter this command (it launches the standard ROS 2 node for publishing static transforms):

pixi run ros2 run tf2_ros static_transform_publisher --x 0.15 --y 0.0 --z 0.25 --yaw 0 --pitch 0 --roll 0 --frame-id base_link --child-frame-id camera_link

What do these numbers mean? We are telling ROS 2: "The camera (camera_link) is 15 cm in front (x 0.15) and 25 cm above (z 0.25) the robot center (base_link)".

Step 3: Look at the result

Go back to the RViz2 window. You should see coordinate axes appear in the center of the grid.

  • One coordinate frame in the center of the grid — that is your base_link (center of the rear axle).
  • A second coordinate frame slightly in front and above — that is your camera_link (where the Orbbec Gemini will be mounted).
  • A yellow line between them showing their relationship.

(Red arrow = X axis / forward, Green = Y axis / left, Blue = Z axis / up).

Task:

  1. Run rviz2 and the static_transform_publisher command.
  2. Configure TF display in RViz2.
  3. Take a screenshot of this 3D space.

Why is this so useful for your VLM agent? When the VLM (OpenAI) recognizes an object in the camera image (for example, “I see a ball 1 meter away”), ROS 2 will automatically recalculate through the TF framework: “Okay, the ball is 1 meter from the camera, so it is 1.15 meters from the wheels. It is safe to drive.” You will not need to calculate those offsets manually in code.


👁️ Lesson 7: Working with images

Now we move to Lesson 7. Since we do not yet have the physical Orbbec Gemini 335 camera, we will write a Virtual Camera that generates a video stream. Then we will write a VLM agent stub node that intercepts those frames.

In the ROS 2 + Python world, the gold standard for working with images is the combination of OpenCV and the cv_bridge library (it converts images from the OpenCV format into ROS 2 sensor_msgs/Image messages and back).

Step 1: Add the libraries for vision

Open a terminal in Windsurf (make sure you are in the project root) and add the necessary packages to the Pixi environment:

pixi add ros-jazzy-cv-bridge ros-jazzy-sensor-msgs opencv

Step 2: Create the Virtual Camera

Create mock_camera.py:

import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
import cv2
import numpy as np

class MockCamera(Node):
    def __init__(self):
        super().__init__('mock_camera')
        # Publish to the standard topic for Orbbec color cameras
        self.publisher_ = self.create_publisher(Image, '/camera/color/image_raw', 10)

        # Timer for 10 FPS (1 frame every 0.1 sec)
        self.timer = self.create_timer(0.1, self.timer_callback)
        self.bridge = CvBridge()
        self.frame_count = 0

        self.get_logger().info('Virtual Orbbec camera started (10 FPS)...')

    def timer_callback(self):
        # 1. Generate a fake frame with OpenCV (black background 640x480)
        img = np.zeros((480, 640, 3), dtype=np.uint8)

        # 2. Draw changing text so we can see that the video is live
        text = f"Simulated Vision: Frame {self.frame_count}"
        cv2.putText(img, text, (50, 240), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        self.frame_count += 1

        # 3. Convert the OpenCV image (numpy array) to a ROS 2 Image Message
        msg = self.bridge.cv2_to_imgmsg(img, encoding="bgr8")

        # 4. Publish to the topic
        self.publisher_.publish(msg)


def main(args=None):
    rclpy.init(args=args)
    node = MockCamera()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()

if __name__ == '__main__':
    main()

Step 3: Write the “Eyes” for the VLM (foundation of the future AI)

Sending 10–30 FPS video to the OpenAI API is the fastest way to burn through all limits and money in 5 minutes. That is why the AI robot architecture is built differently: we subscribe to the video stream, always keep only the freshest frame in memory, and send it to the API only once every few seconds (or when an important event/command happens).

Create vlm_agent_stub.py:

import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from cv_bridge import CvBridge

class VLMAgentStub(Node):
    def __init__(self):
        super().__init__('vlm_agent')
        self.subscription = self.create_subscription(
            Image,
            '/camera/color/image_raw',
            self.image_callback,
            10 # QoS (queue size)
        )
        self.bridge = CvBridge()
        self.latest_cv_image = None

        # Timer that simulates calling OpenAI (for example, once every 3 seconds)
        self.timer = self.create_timer(3.0, self.process_vision_api)
        self.get_logger().info('AI Vision Agent started. Waiting for video stream...')

    def image_callback(self, msg):
        # This function is called 10 times per second.
        # We do NOT do heavy computations here, we only refresh the latest frame cache.
        self.latest_cv_image = self.bridge.imgmsg_to_cv2(msg, desired_encoding='bgr8')

    def process_vision_api(self):
        # This function is called every 3 seconds.
        if self.latest_cv_image is not None:
            height, width, channels = self.latest_cv_image.shape
            self.get_logger().info(
                f'>>> [API CALL] Taking a fresh frame of size {width}x{height} '
                f'and "sending" it to OpenAI GPT-4o Vision...'
            )
            # In the future, the code here will be:
            # 1. cv2.imencode('.jpg', self.latest_cv_image)
            # 2. base64.b64encode
            # 3. client.chat.completions.create(...)
        else:
            self.get_logger().warning('No frames yet...')


def main(args=None):
    rclpy.init(args=args)
    node = VLMAgentStub()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()

if __name__ == '__main__':
    main()

Step 4: Testing in terminals

  1. In the first terminal, run the virtual camera:
    pixi run python mock_camera.py
  2. In the second terminal, run the AI agent:
    pixi run python vlm_agent_stub.py

(Optional) ROS 2 (and RViz2) can display live camera feeds. 3. In the third terminal, open pixi run rviz2. 4. Click Add in the bottom left -> choose the By topic tab -> find /camera/color/image_raw -> choose Image -> click OK. A small window will appear in RViz showing your green text overlay.

Run both scripts (camera and agent) and watch the logs. The VLM agent should regularly report that it is “sending” frames.

You just implemented an asynchronous computer vision pipeline. In the future, instead of the stub text = ... in the frame, there will be a real image of your room, and instead of get_logger().info there will be a call to the openai library.

We are getting close to the finish line of our virtual build. We have “Muscles” (head Services and Ackermann chassis), we have “Vision” and “Touch” (Sonar). What is missing? Hearing, Voice, and Brain.


🔊 Lesson 8: Working with sound (Hearing and Voice)

Since your robot will use the OpenAI Realtime API (which works directly with streaming audio, bypassing local text recognition / STT on the robot), we need to prepare the “pipes” for audio transport.

On a real Raspberry Pi 5, audio from your USB microphone and output to the Adafruit I2S amplifier would be captured by libraries like sounddevice or pyaudio, packed into raw bytes (PCM 16-bit).

For now, to avoid dealing with your PC audio drivers inside the virtual environment, we will create topic stubs. Add the package for raw arrays in the terminal:

pixi add ros-jazzy-std-msgs

Create mock_audio.py. It will simulate a person speaking a command into the microphone every 10 seconds, and a “speaker” waiting for AI responses:

import rclpy
from rclpy.node import Node
from std_msgs.msg import ByteMultiArray, String
import time

class AudioSystem(Node):
    def __init__(self):
        super().__init__('audio_system')

        # Microphone (publishes audio bytes)
        self.mic_pub = self.create_publisher(ByteMultiArray, '/audio/mic_raw', 10)

        # Speaker (listens to audio bytes from the AI and “plays” them)
        self.speaker_sub = self.create_subscription(
            ByteMultiArray, '/audio/speaker_raw', self.speaker_callback, 10)

        self.timer = self.create_timer(10.0, self.simulate_user_speech)
        self.get_logger().info('Audio system (Microphone + Adafruit Speaker) started.')

    def simulate_user_speech(self):
        # Simulate that the user said something (for example, “Robot, what do you see?”)
        msg = ByteMultiArray()
        # In reality there will be a PCM 16-bit byte array here.
        # For testing we pack a fake string into bytes.
        msg.data = b"VOICE_COMMAND: Look around and tell me what you see!"

        self.get_logger().info('🎙️ A person said a phrase into the microphone...')
        self.mic_pub.publish(msg)

    def speaker_callback(self, msg):
        # In reality these bytes would be sent to the I2S Amp
        # Here we simply decode them for testing
        text = bytes(msg.data).decode('utf-8', errors='ignore')
        self.get_logger().info(f'🔊 SPEAKER PLAYS: {text}')


def main(args=None):
    rclpy.init(args=args)
    node = AudioSystem()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()

if __name__ == '__main__':
    main()

🧠 Lesson 9: Central Orchestrator (AI Brain)

And now the most important part. We will write the central node that ties everything from the previous lessons together.

This node will receive audio from the microphone, grab the latest frame from the camera, check the sonar, send everything to OpenAI (for now virtually), and issue commands to the chassis, head, and speaker.

Create ai_brain.py:

import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from std_msgs.msg import ByteMultiArray, String, Float32
from example_interfaces.srv import AddTwoInts
import json

class AIBrain(Node):
    def __init__(self):
        super().__init__('ai_brain')

        # --- SUBSCRIPTIONS (Senses) ---
        self.create_subscription(Image, '/camera/color/image_raw', self.image_cb, 10)
        self.create_subscription(ByteMultiArray, '/audio/mic_raw', self.audio_cb, 10)
        self.create_subscription(Float32, '/sensor/sonar_front', self.sonar_cb, 10)

        # --- PUBLISHERS (Actions) ---
        self.drive_pub = self.create_publisher(String, '/ai_command', 10)
        self.speaker_pub = self.create_publisher(ByteMultiArray, '/audio/speaker_raw', 10)

        # --- SERVICE CLIENTS ---
        self.head_client = self.create_client(AddTwoInts, 'set_head_position')

        # Robot memory state
        self.latest_image = None
        self.front_distance = 999.0

        self.get_logger().info('🧠 AI Brain started and waiting for data...')

    def image_cb(self, msg):
        self.latest_image = "FRAME_IN_MEMORY" # In reality: intercepted via cv_bridge

    def sonar_cb(self, msg):
        self.front_distance = msg.data

    def audio_cb(self, msg):
        user_voice = bytes(msg.data).decode('utf-8')
        self.get_logger().info(f'🧠 Heard voice: {user_voice}')
        self.process_ai_reasoning()

    def process_ai_reasoning(self):
        self.get_logger().info('🧠 Sending [Voice + Photo + Sonar Data] to OpenAI Realtime API...')

        # --- SIMULATING A RESPONSE FROM OPENAI (LLM+VLM) ---
        # In reality the OpenAI Realtime API would send us an audio stream
        # and JSON with function calls for the motors.

        if self.front_distance < 1.0:
            ai_thought = "I see an obstacle nearby. I need to stop and say so."
            drive_cmd = "stop"
            voice_reply = b"I stopped because there is an obstacle right in front of me!"
        else:
            ai_thought = "The path is clear. I will turn my head to look and move forward."
            drive_cmd = "forward"
            voice_reply = b"Moving forward and scanning the area."
            # Request a head turn (asynchronously)
            self.turn_head(45, 0)

        self.get_logger().info(f'🧠 AI Decision: {ai_thought}')

        # Speak out loud
        speaker_msg = ByteMultiArray()
        speaker_msg.data = voice_reply
        self.speaker_pub.publish(speaker_msg)

        # Send a command to the chassis
        drive_msg = String()
        drive_msg.data = drive_cmd
        self.drive_pub.publish(drive_msg)

    def turn_head(self, pan, tilt):
        if not self.head_client.wait_for_service(timeout_sec=1.0):
            self.get_logger().warning('Head service unavailable!')
            return

        req = AddTwoInts.Request()
        req.a = pan
        req.b = tilt
        self.get_logger().info(f'🧠 Sending command to the head motors: Pan={pan}, Tilt={tilt}')
        self.head_client.call_async(req) # Async call


def main(args=None):
    rclpy.init(args=args)
    node = AIBrain()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()

if __name__ == '__main__':
    main()

Grand system test! (Your task)

Now we will run the educational version of the architecture that reproduces the logic of the future robot, but still works on local mocks and stubs. You will need to open 5 terminals (in Windsurf you can split the panels) and launch our nodes from previous lessons in each of them:

  1. Terminal 1 (Sonar and Camera): (You can run them in one terminal using & or open two sub-terminals) pixi run python mock_sonar.py
  2. Terminal 2 (Hardware - Head and Chassis): pixi run python head_controller.py
  3. Terminal 3 (Hardware - Translator / E-Diff): pixi run python ai_driver.py
  4. Terminal 4 (Audio system): pixi run python mock_audio.py
  5. Terminal 5 (AI Brain): pixi run python ai_brain.py

What you should see in this symphony: Every 10 seconds mock_audio will “shout” into the microphone. ai_brain will hear it and check the distance from mock_sonar. If the distance is greater than 1 meter, the Brain will send a service request to head_controller (it will buzz its motors), send the command forward to ai_driver, and say “Moving forward” through the speaker. If the sonar reports < 1.0 meter, the Brain will say stop.

Run it, watch the logs in Terminal 5 (AI Brain) and Terminal 2/3 (Hardware). If it all connects — congratulations, you have fully designed the software architecture of your agent without real hardware.

In the previous lessons our robot understood only hardcoded text commands like "forward" or "left". But modern LLMs (especially combined with the OpenAI Realtime API) can produce structured outputs. Making an LLM write plain text and then parsing it with if/else is an anti-pattern in 2026.

Instead, we use Function Calling or Structured Outputs. We give the model a strict schema describing what the robot can do, and it returns exact JSON with numbers.


🧩 Lesson 10: Parsing commands from the LLM (Structured output)

In Python, the gold standard for validating data from an LLM is the Pydantic library. The OpenAI API natively supports generating responses directly into Pydantic schemas.

Now we use the llm_parser.py node, which accepts structured JSON from the LLM and turns it into direct commands for the chassis and head. This is a newer and stricter control path that can be used instead of the older text translator ai_driver.py.

Step 1: Add Pydantic to the environment

Open a terminal in Windsurf and add the library:

pixi add pydantic

Step 2: Create the parser-executor

Create llm_parser.py in the project root:

import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from ackermann_msgs.msg import AckermannDriveStamped
from example_interfaces.srv import AddTwoInts
from pydantic import BaseModel, Field, ValidationError
from typing import Literal, Union, List

# --- DEFINE THE SCHEMA (OUR ROBOT API FOR THE LLM) ---

class DriveCommand(BaseModel):
    action: Literal["drive"]
    speed: float = Field(..., description="Speed in m/s. From -0.5 (backward) to 0.5 (forward)")
    steering: float = Field(..., description="Steering angle in radians. From -0.5 (right) to 0.5 (left)")

class HeadCommand(BaseModel):
    action: Literal["turn_head"]
    pan: int = Field(..., description="Head pan left-right (degrees, from -90 to 90)")
    tilt: int = Field(..., description="Head tilt up-down (degrees, from -90 to 90)")

class LlmResponse(BaseModel):
    thought: str = Field(..., description="Agent thoughts out loud: why it made this decision")
    commands: List[Union[DriveCommand, HeadCommand]] = Field(..., description="List of actions")

# --- ROS 2 NODE ---

class LlmCommandParser(Node):
    def __init__(self):
        super().__init__('llm_parser')

        # Listen for raw JSON from the LLM
        self.subscription = self.create_subscription(
            String, '/llm_raw_json', self.json_callback, 10)

        # Publisher directly into the Electronic Differential (bypassing the text translator)
        self.drive_pub = self.create_publisher(AckermannDriveStamped, '/drive', 10)

        # Client for controlling the head
        self.head_client = self.create_client(AddTwoInts, 'set_head_position')

        self.get_logger().info('🤖 LLM JSON Parser started. Waiting for structured commands...')

    def json_callback(self, msg):
        raw_json = msg.data

        try:
            # 1. Pydantic validates the JSON automatically.
            # If the LLM makes a mistake or hallucinates, the code throws an error and the robot does not go crazy.
            parsed_data = LlmResponse.model_validate_json(raw_json)

            self.get_logger().info(f'💡 AI Thought: "{parsed_data.thought}"')

            # 2. Execute the list of commands one by one
            for cmd in parsed_data.commands:
                if cmd.action == "drive":
                    self.execute_drive(cmd.speed, cmd.steering)
                elif cmd.action == "turn_head":
                    self.execute_head(cmd.pan, cmd.tilt)

        except ValidationError as e:
            self.get_logger().error(f'❌ LLM sent invalid JSON (Hallucination): {e}')

    def execute_drive(self, speed, steering):
        drive_msg = AckermannDriveStamped()
        drive_msg.drive.speed = speed
        drive_msg.drive.steering_angle = steering
        self.drive_pub.publish(drive_msg)
        self.get_logger().info(f'🚗 [CHASSIS] Command sent: speed {speed} m/s, steering {steering} rad')

    def execute_head(self, pan, tilt):
        if self.head_client.wait_for_service(timeout_sec=0.2):
            req = AddTwoInts.Request(a=pan, b=tilt)
            self.head_client.call_async(req)
            self.get_logger().info(f'👀 [HEAD] Request sent: Pan={pan}°, Tilt={tilt}°')
        else:
            self.get_logger().warning('⚠️ Head service is offline!')


def main(args=None):
    rclpy.init(args=args)
    node = LlmCommandParser()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()

if __name__ == '__main__':
    main()

What is the magic in this code?

In the future, when you configure the OpenAI API, you will be able to use these schemas as the basis for Structured Outputs / Function Calling, so the model returns not free-form text but a validated data structure. Then the ROS 2 node can translate the model response directly into physical actions.

Step 3: Test the Parser

Let’s simulate a perfect response from the LLM. The model saw something interesting on the right, decided to turn the head there, and move closer slowly.

  1. In the first terminal run our E-Diff (Electronic differential), so it can receive the chassis command:
    pixi run python electronic_differential.py
  2. In the second terminal run the Head Server:
    pixi run python head_controller.py
  3. In the third terminal run our new Parser node:
    pixi run python llm_parser.py
  4. In the fourth terminal send the “model response” (JSON string) to the /llm_raw_json topic:
    pixi run ros2 topic pub --once /llm_raw_json std_msgs/msg/String '{data: "{\"thought\": \"I see an interesting shadow on the right. I will turn the camera and slowly move closer.\", \"commands\":[{\"action\": \"turn_head\", \"pan\": -45, \"tilt\": 10}, {\"action\": \"drive\", \"speed\": 0.2, \"steering\": -0.3}]}"}'

What will happen? The llm_parser.py parser will immediately understand this JSON, print the AI thought to the console, then asynchronously trigger the head_controller service and the /drive topic (where electronic_differential.py will gladly receive it and calculate wheel speeds).

This is the architectural peak for LLM-based robots. Try running this test.


Current project state

  • The repository is already useful for studying ROS 2 without a physical robot.
  • The core educational nodes already exist in code and match most of this guide.
  • OpenAI integration is currently shown at the architecture and stub level, not as a production-ready client.
  • The hardware bridge to the real robot is postponed for now and is intentionally not part of the current README.

About

A practical ROS 2 guide for an AI robot using LLM + VLM: nodes, topics, services, sensors, and architecture without real hardware

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors

Languages