Robot Operating System 2 (ROS 2) has become the backbone of modern robotics development. After spending years working with both ROS 1 and ROS 2 on various robotics projects—from autonomous mobile robots to robotic arms—I can confidently say that ROS 2’s improvements over its predecessor are substantial. In this guide, I’ll share everything you need to know to get started with ROS 2 and build real-world robotic applications.
What is ROS 2 and Why Does It Matter?
ROS (Robot Operating System) isn’t an operating system in the traditional sense—it’s a middleware framework that provides:
- Hardware abstraction: Write code once, run on different robots
- Device drivers: Pre-built drivers for sensors and actuators
- Libraries and tools: Navigation, perception, manipulation
- Package management: Modular, reusable code
- Communication infrastructure: Publish/subscribe messaging
Why ROS 2 Over ROS 1?
| Feature | ROS 1 | ROS 2 | Why It Matters |
|---|---|---|---|
| Communication | Custom (roscore) | DDS/RTPS | Industry standard, real-time |
| Network | Single point of failure | Decentralized | Production-ready |
| Real-time support | Limited | Native | Critical for safety |
| Security | None | DDS Security | Authentication, encryption |
| Language support | C++, Python | C++, Python, + more | Flexibility |
| Discovery | Manual | Automatic | Easier deployment |
DDS: The Game Changer
ROS 2 uses DDS (Data Distribution Service) as its middleware, which provides:
┌─────────────┐ ┌──────────────┐ ┌─────────────┐
│ Node 1 │ │ DDS │ │ Node 2 │
│ (Talker) │ -> │ Domain │ -> │ (Listener)│
│ │ │ Participant│ │ │
└─────────────┘ └──────────────┘ └─────────────┘
│
┌────────────────┼────────────────┐
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Topic A │ │ Topic B │ │ Topic C │
└──────────┘ └──────────┘ └──────────┘
Installation and Setup
Ubuntu Installation
# Add ROS 2 repository
sudo apt update && sudo apt install curl gnupg lsb-release
curl -sSL https://raw.githubusercontent.com/ros/rosdistro/master/ros.key | sudo apt-key add -
echo "deb [arch=$(dpkg --print-architecture)] http://packages.ros.org/ros2/ubuntu $(lsb_release -cs) main" | sudo tee /etc/apt/sources.list.d/ros2.list
# Install ROS 2 Humble (LTS)
sudo apt update
sudo apt install ros-humble-desktop
sudo apt install ros-dev-tools
# Source ROS 2
echo "source /opt/ros/humble/setup.bash" >> ~/.bashrc
source ~/.bashrc
# Verify installation
ros2 --version
Development Environment
# Install additional tools
sudo apt install python3-rosdep python3-colcon-common-extensions
# Initialize rosdep
sudo rosdep init
rosdep update
# Create workspace
mkdir -p ~/ros2_ws/src
cd ~/ros2_ws
colcon build
source install/setup.bash
Core Concepts Explained
Nodes: The Building Blocks
Nodes are independent processes that communicate. Here’s a minimal example:
#!/usr/bin/env python3
# minimal_node.py
import rclpy
from rclpy.node import Node
class MinimalNode(Node):
"""A simple ROS 2 node."""
def __init__(self):
super().__init__('minimal_node')
self.get_logger().info('Minimal Node started!')
def main(args=None):
rclpy.init(args=args)
node = MinimalNode()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Run the node:
cd ~/ros2_ws
ros2 run my_package minimal_node
Topics: Publish/Subscribe Communication
Topics are named buses for streaming data. Publishers and subscribers are decoupled.
#!/usr/bin/env python3
# talker.py
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
import time
class Talker(Node):
"""Publishes string messages."""
def __init__(self):
super().__init__('talker')
self.publisher_ = self.create_publisher(String, 'chatter', 10)
self.timer = self.create_timer(1.0, self.publish_message)
self.count = 0
def publish_message(self):
msg = String()
msg.data = f'Hello ROS 2! Message {self.count}'
self.publisher_.publish(msg)
self.get_logger().info(f'Publishing: {msg.data}')
self.count += 1
def main(args=None):
rclpy.init(args=args)
node = Talker()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
#!/usr/bin/env python3
# listener.py
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class Listener(Node):
"""Subscribes to string messages."""
def __init__(self):
super().__init__('listener')
self.subscription = self.create_subscription(
String,
'chatter',
self.callback,
10
)
self.subscription # prevent unused variable warning
def callback(self, msg):
self.get_logger().info(f'Received: {msg.data}')
def main(args=None):
rclpy.init(args=args)
node = Listener()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Custom Message Types
# package.xml - Add dependencies
<depend>rosidl_default_generators</depend>
<member_of_group>rosidl_interface_packages</member_of_group>
# CMakeLists.txt
find_package(rosidl_default_generators REQUIRED)
rosidl_generate_interfaces(${PROJECT_NAME}
"msg/SensorData.msg"
"msg/RobotState.msg"
"srv/GetStatus.srv"
)
# msg/SensorData.msg
Header header
float64 temperature
float64 humidity
float64 pressure
geometry_msgs/Vector3 acceleration
geometry_msgs/Vector3 gyroscope
# Using custom messages
from my_robot_msgs.msg import SensorData
class SensorNode(Node):
def __init__(self):
super().__init__('sensor_node')
self.pub = self.create_publisher(SensorData, 'sensor_data', 10)
def publish_sensor_data(self):
msg = SensorData()
msg.header.stamp = self.get_clock().now().to_msg()
msg.header.frame_id = 'sensor_link'
msg.temperature = 25.5
msg.humidity = 60.0
msg.pressure = 1013.25
self.pub.publish(msg)
Services: Request/Response Communication
#!/usr/bin/env python3
# status_server.py
import rclpy
from rclpy.node import Node
from example_interfaces.srv import AddTwoInts
class StatusServer(Node):
"""Service server example."""
def __init__(self):
super().__init__('status_server')
self.srv = self.create_service(
AddTwoInts,
'add_two_ints',
self.callback
)
def callback(self, request, response):
a = request.a
b = request.b
response.sum = a + b
self.get_logger().info(f'Request: {a} + {b} = {response.sum}')
return response
def main(args=None):
rclpy.init(args=args)
node = StatusServer()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
#!/usr/bin/env python3
# status_client.py
import rclpy
from rclpy.node import Node
from example_interfaces.srv import AddTwoInts
import sys
class StatusClient(Node):
"""Service client example."""
def __init__(self, a, b):
super().__init__('status_client')
self.client = self.create_client(AddTwoInts, 'add_two_ints')
while not self.client.wait_for_service(timeout_sec=1.0):
self.get_logger().info('Service not available, waiting...')
self.req = AddTwoInts.Request()
self.req.a = a
self.req.b = b
def send_request(self):
future = self.client.call_async(self.req)
rclpy.spin_until_future_complete(self, future)
return future.result()
def main(args=None):
rclpy.init(args=args)
node = StatusClient(int(sys.argv[1]), int(sys.argv[2]))
response = node.send_request()
node.get_logger().info(f'Result: {response.sum}')
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Actions: Long-Running Tasks with Feedback
#!/usr/bin/env python3
# navigate_action_server.py
import rclpy
from rclpy.action import ActionServer, GoalResponse, CancelResponse
from rclpy.node import Node
from nav_msgs.msg import Odometry
import time
class NavigateToPoseActionServer(Node):
"""Action server for navigation."""
def __init__(self):
super().__init__('navigate_action_server')
self._action_server = ActionServer(
self,
NavigateToPose,
'navigate_to_pose',
execute_callback=self.execute_callback,
goal_callback=self.goal_callback,
cancel_callback=self.cancel_callback
)
self.odom_sub = self.create_subscription(
Odometry,
'/odom',
self.odom_callback,
10
)
self.current_pose = None
def odom_callback(self, msg):
self.current_pose = msg.pose.pose
def goal_callback(self, goal_request):
"""Accept or reject goals."""
self.get_logger().info('Received navigation goal')
return GoalResponse.ACCEPT
def cancel_callback(self, goal_handle):
"""Handle cancellation requests."""
self.get_logger().info('Received cancel request')
return CancelResponse.ACCEPT
async def execute_callback(self, goal_handle):
"""Execute navigation goal."""
feedback = NavigateToPose.Feedback()
result = NavigateToPose.Result()
# Get goal pose
goal_pose = goal_handle.request.pose
# Navigation loop
while True:
# Check if cancelled
if goal_handle.is_cancel_requested:
goal_handle.canceled()
self.get_logger().info('Goal canceled')
return result
# Calculate distance to goal
distance = self.calculate_distance(self.current_pose, goal_pose)
feedback.distance_remaining = distance
goal_handle.publish_feedback(feedback)
self.get_logger().info(f'Distance remaining: {distance:.2f}m')
# Check if arrived
if distance < 0.1:
goal_handle.succeed()
result.success = True
self.get_logger().info('Goal reached!')
return result
# Wait before next iteration
await asyncio.sleep(0.5)
def calculate_distance(self, pose1, pose2):
"""Calculate Euclidean distance."""
dx = pose1.position.x - pose2.position.x
dy = pose1.position.y - pose2.position.y
return (dx**2 + dy**2)**0.5
def main(args=None):
rclpy.init(args=args)
node = NavigateToPoseActionServer()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Action Client
#!/usr/bin/env python3
# navigate_action_client.py
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from nav2_msgs.action import NavigateToPose
class NavigateToPoseClient(Node):
"""Action client for navigation."""
def __init__(self):
super().__init__('navigate_to_pose_client')
self._action_client = ActionClient(
self,
NavigateToPose,
'navigate_to_pose'
)
def send_goal(self, x, y, z, w):
"""Send navigation goal."""
goal_msg = NavigateToPose.Goal()
goal_msg.pose.pose.position.x = x
goal_msg.pose.pose.position.y = y
goal_msg.pose.pose.orientation.z = z
goal_msg.pose.pose.orientation.w = w
self._action_client.wait_for_server()
self._send_goal_future = self._action_client.send_goal_async(
goal_msg,
feedback_callback=self.feedback_callback
)
self._send_goal_future.add_done_callback(self.goal_response_callback)
def goal_response_callback(self, future):
"""Handle goal response."""
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().info('Goal rejected')
return
self.get_logger().info('Goal accepted')
self._get_result_future = goal_handle.get_result_async()
self._get_result_future.add_done_callback(self.get_result_callback)
def feedback_callback(self, feedback_msg):
"""Handle feedback."""
feedback = feedback_msg.feedback
self.get_logger().info(f'Distance remaining: {feedback.distance_remaining:.2f}m')
def get_result_callback(self, future):
"""Handle result."""
result = future.result().result
self.get_logger().info(f'Navigation complete: {result.success}')
def main(args=None):
rclpy.init(args=args)
node = NavigateToPoseClient()
node.send_goal(x=1.0, y=2.0, z=0.0, w=1.0)
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Practical Robot Example: Mobile Robot Controller
#!/usr/bin/env python3
# robot_controller.py
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist, PoseStamped
from sensor_msgs.msg import LaserScan, Imu
from nav_msgs.msg import Odometry
from std_msgs.msg import Bool
import math
class RobotController(Node):
"""Main controller for autonomous mobile robot."""
def __init__(self):
super().__init__('robot_controller')
# Parameters
self.declare_parameter('max_linear_velocity', 0.5)
self.declare_parameter('max_angular_velocity', 1.0)
self.declare_parameter('obstacle_distance', 0.5)
self.declare_parameter('goal_tolerance', 0.1)
# Publishers
self.cmd_pub = self.create_publisher(Twist, '/cmd_vel', 10)
# Subscribers
self.odom_sub = self.create_subscription(
Odometry, '/odom', self.odom_callback, 10
)
self.laser_sub = self.create_subscription(
LaserScan, '/scan', self.laser_callback, 10
)
self.imu_sub = self.create_subscription(
Imu, '/imu', self.imu_callback, 10
)
# Timers
self.control_timer = self.create_timer(0.1, self.control_loop)
# State
self.current_pose = None
self.current_yaw = 0.0
self.laser_scan = None
self.goal_pose = None
self.emergency_stop = False
def odom_callback(self, msg):
self.current_pose = msg.pose.pose
def laser_callback(self, msg):
self.laser_scan = msg
def imu_callback(self, msg):
# Extract yaw from quaternion
orientation = msg.orientation
_, _, yaw = self.quaternion_to_euler(
orientation.x, orientation.y, orientation.z, orientation.w
)
self.current_yaw = yaw
def set_goal(self, x, y):
"""Set navigation goal."""
self.goal_pose = PoseStamped()
self.goal_pose.header.stamp = self.get_clock().now().to_msg()
self.goal_pose.pose.position.x = x
self.goal_pose.pose.position.y = y
def control_loop(self):
"""Main control loop running at 10Hz."""
if self.emergency_stop:
self.publish_velocity(0.0, 0.0)
return
if self.goal_pose is None or self.current_pose is None:
return
# Calculate goal distance and angle
dx = self.goal_pose.pose.position.x - self.current_pose.position.x
dy = self.goal_pose.pose.position.y - self.current_pose.position.y
distance_to_goal = math.sqrt(dx**2 + dy**2)
goal_yaw = math.atan2(dy, dx)
angle_error = self.normalize_angle(goal_yaw - self.current_yaw)
# Check for obstacles
if self.laser_scan and self.check_obstacle():
self.get_logger().warn('Obstacle detected! Stopping.')
self.publish_velocity(0.0, 0.0)
return
# Control logic
if distance_to_goal < self.get_parameter('goal_tolerance').value:
self.get_logger().info('Goal reached!')
self.publish_velocity(0.0, 0.0)
self.goal_pose = None
return
# Calculate velocities
max_linear = self.get_parameter('max_linear_velocity').value
max_angular = self.get_parameter('max_angular_velocity').value
linear_vel = min(max_linear, distance_to_goal)
angular_vel = min(max_angular, angle_error * 2.0)
# Publish command
self.publish_velocity(linear_vel, angular_vel)
def check_obstacle(self):
"""Check if obstacle is within safety distance."""
if self.laser_scan is None:
return False
safety_distance = self.get_parameter('obstacle_distance').value
ranges = self.laser_scan.ranges
# Check forward sector (-30 to 30 degrees)
forward_ranges = ranges[
len(ranges)//2 - 10:len(ranges)//2 + 10
]
return any(r < safety_distance for r in forward_ranges if not math.isinf(r))
def publish_velocity(self, linear, angular):
"""Publish velocity command."""
twist = Twist()
twist.linear.x = linear
twist.angular.z = angular
self.cmd_pub.publish(twist)
def quaternion_to_euler(self, x, y, z, w):
"""Convert quaternion to Euler angles."""
t0 = +2.0 * (w * x + y * z)
t1 = +1.0 - 2.0 * (x * x + y * y)
roll = math.atan2(t0, t1)
t2 = +2.0 * (w * y - z * x)
t2 = +1.0 if t2 > +1.0 else t2
t2 = -1.0 if t2 < -1.0 else t2
pitch = math.asin(t2)
t3 = +2.0 * (w * z + x * y)
t4 = +1.0 - 2.0 * (y * y + z * z)
yaw = math.atan2(t3, t4)
return roll, pitch, yaw
def normalize_angle(self, angle):
"""Normalize angle to [-pi, pi]."""
while angle > math.pi:
angle -= 2 * math.pi
while angle < -math.pi:
angle += 2 * math.pi
return angle
def emergency_stop_callback(self, msg):
"""Handle emergency stop."""
self.emergency_stop = msg.data
if self.emergency_stop:
self.get_logger().error('EMERGENCY STOP ACTIVATED!')
def main(args=None):
rclpy.init(args=args)
controller = RobotController()
rclpy.spin(controller)
controller.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Launch Files and Configuration
# launch/robot_bringup.launch.py
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument, IncludeLaunchDescription
from launch.launch_description_sources import PythonLaunchDescriptionSource
from launch.substitutions import LaunchConfiguration, PathJoinSubstitution
from launch_ros.actions import Node
from launch_ros.substitutions import FindPackageShare
def generate_launch_description():
# Declare arguments
use_sim_time = DeclareLaunchArgument(
'use_sim_time',
default_value='false',
description='Use simulation clock'
)
namespace = DeclareLaunchArgument(
'namespace',
default_value='',
description='Robot namespace'
)
# Create nodes
robot_controller = Node(
package='my_robot',
executable='robot_controller',
name='robot_controller',
namespace=LaunchConfiguration('namespace'),
parameters=[{
'use_sim_time': LaunchConfiguration('use_sim_time'),
'max_linear_velocity': 0.5,
'max_angular_velocity': 1.0
}]
)
slam_toolbox = IncludeLaunchDescription(
PythonLaunchDescriptionSource([
PathJoinSubstitution([
FindPackageShare('slam_toolbox'),
'launch',
'online_async_launch.py'
])
]),
launch_arguments={
'use_sim_time': LaunchConfiguration('use_sim_time'),
'namespace': LaunchConfiguration('namespace')
}.items()
)
navigation = IncludeLaunchDescription(
PythonLaunchDescriptionSource([
PathJoinSubstitution([
FindPackageShare('nav2_bringup'),
'launch',
'navigation_launch.py'
])
]),
launch_arguments={
'use_sim_time': LaunchConfiguration('use_sim_time'),
'namespace': LaunchConfiguration('namespace')
}.items()
)
return LaunchDescription([
use_sim_time,
namespace,
robot_controller,
slam_toolbox,
navigation
])
Key Takeaways
ROS 2 fundamentals include:
- Nodes: Independent, modular processes
- Topics: Publish/subscribe for streaming data
- Services: Request/response for commands
- Actions: Long-running tasks with feedback
- Launch files: Orchestrate multiple nodes
- DDS: Industry-standard middleware
Master these concepts, and you’ll be equipped to build sophisticated robotic systems.
Questions about ROS 2 or robotics development? Reach out through the contact page or connect on LinkedIn.
Related Posts
Python Essentials for Data Engineering: Libraries, Patterns, and Best Practices
Master the Python libraries, design patterns, and performance techniques that every data engineer needs. Comprehensive guide with real-world examples for building robust data pipelines.
RoboticsFrom Robotics to Software: My Career Transition and What I Learned
How robotics engineering shaped my approach to software architecture. Lessons on career transitions, transferable skills, and finding your unique value.
RoboticsFrom Robotics to Software: My Career Transition
How robotics engineering shaped my approach to software architecture.