A practical learning guide to ROS 2 using an AI robot example with an architecture built around LLM + VLM + sensors + actuator nodes. The main goal is to demonstrate core ROS 2 patterns through real examples that can be run even without real hardware.
- For people learning ROS 2 from scratch who want to quickly understand nodes, topics, services, and the overall architecture.
- For people building an AI robot who want to validate the logic locally first and later move it to a real platform.
This guide grew out of a real ROS 2 mobile robot project where the role of the “brain” is handled by an AI agent built on the OpenAI API: a combination of LLM + VLM, the Realtime API, and the regular API for logic, vision, speech, and decision-making.
The target hardware platform for the project is:
- Chassis and motion — Wheeltec Ackermann, Pololu Dual driver on VNH5019, Raspberry Pi Pico 2 controller.
- Compute center — Raspberry Pi 5 (8 GB RAM) with Active Cooler.
- Storage — SanDisk Extreme microSDXC 128 GB.
- Vision and head — Orbbec Gemini 335 3D camera, Hiwonder 2 DoF pan-tilt bracket, LSC-16 servo controller.
- Interfaces and safety — front and rear ToF sonars, USB microphone, Adafruit MAX98357A I2S amplifier, and speaker.
This repository is not meant to demonstrate hardware, but to provide a clear and reproducible way to study ROS 2. That is why the focus here is on architecture and educational stub nodes: camera, audio, sonar, AI modules, and basic ROS 2 scenarios that can be run locally. First — understanding the architecture in a safe sandbox, then — moving it to a real robot.
mock_sonar.py— a virtual ToF sonar that publishes distance to/sensor/sonar_front.safety_stop.py— a simple safety node that reacts to a nearby obstacle.head_controller.py— a pan-tilt head control service viaset_head_position.mock_camera.py— a virtual camera that publishessensor_msgs/Imageto/camera/color/image_raw.vlm_agent_stub.py— a stub VLM node that takes the latest frames and simulates sending them to a vision model.mock_audio.py— microphone and speaker emulation through ROS 2 topics.ai_driver.py— a translator from simple text AI commands toAckermannDriveStamped.electronic_differential.py— calculates left and right wheel speeds for an Ackermann chassis.ai_brain.py— an educational central AI node that orchestrates sensors, speech, services, and movement commands.llm_parser.py— a parser for structured JSON responses from an LLM with validation via Pydantic.
- A basic ROS 2 node graph — who publishes, who subscribes, who calls services.
- Sensors in simulation — camera, sonar, audio input, and audio output.
- AI orchestration — how the central node receives data and initiates actions.
- Ackermann kinematics — how an abstract movement command becomes chassis parameters.
- Preparation for LLM Function Calling — how to safely accept structured commands from the model.
This repository shows the educational and architectural foundation. Some nodes here are intentionally implemented as mocks, stubs, and demonstration scenarios, not as production-ready integrations with real drivers, the OpenAI API, and hardware. This is intentional so that the entry path into ROS 2 remains simple, visual, and reproducible.
Many people think ROS (Robot Operating System) is an operating system like Windows or Ubuntu. In reality, it is a set of libraries and tools (framework/middleware) that helps developers build robot applications.
The main task of ROS 2 is to make different pieces of hardware (LiDARs, cameras, motors) and software (vision algorithms, navigation) exchange data quickly, reliably, and through a unified standard.
Below are the key terms you need to know before writing code.
Analogy: Employees in a large company or organs in the human body.
A Node is the minimum program unit in ROS 2 that performs one specific task. A proper robot architecture consists of many small nodes, not one huge program.
- Example 1: The
camera_nodeonly receives an image from the camera and passes it along. - Example 2: The
wheel_motor_nodeonly spins a wheel. - Example 3: The
brain_nodemakes decisions.
If one node crashes, for example if the camera driver hangs, the others keep working.
Analogy: A radio station and radio listeners.
Nodes need to communicate. The main communication mechanism in ROS 2 is Topics. This is one-way continuous data transfer (Publish / Subscribe).
- Publisher: A node that sends data to a topic.
- Subscriber: A node that reads data from a topic.
- Message: A strict data format sent through a topic, for example a number, a string, a laser data array, or an image.
Example: A camera node (Publisher) continuously broadcasts video to the /video_stream topic. A face recognition node (Subscriber) and a video recording node (Subscriber) both read that topic at the same time. The camera does not care who listens to it or whether anyone listens at all.
Analogy: Calling a waiter in a restaurant.
Topics stream data continuously, for example 30 times per second. But what if we only need to ask a question and receive an answer once? That is what Services are for.
Services work using the Request -> Response principle. This is a synchronous operation: until the client gets a response from the server, it waits.
- Example: A navigation node asks a map node: “Give me the current room map” (Request). The map node sends back the file (Response).
Analogy: Ordering pizza delivery. You place the order, receive periodic status updates like “Preparing” and “Courier on the way”, and in the end the courier delivers the pizza.
Actions are needed for long-running tasks. Unlike Services, they do not block the system while the task is running, and they can send intermediate progress (Feedback). An Action can be canceled at any time.
- Example: You send the robot the command: “Drive to the next room.” The robot starts moving, sends Feedback once per second like “5 meters left... 4 meters...”, and at the end sends Result: “I’m here!”.
All code in ROS 2 is strictly organized into folders.
- Workspace: The main folder of your project, meaning the repository root. Inside it there is usually a
srcfolder containing the source code. - Package: A folder inside
srcthat contains related nodes, libraries, and settings. A package is what developers share on GitHub. Packages can be written in C++ or Python. - Colcon: The build tool. It takes source code from the
srcfolder and turns it into working programs (builds/compiles it).
ROS 2 has a very powerful command-line interface. Here are the commands you will use every day, assuming you run them in a terminal with the ROS 2 environment activated:
ros2 node list— show all running nodes.ros2 node info /node_name— show detailed information about a node, including which topics it reads and writes.
ros2 topic list— show all active topics in the system.ros2 topic echo /topic_name— print the data currently flowing through the topic to the terminal. Very useful for debugging.ros2 topic hz /topic_name— check the frequency, that is, how many messages per second arrive in the topic.
ros2 run <package_name> <executable_name>— run a specific node from a package, for exampleros2 run demo_nodes_cpp talker.ros2 launch <package_name> <launch_file_name>.py— run several nodes at once with predefined settings. This is used to launch the entire robot.
- Write nodes that do one thing, but do it well.
- Connect them with topics for continuous data flow.
- Use services for quick one-time requests.
- Use actions for long physical robot actions.
- Build everything with colcon and share your packages.
This method solves the main ROS 2 problem on modern Linux systems — pain with graphics drivers (Wayland, Intel Iris) when using Docker.
Why Pixi?
- 100% isolation: ROS 2 is installed locally inside your project folder (
.pixi). Your main system stays perfectly clean. No dependency conflicts. - Native graphics: RViz2 and Gazebo run instantly using Ubuntu’s native drivers. No need to forward
xhost,DISPLAY, or struggle with Docker flags. - One-click removal: Want to remove ROS? Just delete the project folder.
Open a terminal and run the installation (if Pixi is not installed yet):
curl -fsSL https://pixi.sh/install.sh | bashpixi command becomes available.
Standard conda channels may not contain fresh Jazzy builds. We will connect the direct prefix.dev server.
If you cloned this repository from GitHub, just go to the project folder and initialize the environment:
git clone https://github.com/pipedude/ros2-ai-robot-guide.git
cd ros2-ai-robot-guide
pixi initAdd the correct package sources (order matters):
pixi project channel add https://prefix.dev/robostack-jazzy
pixi project channel add conda-forgeInstall the full ROS 2 Jazzy version (Desktop) and the colcon tool for building our future code. From the project root, run:
pixi add ros-jazzy-desktop colcon-common-extensions(The download will take a few minutes. Around 1.5–2 GB of data will be downloaded: Python, ROS, Qt libraries, and simulators.)
Let’s check whether the environment detected your graphics card:
pixi run rviz2If a window with a 3D grid opens (even if there is a No tf data warning) — congratulations, the graphics subsystem is working perfectly. Close the window, or press Ctrl+C in the terminal, to continue.
Now let’s write code. To avoid typing pixi run before every command, let’s “enter” our isolated environment:
pixi shell(A prefix with your project name will appear on the left side of the terminal.)
If you work in Windsurf, it is convenient to immediately open the current project folder from the terminal:
windsurf .The dot at the end means: open the current project folder.
1. Create the workspace and package:
mkdir src && cd src
ros2 pkg create --build-type ament_python my_first_package --dependencies rclpy std_msgs2. Write the node code:
Go to src/my_first_package/my_first_package/ and create hello_node.py there. Paste this code:
import rclpy
from rclpy.node import Node
class HelloNode(Node):
def __init__(self):
super().__init__('hello_node')
self.get_logger().info("Hello, ROS 2! The robot has woken up!")
self.count = 1
# Timer: call the function every second
self.create_timer(1.0, self.timer_callback)
def timer_callback(self):
self.get_logger().info(f"I have been working for {self.count} seconds already. All systems normal!")
self.count += 1
def main(args=None):
rclpy.init(args=args)
node = HelloNode()
rclpy.spin(node)
rclpy.shutdown()
if __name__ == '__main__':
main()3. Register the script:
Open setup.py (it is located in src/my_first_package/) and find the entry_points block at the bottom. Change it like this:
entry_points={
'console_scripts':[
'hello = my_first_package.hello_node:main'
],
},If you are currently in the src folder, go one level up — back to the project root — and build the code:
cd ..
colcon buildLoad the newly built files into the current session:
source install/setup.bashRun our robot:
ros2 run my_first_package helloExpected terminal output:
[INFO] [hello_node]: Hello, ROS 2! The robot has woken up!
[INFO] [hello_node]: I have been working for 1 seconds already. All systems normal!
[INFO] [hello_node]: I have been working for 2 seconds already. All systems normal!
🎉 Excellent. Now you have a powerful, reliable, and completely safe environment for your OS to study robotics.
Below is the current learning path for the repository in its present state. It is tied to the nodes and scenarios that already exist in the project or are directly prepared within this guide.
Module 1: Basic skeleton (Environment and Nodes)
- Lesson 1 (Starting today): Initialize ROS 2 through Pixi, create a workspace, and write your first Python nodes in Windsurf.
- Lesson 2: Topics. Create a virtual ToF sonar (Publisher) and a safety node (Subscriber) that will shout “Stop!” if an obstacle is close.
- Lesson 3: Services and Actions. Write a service for the pan-tilt “head” (turn the camera to a target angle) and an action for long-running tasks (“drive to a point”).
Module 2: Kinematics and Control (Ackermann chassis)
- Lesson 4: Control messages. Study
ackermann_msgs. Learn how to convert commands from AI (forward,left,right) into a steering angle and speed for the Pico 2 + VNH5019 setup. - Lesson 5: TF2 (Transforms). Understand the robot coordinate systems: how the “eyes” (the Gemini 335 camera) are positioned relative to the wheel center.
- Lesson 6: Visualization. Launch RViz2. Display our robot and “fake” sensor data virtually without real hardware.
Module 3: Senses (Vision and Hearing)
- Lesson 7: Working with images. Subscribe to
sensor_msgs/Image. Learn how to intercept frames in Python to send them to a VLM (OpenAI Vision). - Lesson 8: Working with sound in ROS 2. Learn how to pack a microphone audio stream into topics and how to play sound (LLM responses) through an Adafruit I2S amplifier.
Module 4: AI Orchestrator (Project core)
- Lesson 9: Educational AI brain and orchestration. Break down
ai_brain.py, which listens to audio and sensors, calls the head service, and publishes movement commands. - Lesson 10: Parsing commands from an LLM in ROS 2. Learn how to make an LLM produce structured JSON commands that
llm_parser.pycan translate into Ackermann movement and servo turns.
Important: The bridge to real hardware, Micro-ROS, running the Orbbec Gemini on Raspberry Pi 5, and hardware integration will be added later. They are intentionally not included in the current version of this guide.
In the Windsurf IDE (you can use VS Code too, but I like Windsurf), we will use pixi for dependency management. This saves us from dealing with painful system-level Python and ROS 2 dependencies.
Step 1: Project initialization Open a terminal in Windsurf and move to the project folder:
# If the repository is already cloned from GitHub
cd ros2-ai-robot-guide
# If you are starting from scratch without cloning
# mkdir my_project && cd my_project
pixi initNow add the https://prefix.dev/robostack-jazzy and conda-forge channels (as in the current pixi.toml) and install the base ROS 2 Jazzy and Python packages:
pixi project channel add https://prefix.dev/robostack-jazzy
pixi project channel add conda-forge
# Install the base ROS 2 packages and build tools
pixi add ros-jazzy-desktop ros-jazzy-rclpy compilers cmake pkg-configStep 2: Create the first script (virtual “Brain” and “Sonar”)
Before we create a more complex ROS package structure (with colcon), let’s write a simple script to understand the essence of rclpy (ROS Client Library for Python).
Create the file mock_sonar.py in the project root:
import rclpy
from rclpy.node import Node
from std_msgs.msg import Float32
import random
class MockSonarNode(Node):
def __init__(self):
# Node name in the ROS 2 graph
super().__init__('mock_sonar_front')
# Create a Publisher: publish distance (Float32) to '/sensor/sonar_front'
self.publisher_ = self.create_publisher(Float32, '/sensor/sonar_front', 10)
# Timer that calls the function every 1 second
timer_period = 1.0
self.timer = self.create_timer(timer_period, self.timer_callback)
self.get_logger().info('Virtual front ToF sonar started!')
def timer_callback(self):
msg = Float32()
# Simulate distance from 0.2 to 3.0 meters
msg.data = random.uniform(0.2, 3.0)
self.publisher_.publish(msg)
self.get_logger().info(f'Publishing distance: {msg.data:.2f} m')
def main(args=None):
rclpy.init(args=args)
node = MockSonarNode()
try:
rclpy.spin(node) # Spin until Ctrl+C is pressed
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()Step 3: Run it Since we use Pixi, we need to run the script inside its environment:
pixi run python mock_sonar.pyYou should see logs in the terminal showing your virtual sonar (future ToF) publishing data.
- Follow these steps in Windsurf.
- Check whether the script works.
- Extra challenge: While the script is running, open a second terminal in Windsurf, activate the pixi shell (
pixi shell), and runros2 topic list, thenros2 topic echo /sensor/sonar_front.
What just happened “under the hood”?
In ROS 2 there is no central “server” (as there was in ROS 1). When you launched your Python script, it created a Publisher node. When you ran ros2 topic echo, ROS 2 temporarily created an unnamed Subscriber node. Thanks to DDS (Data Distribution Service), they automatically “found” each other on the local network (in this case inside your OS) and started exchanging Float32 messages through the /sensor/sonar_front channel (topic).
Since your robot will be controlled by an AI agent (LLM), we need to account for network delays and possible model hallucinations. If the LLM says “Drive forward” while there is a wall in front of the robot, the robot should not wait for the AI — it needs a basic reflex.
Now we will write a subscriber node that listens to our sonar and shouts “STOP!” if the distance is less than 0.5 meters.
Step 1: Create the subscriber script
In your project root (the same place as mock_sonar.py), create a new file safety_stop.py:
import rclpy
from rclpy.node import Node
from std_msgs.msg import Float32
class SafetyNode(Node):
def __init__(self):
super().__init__('safety_node')
# Create Subscriber.
# Arguments: Message type, Topic name, Callback function, Queue size (QoS)
self.subscription = self.create_subscription(
Float32,
'/sensor/sonar_front',
self.sonar_callback,
10
)
self.subscription # prevent garbage collection of the variable
# Set the critical distance in meters
self.critical_distance = 0.5
self.get_logger().info('Safety node started. Guarding the perimeter!')
def sonar_callback(self, msg):
# This function is called AUTOMATICALLY every time a new message arrives in the topic
distance = msg.data
if distance < self.critical_distance:
# Use the WARN logging level (it will be highlighted)
self.get_logger().warning(f'EMERGENCY STOP! Obstacle at distance {distance:.2f} m!')
else:
self.get_logger().info(f'Path is clear (obstacle at {distance:.2f} m).')
def main(args=None):
rclpy.init(args=args)
safety_node = SafetyNode()
try:
rclpy.spin(safety_node) # Wait for messages indefinitely
except KeyboardInterrupt:
pass
finally:
safety_node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()Step 2: Run the pair
- In the first terminal, run the sonar:
pixi run python mock_sonar.py
- In the second terminal, run the safety node:
pixi run python safety_stop.py
What should happen: As long as the distance is above 0.5 meters, the node logs that the path is clear. When the sonar publishes a smaller value, the safety node immediately prints a warning.
This is a simple but extremely important architectural layer for your future robot.
How is this different from Topics?
- A Topic is a radio station. The sonar (Publisher) endlessly broadcasts “Distance 1 meter!”, and the Safety node (Subscriber) listens. This is a continuous data stream without delivery guarantees.
- A Service is a phone call (Request -> Response). The client says: “Turn the head 45 degrees to the right.” The server does it and replies: “Done, turned it.”
For your Hiwonder 2 DoF Pan Tilt bracket (where one servo turns the head left-right / Pan and the other up-down / Tilt), services are a perfect fit.
Your future AI agent (LLM) will not constantly stream coordinates. It will say once: “Look at the person on the left” -> call the Service -> wait for the response that the camera has turned -> then grab a frame from the Orbbec Gemini.
Since we have not yet created our own message type (that will require colcon, which we will reach later), we will use the standard ROS 2 service type AddTwoInts.
Let’s “cheat” and pretend that:
- Number
a— is the Pan angle (left-right, from -90 to 90 degrees) - Number
b— is the Tilt angle (up-down, from -90 to 90 degrees)
Create head_controller.py in the project root:
import rclpy
from rclpy.node import Node
# Import the standard service type (Request: a, b. Response: sum)
from example_interfaces.srv import AddTwoInts
import time
class HeadControllerService(Node):
def __init__(self):
super().__init__('head_controller')
# Create the Service Server
# Arguments: Service type, Service name, Handler function
self.srv = self.create_service(AddTwoInts, 'set_head_position', self.move_head_callback)
# Current angles (the robot is looking straight)
self.current_pan = 0
self.current_tilt = 0
self.get_logger().info('Pan-Tilt head controller started. Waiting for commands (Services)...')
def move_head_callback(self, request, response):
# This function is called when someone “calls” this service
target_pan = request.a
target_tilt = request.b
self.get_logger().info(f'Received command: turn the head to Pan={target_pan}°, Tilt={target_tilt}°')
# Simulate the physical motion time of the LSC-16 servos (1 second)
self.get_logger().info('Motors buzzing...')
time.sleep(1.0)
# Update the current coordinates
self.current_pan = target_pan
self.current_tilt = target_tilt
# Build the response.
# In AddTwoInts the response is stored in the "sum" field.
# We will write 1 there to indicate success.
response.sum = 1
self.get_logger().info('Motion complete. Sending response.')
return response
def main(args=None):
rclpy.init(args=args)
head_controller = HeadControllerService()
try:
rclpy.spin(head_controller)
except KeyboardInterrupt:
pass
finally:
head_controller.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()In the first terminal, run our new node:
pixi run python head_controller.pyOpen a second terminal (make sure you are in the pixi shell environment or use the pixi run prefix).
ROS 2 has a fantastic testing utility — we can become a “client” right from the command line.
Enter the command that tells the robot to look right (45°) and slightly up (15°):
pixi run ros2 service call /set_head_position example_interfaces/srv/AddTwoInts "{a: 45, b: 15}"What should happen:
- In the second terminal you will see the request and, after one second, the response
response: example_interfaces.srv.AddTwoInts_Response(sum=1). - In the first terminal (where the script is running) you will see logs: command received, motor buzzing
Motors buzzing..., and a success report.
Try calling the service a few times with different angles. This is exactly the action your LLM will perform through Python code when it decides to “look around.”
How this fits into your future AI use case:
In the future your LLM, through the OpenAI API, will generate JSON like {"action": "look_around", "pan": 45, "tilt": 15}. Your central Python script will parse that JSON and call this exact ROS 2 service. Until the robot head physically turns, the LLM will not receive a frame from the Orbbec Gemini camera.
(Note: ROS 2 also has Actions. They are like Services, but for very long-running tasks that can be canceled — for example, “drive to another room.” We will study them a bit later when we get to navigation.)
Now we move on to the most interesting part.
Your Wheeltec chassis uses Ackermann steering. That means your robot is controlled like a real car: it has front steering wheels (which define the steering angle) and rear drive wheels (which define the speed). It cannot spin in place like a robot vacuum.
In ROS 2 there is a dedicated message standard for such robots — ackermann_msgs. In the future your Raspberry Pi Pico 2 controller will subscribe to these messages and convert them into PWM signals for the Pololu VNH5019 driver.
Let’s write a Translator node that listens to simple text commands from the AI (for example, forward, left) and converts them into physical chassis commands.
Since ackermann_msgs is not part of the most minimal installation, we need to add it to our pixi environment. Open a terminal and run:
pixi add ros-jazzy-ackermann-msgsCreate ai_driver.py in the project root:
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from ackermann_msgs.msg import AckermannDriveStamped
class AIDriverNode(Node):
def __init__(self):
super().__init__('ai_driver_translator')
# Subscriber for text commands (Topic: /ai_command)
self.subscription = self.create_subscription(
String, '/ai_command', self.command_callback, 10)
# Publisher of physical commands for the chassis (Topic: /drive)
self.publisher = self.create_publisher(
AckermannDriveStamped, '/drive', 10)
self.get_logger().info('AI -> Ackermann translator started. Waiting for text commands on /ai_command')
def command_callback(self, msg):
command = msg.data.lower()
# Create an empty Ackermann message
drive_msg = AckermannDriveStamped()
# Logic for translating text into physics:
# speed (m/s) - linear speed
# steering_angle (radians) - steering wheel angle
if command == 'forward':
drive_msg.drive.speed = 0.5
drive_msg.drive.steering_angle = 0.0
elif command == 'left':
drive_msg.drive.speed = 0.3
drive_msg.drive.steering_angle = 0.5 # About 28 degrees left
elif command == 'right':
drive_msg.drive.speed = 0.3
drive_msg.drive.steering_angle = -0.5 # About 28 degrees right
elif command == 'backward':
drive_msg.drive.speed = -0.3
drive_msg.drive.steering_angle = 0.0
elif command == 'stop':
drive_msg.drive.speed = 0.0
drive_msg.drive.steering_angle = 0.0
else:
self.get_logger().warning(f'AI issued an unknown command: {command}')
return
self.publisher.publish(drive_msg)
self.get_logger().info(f'AI said "{command}". Driving: Speed={drive_msg.drive.speed} m/s, Steering={drive_msg.drive.steering_angle} rad')
def main(args=None):
rclpy.init(args=args)
node = AIDriverNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()-
In the first terminal run the translator:
pixi run python ai_driver.py
-
In the second terminal we will play the role of the AI. Send the command
leftto the/ai_commandtopic:pixi run ros2 topic pub --once /ai_command std_msgs/msg/String "{data: 'left'}"Try sending different commands:
forward,backward,stop,jump(to test an unknown command). -
(Optional) In the third terminal you can see how these commands are converted into the motor-friendly format:
pixi run ros2 topic echo /drive
Testing: Send a command through the terminal and look at the output. But there is an important nuance: our chassis uses two independent rear motors without a mechanical differential. How do we handle that?
Solution for two motors without a differential: A classic car uses a mechanical differential. If the rear axle is rigidly locked, the wheels will slip, the robot will “hop” in turns, and the motors plus the VNH5019 driver will overheat.
We will implement an Electronic Differential (E-Diff).
The ros-jazzy-ackermann-msgs package describes the ideal kinematic model. In the AckermannDrive message we pass the desired speed of the robot center and the desired steering angle of the front wheels.
The AI agent does not need to know how many motors you have. It simply says: “Drive forward at 0.5 m/s and turn the wheel by 0.3 radians.”
Then, between the /drive topic and the Raspberry Pi Pico 2 controller, we insert a special node — the Kinematics Solver.
Here is how it works mathematically:
Let
- Left wheel speed:
$V_{left} = V \cdot (1 - \frac{W}{2R})$ - Right wheel speed:
$V_{right} = V \cdot (1 + \frac{W}{2R})$
This is a great continuation of Lesson 4. We will write a script that listens to AckermannDrive from the previous step and calculates speeds for the left and right motor.
Create electronic_differential.py in the project folder:
import rclpy
from rclpy.node import Node
from ackermann_msgs.msg import AckermannDriveStamped
import math
class ElectronicDifferential(Node):
def __init__(self):
super().__init__('electronic_differential')
# Subscribe to commands from AI (or navigation)
self.subscription = self.create_subscription(
AckermannDriveStamped, '/drive', self.drive_callback, 10)
# Approximate dimensions of your Wheeltec chassis in meters (measure later)
self.wheelbase = 0.25 # L: distance between front and rear axle
self.track_width = 0.15 # W: distance between the left and right rear wheel
self.get_logger().info('Electronic differential started. Waiting for /drive commands...')
def drive_callback(self, msg):
v = msg.drive.speed
delta = msg.drive.steering_angle
# If we drive straight or stand still (steering angle close to zero)
if abs(delta) < 0.01:
v_left = v
v_right = v
else:
# Turning radius of the rear axle center
R = self.wheelbase / math.tan(delta)
# Calculate the speed for each wheel
v_left = v * (1 - self.track_width / (2 * R))
v_right = v * (1 + self.track_width / (2 * R))
# In the future this node will send v_left, v_right, and delta
# to the Raspberry Pi Pico 2 via Serial or I2C.
# For now we just log it nicely:
if v == 0:
status = "STOPPED"
elif delta > 0.01:
status = "TURNING LEFT"
elif delta < -0.01:
status = "TURNING RIGHT"
else:
status = "MOVING STRAIGHT"
self.get_logger().info(
f'[{status}] left={v_left:.2f} m/s, right={v_right:.2f} m/s, steer={delta:.2f} rad'
)
def main(args=None):
rclpy.init(args=args)
node = ElectronicDifferential()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()- Run
ai_driver.pyfrom the previous step in the first terminal. - Run
electronic_differential.pyin the second terminal. - In the third terminal, send the command
leftthrough the topic (as we did in the previous lesson):pixi run ros2 topic pub --once /ai_command std_msgs/msg/String "{data: 'left'}"
What you will see in the second terminal:
When the command left arrives, ai_driver.py will say: “drive at speed 0.3 and turn by 0.5 rad.”
The new node will catch that and calculate that the left motor must spin slower and the right motor faster.
This architectural layer decouples the high-level AI from the physics of specific hardware. When the Raspberry Pi Pico 2 arrives, this code can either move into the microcontroller or stay in Python on the Raspberry Pi 5, sending ready-to-use wheel speeds.
Try running it and sending a turning command. It is a great feeling when the math comes alive in the logs.
We finish the second module (Kinematics and Control) with a crucial topic — without it, computer vision (VLM) and navigation simply cannot work.
Why does your AI robot need this? Imagine your Orbbec Gemini 335 3D camera sees an obstacle exactly 1 meter away. The VLM model says: “Obstacle at distance 1 meter.” But the camera is mounted on a bracket at the front of the robot, while the wheel rotation center (from which the robot computes its coordinates) is 20 centimeters behind it. If the robot drives forward by 1 meter, it will crash into the obstacle.
To prevent this, ROS 2 has TF2 (Transform Framework). This is a system that stores a “tree” of coordinate frames. We will have at least two points:
base_link— the center of the robot (between the rear wheels).camera_link— the eye of the camera.
Let’s look at this with our own eyes in a virtual space, since we do not yet have the real hardware.
RViz2 (ROS Visualization) is a very powerful 3D tool. It lets you see what the robot “thinks.”
Open a terminal in Windsurf and run:
pixi run rviz2A black window with a grid will open. This is the empty mind of your robot.
- In the left panel (Displays), find the Fixed Frame field. Right now it says
map. Change it tobase_link(type it manually and press Enter). - Click Add (bottom left) -> choose the By topic tab -> find
/tf-> choose TF -> click OK. Nothing will appear yet because we have not created our frames. Do not close RViz2.
Since the bracket is rigidly attached to the chassis, the distance between the robot center and the head base does not change. For this, ROS 2 has a built-in node, and we do not even need to write Python code.
Open a second terminal in Windsurf and enter this command (it launches the standard ROS 2 node for publishing static transforms):
pixi run ros2 run tf2_ros static_transform_publisher --x 0.15 --y 0.0 --z 0.25 --yaw 0 --pitch 0 --roll 0 --frame-id base_link --child-frame-id camera_linkWhat do these numbers mean? We are telling ROS 2: "The camera (camera_link) is 15 cm in front (x 0.15) and 25 cm above (z 0.25) the robot center (base_link)".
Go back to the RViz2 window. You should see coordinate axes appear in the center of the grid.
- One coordinate frame in the center of the grid — that is your
base_link(center of the rear axle). - A second coordinate frame slightly in front and above — that is your
camera_link(where the Orbbec Gemini will be mounted). - A yellow line between them showing their relationship.
(Red arrow = X axis / forward, Green = Y axis / left, Blue = Z axis / up).
- Run
rviz2and thestatic_transform_publishercommand. - Configure TF display in RViz2.
- Take a screenshot of this 3D space.
Why is this so useful for your VLM agent? When the VLM (OpenAI) recognizes an object in the camera image (for example, “I see a ball 1 meter away”), ROS 2 will automatically recalculate through the TF framework: “Okay, the ball is 1 meter from the camera, so it is 1.15 meters from the wheels. It is safe to drive.” You will not need to calculate those offsets manually in code.
Now we move to Lesson 7. Since we do not yet have the physical Orbbec Gemini 335 camera, we will write a Virtual Camera that generates a video stream. Then we will write a VLM agent stub node that intercepts those frames.
In the ROS 2 + Python world, the gold standard for working with images is the combination of OpenCV and the cv_bridge library (it converts images from the OpenCV format into ROS 2 sensor_msgs/Image messages and back).
Open a terminal in Windsurf (make sure you are in the project root) and add the necessary packages to the Pixi environment:
pixi add ros-jazzy-cv-bridge ros-jazzy-sensor-msgs opencvCreate mock_camera.py:
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
import cv2
import numpy as np
class MockCamera(Node):
def __init__(self):
super().__init__('mock_camera')
# Publish to the standard topic for Orbbec color cameras
self.publisher_ = self.create_publisher(Image, '/camera/color/image_raw', 10)
# Timer for 10 FPS (1 frame every 0.1 sec)
self.timer = self.create_timer(0.1, self.timer_callback)
self.bridge = CvBridge()
self.frame_count = 0
self.get_logger().info('Virtual Orbbec camera started (10 FPS)...')
def timer_callback(self):
# 1. Generate a fake frame with OpenCV (black background 640x480)
img = np.zeros((480, 640, 3), dtype=np.uint8)
# 2. Draw changing text so we can see that the video is live
text = f"Simulated Vision: Frame {self.frame_count}"
cv2.putText(img, text, (50, 240), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
self.frame_count += 1
# 3. Convert the OpenCV image (numpy array) to a ROS 2 Image Message
msg = self.bridge.cv2_to_imgmsg(img, encoding="bgr8")
# 4. Publish to the topic
self.publisher_.publish(msg)
def main(args=None):
rclpy.init(args=args)
node = MockCamera()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()Sending 10–30 FPS video to the OpenAI API is the fastest way to burn through all limits and money in 5 minutes. That is why the AI robot architecture is built differently: we subscribe to the video stream, always keep only the freshest frame in memory, and send it to the API only once every few seconds (or when an important event/command happens).
Create vlm_agent_stub.py:
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
class VLMAgentStub(Node):
def __init__(self):
super().__init__('vlm_agent')
self.subscription = self.create_subscription(
Image,
'/camera/color/image_raw',
self.image_callback,
10 # QoS (queue size)
)
self.bridge = CvBridge()
self.latest_cv_image = None
# Timer that simulates calling OpenAI (for example, once every 3 seconds)
self.timer = self.create_timer(3.0, self.process_vision_api)
self.get_logger().info('AI Vision Agent started. Waiting for video stream...')
def image_callback(self, msg):
# This function is called 10 times per second.
# We do NOT do heavy computations here, we only refresh the latest frame cache.
self.latest_cv_image = self.bridge.imgmsg_to_cv2(msg, desired_encoding='bgr8')
def process_vision_api(self):
# This function is called every 3 seconds.
if self.latest_cv_image is not None:
height, width, channels = self.latest_cv_image.shape
self.get_logger().info(
f'>>> [API CALL] Taking a fresh frame of size {width}x{height} '
f'and "sending" it to OpenAI GPT-4o Vision...'
)
# In the future, the code here will be:
# 1. cv2.imencode('.jpg', self.latest_cv_image)
# 2. base64.b64encode
# 3. client.chat.completions.create(...)
else:
self.get_logger().warning('No frames yet...')
def main(args=None):
rclpy.init(args=args)
node = VLMAgentStub()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()- In the first terminal, run the virtual camera:
pixi run python mock_camera.py
- In the second terminal, run the AI agent:
pixi run python vlm_agent_stub.py
(Optional)
ROS 2 (and RViz2) can display live camera feeds.
3. In the third terminal, open pixi run rviz2.
4. Click Add in the bottom left -> choose the By topic tab -> find /camera/color/image_raw -> choose Image -> click OK.
A small window will appear in RViz showing your green text overlay.
Run both scripts (camera and agent) and watch the logs. The VLM agent should regularly report that it is “sending” frames.
You just implemented an asynchronous computer vision pipeline. In the future, instead of the stub text = ... in the frame, there will be a real image of your room, and instead of get_logger().info there will be a call to the openai library.
We are getting close to the finish line of our virtual build. We have “Muscles” (head Services and Ackermann chassis), we have “Vision” and “Touch” (Sonar). What is missing? Hearing, Voice, and Brain.
Since your robot will use the OpenAI Realtime API (which works directly with streaming audio, bypassing local text recognition / STT on the robot), we need to prepare the “pipes” for audio transport.
On a real Raspberry Pi 5, audio from your USB microphone and output to the Adafruit I2S amplifier would be captured by libraries like sounddevice or pyaudio, packed into raw bytes (PCM 16-bit).
For now, to avoid dealing with your PC audio drivers inside the virtual environment, we will create topic stubs. Add the package for raw arrays in the terminal:
pixi add ros-jazzy-std-msgsCreate mock_audio.py. It will simulate a person speaking a command into the microphone every 10 seconds, and a “speaker” waiting for AI responses:
import rclpy
from rclpy.node import Node
from std_msgs.msg import ByteMultiArray, String
import time
class AudioSystem(Node):
def __init__(self):
super().__init__('audio_system')
# Microphone (publishes audio bytes)
self.mic_pub = self.create_publisher(ByteMultiArray, '/audio/mic_raw', 10)
# Speaker (listens to audio bytes from the AI and “plays” them)
self.speaker_sub = self.create_subscription(
ByteMultiArray, '/audio/speaker_raw', self.speaker_callback, 10)
self.timer = self.create_timer(10.0, self.simulate_user_speech)
self.get_logger().info('Audio system (Microphone + Adafruit Speaker) started.')
def simulate_user_speech(self):
# Simulate that the user said something (for example, “Robot, what do you see?”)
msg = ByteMultiArray()
# In reality there will be a PCM 16-bit byte array here.
# For testing we pack a fake string into bytes.
msg.data = b"VOICE_COMMAND: Look around and tell me what you see!"
self.get_logger().info('🎙️ A person said a phrase into the microphone...')
self.mic_pub.publish(msg)
def speaker_callback(self, msg):
# In reality these bytes would be sent to the I2S Amp
# Here we simply decode them for testing
text = bytes(msg.data).decode('utf-8', errors='ignore')
self.get_logger().info(f'🔊 SPEAKER PLAYS: {text}')
def main(args=None):
rclpy.init(args=args)
node = AudioSystem()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()And now the most important part. We will write the central node that ties everything from the previous lessons together.
This node will receive audio from the microphone, grab the latest frame from the camera, check the sonar, send everything to OpenAI (for now virtually), and issue commands to the chassis, head, and speaker.
Create ai_brain.py:
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from std_msgs.msg import ByteMultiArray, String, Float32
from example_interfaces.srv import AddTwoInts
import json
class AIBrain(Node):
def __init__(self):
super().__init__('ai_brain')
# --- SUBSCRIPTIONS (Senses) ---
self.create_subscription(Image, '/camera/color/image_raw', self.image_cb, 10)
self.create_subscription(ByteMultiArray, '/audio/mic_raw', self.audio_cb, 10)
self.create_subscription(Float32, '/sensor/sonar_front', self.sonar_cb, 10)
# --- PUBLISHERS (Actions) ---
self.drive_pub = self.create_publisher(String, '/ai_command', 10)
self.speaker_pub = self.create_publisher(ByteMultiArray, '/audio/speaker_raw', 10)
# --- SERVICE CLIENTS ---
self.head_client = self.create_client(AddTwoInts, 'set_head_position')
# Robot memory state
self.latest_image = None
self.front_distance = 999.0
self.get_logger().info('🧠 AI Brain started and waiting for data...')
def image_cb(self, msg):
self.latest_image = "FRAME_IN_MEMORY" # In reality: intercepted via cv_bridge
def sonar_cb(self, msg):
self.front_distance = msg.data
def audio_cb(self, msg):
user_voice = bytes(msg.data).decode('utf-8')
self.get_logger().info(f'🧠 Heard voice: {user_voice}')
self.process_ai_reasoning()
def process_ai_reasoning(self):
self.get_logger().info('🧠 Sending [Voice + Photo + Sonar Data] to OpenAI Realtime API...')
# --- SIMULATING A RESPONSE FROM OPENAI (LLM+VLM) ---
# In reality the OpenAI Realtime API would send us an audio stream
# and JSON with function calls for the motors.
if self.front_distance < 1.0:
ai_thought = "I see an obstacle nearby. I need to stop and say so."
drive_cmd = "stop"
voice_reply = b"I stopped because there is an obstacle right in front of me!"
else:
ai_thought = "The path is clear. I will turn my head to look and move forward."
drive_cmd = "forward"
voice_reply = b"Moving forward and scanning the area."
# Request a head turn (asynchronously)
self.turn_head(45, 0)
self.get_logger().info(f'🧠 AI Decision: {ai_thought}')
# Speak out loud
speaker_msg = ByteMultiArray()
speaker_msg.data = voice_reply
self.speaker_pub.publish(speaker_msg)
# Send a command to the chassis
drive_msg = String()
drive_msg.data = drive_cmd
self.drive_pub.publish(drive_msg)
def turn_head(self, pan, tilt):
if not self.head_client.wait_for_service(timeout_sec=1.0):
self.get_logger().warning('Head service unavailable!')
return
req = AddTwoInts.Request()
req.a = pan
req.b = tilt
self.get_logger().info(f'🧠 Sending command to the head motors: Pan={pan}, Tilt={tilt}')
self.head_client.call_async(req) # Async call
def main(args=None):
rclpy.init(args=args)
node = AIBrain()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()Now we will run the educational version of the architecture that reproduces the logic of the future robot, but still works on local mocks and stubs. You will need to open 5 terminals (in Windsurf you can split the panels) and launch our nodes from previous lessons in each of them:
- Terminal 1 (Sonar and Camera):
(You can run them in one terminal using
&or open two sub-terminals)pixi run python mock_sonar.py - Terminal 2 (Hardware - Head and Chassis):
pixi run python head_controller.py - Terminal 3 (Hardware - Translator / E-Diff):
pixi run python ai_driver.py - Terminal 4 (Audio system):
pixi run python mock_audio.py - Terminal 5 (AI Brain):
pixi run python ai_brain.py
What you should see in this symphony:
Every 10 seconds mock_audio will “shout” into the microphone. ai_brain will hear it and check the distance from mock_sonar. If the distance is greater than 1 meter, the Brain will send a service request to head_controller (it will buzz its motors), send the command forward to ai_driver, and say “Moving forward” through the speaker. If the sonar reports < 1.0 meter, the Brain will say stop.
Run it, watch the logs in Terminal 5 (AI Brain) and Terminal 2/3 (Hardware). If it all connects — congratulations, you have fully designed the software architecture of your agent without real hardware.
In the previous lessons our robot understood only hardcoded text commands like "forward" or "left". But modern LLMs (especially combined with the OpenAI Realtime API) can produce structured outputs. Making an LLM write plain text and then parsing it with if/else is an anti-pattern in 2026.
Instead, we use Function Calling or Structured Outputs. We give the model a strict schema describing what the robot can do, and it returns exact JSON with numbers.
In Python, the gold standard for validating data from an LLM is the Pydantic library. The OpenAI API natively supports generating responses directly into Pydantic schemas.
Now we use the llm_parser.py node, which accepts structured JSON from the LLM and turns it into direct commands for the chassis and head. This is a newer and stricter control path that can be used instead of the older text translator ai_driver.py.
Open a terminal in Windsurf and add the library:
pixi add pydanticCreate llm_parser.py in the project root:
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from ackermann_msgs.msg import AckermannDriveStamped
from example_interfaces.srv import AddTwoInts
from pydantic import BaseModel, Field, ValidationError
from typing import Literal, Union, List
# --- DEFINE THE SCHEMA (OUR ROBOT API FOR THE LLM) ---
class DriveCommand(BaseModel):
action: Literal["drive"]
speed: float = Field(..., description="Speed in m/s. From -0.5 (backward) to 0.5 (forward)")
steering: float = Field(..., description="Steering angle in radians. From -0.5 (right) to 0.5 (left)")
class HeadCommand(BaseModel):
action: Literal["turn_head"]
pan: int = Field(..., description="Head pan left-right (degrees, from -90 to 90)")
tilt: int = Field(..., description="Head tilt up-down (degrees, from -90 to 90)")
class LlmResponse(BaseModel):
thought: str = Field(..., description="Agent thoughts out loud: why it made this decision")
commands: List[Union[DriveCommand, HeadCommand]] = Field(..., description="List of actions")
# --- ROS 2 NODE ---
class LlmCommandParser(Node):
def __init__(self):
super().__init__('llm_parser')
# Listen for raw JSON from the LLM
self.subscription = self.create_subscription(
String, '/llm_raw_json', self.json_callback, 10)
# Publisher directly into the Electronic Differential (bypassing the text translator)
self.drive_pub = self.create_publisher(AckermannDriveStamped, '/drive', 10)
# Client for controlling the head
self.head_client = self.create_client(AddTwoInts, 'set_head_position')
self.get_logger().info('🤖 LLM JSON Parser started. Waiting for structured commands...')
def json_callback(self, msg):
raw_json = msg.data
try:
# 1. Pydantic validates the JSON automatically.
# If the LLM makes a mistake or hallucinates, the code throws an error and the robot does not go crazy.
parsed_data = LlmResponse.model_validate_json(raw_json)
self.get_logger().info(f'💡 AI Thought: "{parsed_data.thought}"')
# 2. Execute the list of commands one by one
for cmd in parsed_data.commands:
if cmd.action == "drive":
self.execute_drive(cmd.speed, cmd.steering)
elif cmd.action == "turn_head":
self.execute_head(cmd.pan, cmd.tilt)
except ValidationError as e:
self.get_logger().error(f'❌ LLM sent invalid JSON (Hallucination): {e}')
def execute_drive(self, speed, steering):
drive_msg = AckermannDriveStamped()
drive_msg.drive.speed = speed
drive_msg.drive.steering_angle = steering
self.drive_pub.publish(drive_msg)
self.get_logger().info(f'🚗 [CHASSIS] Command sent: speed {speed} m/s, steering {steering} rad')
def execute_head(self, pan, tilt):
if self.head_client.wait_for_service(timeout_sec=0.2):
req = AddTwoInts.Request(a=pan, b=tilt)
self.head_client.call_async(req)
self.get_logger().info(f'👀 [HEAD] Request sent: Pan={pan}°, Tilt={tilt}°')
else:
self.get_logger().warning('⚠️ Head service is offline!')
def main(args=None):
rclpy.init(args=args)
node = LlmCommandParser()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()In the future, when you configure the OpenAI API, you will be able to use these schemas as the basis for Structured Outputs / Function Calling, so the model returns not free-form text but a validated data structure. Then the ROS 2 node can translate the model response directly into physical actions.
Let’s simulate a perfect response from the LLM. The model saw something interesting on the right, decided to turn the head there, and move closer slowly.
- In the first terminal run our E-Diff (Electronic differential), so it can receive the chassis command:
pixi run python electronic_differential.py
- In the second terminal run the Head Server:
pixi run python head_controller.py
- In the third terminal run our new Parser node:
pixi run python llm_parser.py
- In the fourth terminal send the “model response” (JSON string) to the
/llm_raw_jsontopic:pixi run ros2 topic pub --once /llm_raw_json std_msgs/msg/String '{data: "{\"thought\": \"I see an interesting shadow on the right. I will turn the camera and slowly move closer.\", \"commands\":[{\"action\": \"turn_head\", \"pan\": -45, \"tilt\": 10}, {\"action\": \"drive\", \"speed\": 0.2, \"steering\": -0.3}]}"}'
What will happen?
The llm_parser.py parser will immediately understand this JSON, print the AI thought to the console, then asynchronously trigger the head_controller service and the /drive topic (where electronic_differential.py will gladly receive it and calculate wheel speeds).
This is the architectural peak for LLM-based robots. Try running this test.
- The repository is already useful for studying ROS 2 without a physical robot.
- The core educational nodes already exist in code and match most of this guide.
- OpenAI integration is currently shown at the architecture and stub level, not as a production-ready client.
- The hardware bridge to the real robot is postponed for now and is intentionally not part of the current README.