Building ROS 2 Nodes with Python (rclpy)
Now that we understand the theory, let's write code. We will use rclpy (ROS Client Library for Python) to build a simple control loop.
The Workspace
In ROS 2, you work within a Workspace (usually named ros2_ws). This is where you build your packages.
mkdir -p ~/ros2_ws/src
cd ~/ros2_ws/src
Creating a Package
A Package is a container for your code. It makes it easy to share and build.
ros2 pkg create --build-type ament_python my_robot_controller
This creates a directory structure:
package.xml: Defines dependencies.setup.py: Build instructions.my_robot_controller/: Where your Python scripts live.
Writing a Simple Node
Let's create a node that simulates a heartbeat for our robot. Create my_robot_controller/heartbeat_node.py.
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
class HeartbeatNode(Node):
def __init__(self):
super().__init__("heartbeat_node")
self.get_logger().info("Heartbeat Node has started.")
# Create a timer that fires every 1.0 second
self.create_timer(1.0, self.timer_callback)
self.counter = 0
def timer_callback(self):
self.get_logger().info(f"Lub-Dub: {self.counter}")
self.counter += 1
def main(args=None):
rclpy.init(args=args)
node = HeartbeatNode()
try:
rclpy.spin(node) # Keeps the node alive
except KeyboardInterrupt:
pass
rclpy.shutdown()
if __name__ == '__main__':
main()
Breaking Down the Code
- Inheritance:
class HeartbeatNode(Node)imports all standard ROS functionality. __init__: We name the node "heartbeat_node". This name must be unique in the graph.- Logging:
self.get_logger().info(...)is preferred overprint(). It adds timestamps and formatting. - Timers:
create_timer(1.0, callback)allows us to run code periodically without blocking execution loops (non-blocking). - Spinning:
rclpy.spin(node)pauses the program here and waits for callbacks (timers, incoming messages) to trigger. It yields control to ROS.
Registering the Executable
For ROS to find your node (so you can run ros2 run ...), you must edit setup.py.
entry_points={
'console_scripts': [
'heartbeat = my_robot_controller.heartbeat_node:main',
],
},
Now, from the workspace root:
cd ~/ros2_ws
colcon build --symlink-install
source install/setup.bash
ros2 run my_robot_controller heartbeat
Using --symlink-install allows you to change your Python scripts without rebuilding the package every time. It links the file in install/ directly to your src/ file.
Publishers and Subscribers
Let's make it interactive. We'll create a publisher that sends "Motor Commands" and a subscriber that listens.
The Publisher
from std_msgs.msg import String
# Inside your class, in __init__:
self.cmd_pub = self.create_publisher(String, "motor_commands", 10)
# In your timer callback:
msg = String()
msg.data = f"Move forward: {self.counter}"
self.cmd_pub.publish(msg)
The Subscriber
# In a new node class:
self.create_subscription(String, "motor_commands", self.cmd_callback, 10)
def cmd_callback(self, msg):
self.get_logger().info(f"Received command: {msg.data}")
QoS Profiles (Quality of Service)
Notice the 10 argument? That's the Queue Size. If the subscriber is slow, ROS keeps 10 messages in buffer.
ROS 2 allows fine-tuning QoS:
- Reliability:
Reliable(TCP-like, guarantees delivery) vsBest Effort(UDP-like, drops packets if fast). - Durability:
Transient Local(new subscribers get the last message) vsVolatile.
For high-speed sensor data (like LiDAR), we usually use Best Effort. For critical commands ("Stop!"), we use Reliable.