Menu

© 2026 Furkanul Islam

}
{
</>

ROS 2 Fundamentals for Robotics Development: A Practical Guide

Master Robot Operating System 2 (ROS 2) for building autonomous robots. Learn nodes, topics, services, actions, and practical patterns for real-world robotics applications.

Robot Operating System 2 (ROS 2) has become the backbone of modern robotics development. After spending years working with both ROS 1 and ROS 2 on various robotics projects—from autonomous mobile robots to robotic arms—I can confidently say that ROS 2’s improvements over its predecessor are substantial. In this guide, I’ll share everything you need to know to get started with ROS 2 and build real-world robotic applications.

What is ROS 2 and Why Does It Matter?

ROS (Robot Operating System) isn’t an operating system in the traditional sense—it’s a middleware framework that provides:

  • Hardware abstraction: Write code once, run on different robots
  • Device drivers: Pre-built drivers for sensors and actuators
  • Libraries and tools: Navigation, perception, manipulation
  • Package management: Modular, reusable code
  • Communication infrastructure: Publish/subscribe messaging

Why ROS 2 Over ROS 1?

FeatureROS 1ROS 2Why It Matters
CommunicationCustom (roscore)DDS/RTPSIndustry standard, real-time
NetworkSingle point of failureDecentralizedProduction-ready
Real-time supportLimitedNativeCritical for safety
SecurityNoneDDS SecurityAuthentication, encryption
Language supportC++, PythonC++, Python, + moreFlexibility
DiscoveryManualAutomaticEasier deployment

DDS: The Game Changer

ROS 2 uses DDS (Data Distribution Service) as its middleware, which provides:

┌─────────────┐    ┌──────────────┐    ┌─────────────┐
│   Node 1    │    │   DDS        │    │   Node 2    │
│   (Talker)  │ -> │   Domain     │ -> │   (Listener)│
│             │    │   Participant│    │             │
└─────────────┘    └──────────────┘    └─────────────┘

         ┌────────────────┼────────────────┐
         ▼                ▼                ▼
   ┌──────────┐    ┌──────────┐    ┌──────────┐
   │  Topic A │    │  Topic B │    │  Topic C │
   └──────────┘    └──────────┘    └──────────┘

Installation and Setup

Ubuntu Installation

# Add ROS 2 repository
sudo apt update && sudo apt install curl gnupg lsb-release
curl -sSL https://raw.githubusercontent.com/ros/rosdistro/master/ros.key | sudo apt-key add -
echo "deb [arch=$(dpkg --print-architecture)] http://packages.ros.org/ros2/ubuntu $(lsb_release -cs) main" | sudo tee /etc/apt/sources.list.d/ros2.list

# Install ROS 2 Humble (LTS)
sudo apt update
sudo apt install ros-humble-desktop
sudo apt install ros-dev-tools

# Source ROS 2
echo "source /opt/ros/humble/setup.bash" >> ~/.bashrc
source ~/.bashrc

# Verify installation
ros2 --version

Development Environment

# Install additional tools
sudo apt install python3-rosdep python3-colcon-common-extensions

# Initialize rosdep
sudo rosdep init
rosdep update

# Create workspace
mkdir -p ~/ros2_ws/src
cd ~/ros2_ws
colcon build
source install/setup.bash

Core Concepts Explained

Nodes: The Building Blocks

Nodes are independent processes that communicate. Here’s a minimal example:

#!/usr/bin/env python3
# minimal_node.py

import rclpy
from rclpy.node import Node

class MinimalNode(Node):
    """A simple ROS 2 node."""

    def __init__(self):
        super().__init__('minimal_node')
        self.get_logger().info('Minimal Node started!')

def main(args=None):
    rclpy.init(args=args)
    node = MinimalNode()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

Run the node:

cd ~/ros2_ws
ros2 run my_package minimal_node

Topics: Publish/Subscribe Communication

Topics are named buses for streaming data. Publishers and subscribers are decoupled.

#!/usr/bin/env python3
# talker.py

import rclpy
from rclpy.node import Node
from std_msgs.msg import String
import time

class Talker(Node):
    """Publishes string messages."""

    def __init__(self):
        super().__init__('talker')
        self.publisher_ = self.create_publisher(String, 'chatter', 10)
        self.timer = self.create_timer(1.0, self.publish_message)
        self.count = 0

    def publish_message(self):
        msg = String()
        msg.data = f'Hello ROS 2! Message {self.count}'
        self.publisher_.publish(msg)
        self.get_logger().info(f'Publishing: {msg.data}')
        self.count += 1

def main(args=None):
    rclpy.init(args=args)
    node = Talker()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()
#!/usr/bin/env python3
# listener.py

import rclpy
from rclpy.node import Node
from std_msgs.msg import String

class Listener(Node):
    """Subscribes to string messages."""

    def __init__(self):
        super().__init__('listener')
        self.subscription = self.create_subscription(
            String,
            'chatter',
            self.callback,
            10
        )
        self.subscription  # prevent unused variable warning

    def callback(self, msg):
        self.get_logger().info(f'Received: {msg.data}')

def main(args=None):
    rclpy.init(args=args)
    node = Listener()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

Custom Message Types

# package.xml - Add dependencies
<depend>rosidl_default_generators</depend>
<member_of_group>rosidl_interface_packages</member_of_group>
# CMakeLists.txt
find_package(rosidl_default_generators REQUIRED)

rosidl_generate_interfaces(${PROJECT_NAME}
  "msg/SensorData.msg"
  "msg/RobotState.msg"
  "srv/GetStatus.srv"
)
# msg/SensorData.msg
Header header
float64 temperature
float64 humidity
float64 pressure
geometry_msgs/Vector3 acceleration
geometry_msgs/Vector3 gyroscope
# Using custom messages
from my_robot_msgs.msg import SensorData

class SensorNode(Node):
    def __init__(self):
        super().__init__('sensor_node')
        self.pub = self.create_publisher(SensorData, 'sensor_data', 10)

    def publish_sensor_data(self):
        msg = SensorData()
        msg.header.stamp = self.get_clock().now().to_msg()
        msg.header.frame_id = 'sensor_link'
        msg.temperature = 25.5
        msg.humidity = 60.0
        msg.pressure = 1013.25
        self.pub.publish(msg)

Services: Request/Response Communication

#!/usr/bin/env python3
# status_server.py

import rclpy
from rclpy.node import Node
from example_interfaces.srv import AddTwoInts

class StatusServer(Node):
    """Service server example."""

    def __init__(self):
        super().__init__('status_server')
        self.srv = self.create_service(
            AddTwoInts,
            'add_two_ints',
            self.callback
        )

    def callback(self, request, response):
        a = request.a
        b = request.b
        response.sum = a + b
        self.get_logger().info(f'Request: {a} + {b} = {response.sum}')
        return response

def main(args=None):
    rclpy.init(args=args)
    node = StatusServer()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()
#!/usr/bin/env python3
# status_client.py

import rclpy
from rclpy.node import Node
from example_interfaces.srv import AddTwoInts
import sys

class StatusClient(Node):
    """Service client example."""

    def __init__(self, a, b):
        super().__init__('status_client')
        self.client = self.create_client(AddTwoInts, 'add_two_ints')

        while not self.client.wait_for_service(timeout_sec=1.0):
            self.get_logger().info('Service not available, waiting...')

        self.req = AddTwoInts.Request()
        self.req.a = a
        self.req.b = b

    def send_request(self):
        future = self.client.call_async(self.req)
        rclpy.spin_until_future_complete(self, future)
        return future.result()

def main(args=None):
    rclpy.init(args=args)
    node = StatusClient(int(sys.argv[1]), int(sys.argv[2]))
    response = node.send_request()
    node.get_logger().info(f'Result: {response.sum}')
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

Actions: Long-Running Tasks with Feedback

#!/usr/bin/env python3
# navigate_action_server.py

import rclpy
from rclpy.action import ActionServer, GoalResponse, CancelResponse
from rclpy.node import Node
from nav_msgs.msg import Odometry
import time

class NavigateToPoseActionServer(Node):
    """Action server for navigation."""

    def __init__(self):
        super().__init__('navigate_action_server')

        self._action_server = ActionServer(
            self,
            NavigateToPose,
            'navigate_to_pose',
            execute_callback=self.execute_callback,
            goal_callback=self.goal_callback,
            cancel_callback=self.cancel_callback
        )

        self.odom_sub = self.create_subscription(
            Odometry,
            '/odom',
            self.odom_callback,
            10
        )
        self.current_pose = None

    def odom_callback(self, msg):
        self.current_pose = msg.pose.pose

    def goal_callback(self, goal_request):
        """Accept or reject goals."""
        self.get_logger().info('Received navigation goal')
        return GoalResponse.ACCEPT

    def cancel_callback(self, goal_handle):
        """Handle cancellation requests."""
        self.get_logger().info('Received cancel request')
        return CancelResponse.ACCEPT

    async def execute_callback(self, goal_handle):
        """Execute navigation goal."""
        feedback = NavigateToPose.Feedback()
        result = NavigateToPose.Result()

        # Get goal pose
        goal_pose = goal_handle.request.pose

        # Navigation loop
        while True:
            # Check if cancelled
            if goal_handle.is_cancel_requested:
                goal_handle.canceled()
                self.get_logger().info('Goal canceled')
                return result

            # Calculate distance to goal
            distance = self.calculate_distance(self.current_pose, goal_pose)
            feedback.distance_remaining = distance
            goal_handle.publish_feedback(feedback)

            self.get_logger().info(f'Distance remaining: {distance:.2f}m')

            # Check if arrived
            if distance < 0.1:
                goal_handle.succeed()
                result.success = True
                self.get_logger().info('Goal reached!')
                return result

            # Wait before next iteration
            await asyncio.sleep(0.5)

    def calculate_distance(self, pose1, pose2):
        """Calculate Euclidean distance."""
        dx = pose1.position.x - pose2.position.x
        dy = pose1.position.y - pose2.position.y
        return (dx**2 + dy**2)**0.5

def main(args=None):
    rclpy.init(args=args)
    node = NavigateToPoseActionServer()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

Action Client

#!/usr/bin/env python3
# navigate_action_client.py

import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from nav2_msgs.action import NavigateToPose

class NavigateToPoseClient(Node):
    """Action client for navigation."""

    def __init__(self):
        super().__init__('navigate_to_pose_client')
        self._action_client = ActionClient(
            self,
            NavigateToPose,
            'navigate_to_pose'
        )

    def send_goal(self, x, y, z, w):
        """Send navigation goal."""
        goal_msg = NavigateToPose.Goal()
        goal_msg.pose.pose.position.x = x
        goal_msg.pose.pose.position.y = y
        goal_msg.pose.pose.orientation.z = z
        goal_msg.pose.pose.orientation.w = w

        self._action_client.wait_for_server()

        self._send_goal_future = self._action_client.send_goal_async(
            goal_msg,
            feedback_callback=self.feedback_callback
        )

        self._send_goal_future.add_done_callback(self.goal_response_callback)

    def goal_response_callback(self, future):
        """Handle goal response."""
        goal_handle = future.result()
        if not goal_handle.accepted:
            self.get_logger().info('Goal rejected')
            return

        self.get_logger().info('Goal accepted')

        self._get_result_future = goal_handle.get_result_async()
        self._get_result_future.add_done_callback(self.get_result_callback)

    def feedback_callback(self, feedback_msg):
        """Handle feedback."""
        feedback = feedback_msg.feedback
        self.get_logger().info(f'Distance remaining: {feedback.distance_remaining:.2f}m')

    def get_result_callback(self, future):
        """Handle result."""
        result = future.result().result
        self.get_logger().info(f'Navigation complete: {result.success}')

def main(args=None):
    rclpy.init(args=args)
    node = NavigateToPoseClient()
    node.send_goal(x=1.0, y=2.0, z=0.0, w=1.0)
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

Practical Robot Example: Mobile Robot Controller

#!/usr/bin/env python3
# robot_controller.py

import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist, PoseStamped
from sensor_msgs.msg import LaserScan, Imu
from nav_msgs.msg import Odometry
from std_msgs.msg import Bool
import math

class RobotController(Node):
    """Main controller for autonomous mobile robot."""

    def __init__(self):
        super().__init__('robot_controller')

        # Parameters
        self.declare_parameter('max_linear_velocity', 0.5)
        self.declare_parameter('max_angular_velocity', 1.0)
        self.declare_parameter('obstacle_distance', 0.5)
        self.declare_parameter('goal_tolerance', 0.1)

        # Publishers
        self.cmd_pub = self.create_publisher(Twist, '/cmd_vel', 10)

        # Subscribers
        self.odom_sub = self.create_subscription(
            Odometry, '/odom', self.odom_callback, 10
        )
        self.laser_sub = self.create_subscription(
            LaserScan, '/scan', self.laser_callback, 10
        )
        self.imu_sub = self.create_subscription(
            Imu, '/imu', self.imu_callback, 10
        )

        # Timers
        self.control_timer = self.create_timer(0.1, self.control_loop)

        # State
        self.current_pose = None
        self.current_yaw = 0.0
        self.laser_scan = None
        self.goal_pose = None
        self.emergency_stop = False

    def odom_callback(self, msg):
        self.current_pose = msg.pose.pose

    def laser_callback(self, msg):
        self.laser_scan = msg

    def imu_callback(self, msg):
        # Extract yaw from quaternion
        orientation = msg.orientation
        _, _, yaw = self.quaternion_to_euler(
            orientation.x, orientation.y, orientation.z, orientation.w
        )
        self.current_yaw = yaw

    def set_goal(self, x, y):
        """Set navigation goal."""
        self.goal_pose = PoseStamped()
        self.goal_pose.header.stamp = self.get_clock().now().to_msg()
        self.goal_pose.pose.position.x = x
        self.goal_pose.pose.position.y = y

    def control_loop(self):
        """Main control loop running at 10Hz."""
        if self.emergency_stop:
            self.publish_velocity(0.0, 0.0)
            return

        if self.goal_pose is None or self.current_pose is None:
            return

        # Calculate goal distance and angle
        dx = self.goal_pose.pose.position.x - self.current_pose.position.x
        dy = self.goal_pose.pose.position.y - self.current_pose.position.y
        distance_to_goal = math.sqrt(dx**2 + dy**2)
        goal_yaw = math.atan2(dy, dx)
        angle_error = self.normalize_angle(goal_yaw - self.current_yaw)

        # Check for obstacles
        if self.laser_scan and self.check_obstacle():
            self.get_logger().warn('Obstacle detected! Stopping.')
            self.publish_velocity(0.0, 0.0)
            return

        # Control logic
        if distance_to_goal < self.get_parameter('goal_tolerance').value:
            self.get_logger().info('Goal reached!')
            self.publish_velocity(0.0, 0.0)
            self.goal_pose = None
            return

        # Calculate velocities
        max_linear = self.get_parameter('max_linear_velocity').value
        max_angular = self.get_parameter('max_angular_velocity').value

        linear_vel = min(max_linear, distance_to_goal)
        angular_vel = min(max_angular, angle_error * 2.0)

        # Publish command
        self.publish_velocity(linear_vel, angular_vel)

    def check_obstacle(self):
        """Check if obstacle is within safety distance."""
        if self.laser_scan is None:
            return False

        safety_distance = self.get_parameter('obstacle_distance').value
        ranges = self.laser_scan.ranges

        # Check forward sector (-30 to 30 degrees)
        forward_ranges = ranges[
            len(ranges)//2 - 10:len(ranges)//2 + 10
        ]

        return any(r < safety_distance for r in forward_ranges if not math.isinf(r))

    def publish_velocity(self, linear, angular):
        """Publish velocity command."""
        twist = Twist()
        twist.linear.x = linear
        twist.angular.z = angular
        self.cmd_pub.publish(twist)

    def quaternion_to_euler(self, x, y, z, w):
        """Convert quaternion to Euler angles."""
        t0 = +2.0 * (w * x + y * z)
        t1 = +1.0 - 2.0 * (x * x + y * y)
        roll = math.atan2(t0, t1)

        t2 = +2.0 * (w * y - z * x)
        t2 = +1.0 if t2 > +1.0 else t2
        t2 = -1.0 if t2 < -1.0 else t2
        pitch = math.asin(t2)

        t3 = +2.0 * (w * z + x * y)
        t4 = +1.0 - 2.0 * (y * y + z * z)
        yaw = math.atan2(t3, t4)

        return roll, pitch, yaw

    def normalize_angle(self, angle):
        """Normalize angle to [-pi, pi]."""
        while angle > math.pi:
            angle -= 2 * math.pi
        while angle < -math.pi:
            angle += 2 * math.pi
        return angle

    def emergency_stop_callback(self, msg):
        """Handle emergency stop."""
        self.emergency_stop = msg.data
        if self.emergency_stop:
            self.get_logger().error('EMERGENCY STOP ACTIVATED!')

def main(args=None):
    rclpy.init(args=args)
    controller = RobotController()
    rclpy.spin(controller)
    controller.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

Launch Files and Configuration

# launch/robot_bringup.launch.py

from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument, IncludeLaunchDescription
from launch.launch_description_sources import PythonLaunchDescriptionSource
from launch.substitutions import LaunchConfiguration, PathJoinSubstitution
from launch_ros.actions import Node
from launch_ros.substitutions import FindPackageShare

def generate_launch_description():
    # Declare arguments
    use_sim_time = DeclareLaunchArgument(
        'use_sim_time',
        default_value='false',
        description='Use simulation clock'
    )

    namespace = DeclareLaunchArgument(
        'namespace',
        default_value='',
        description='Robot namespace'
    )

    # Create nodes
    robot_controller = Node(
        package='my_robot',
        executable='robot_controller',
        name='robot_controller',
        namespace=LaunchConfiguration('namespace'),
        parameters=[{
            'use_sim_time': LaunchConfiguration('use_sim_time'),
            'max_linear_velocity': 0.5,
            'max_angular_velocity': 1.0
        }]
    )

    slam_toolbox = IncludeLaunchDescription(
        PythonLaunchDescriptionSource([
            PathJoinSubstitution([
                FindPackageShare('slam_toolbox'),
                'launch',
                'online_async_launch.py'
            ])
        ]),
        launch_arguments={
            'use_sim_time': LaunchConfiguration('use_sim_time'),
            'namespace': LaunchConfiguration('namespace')
        }.items()
    )

    navigation = IncludeLaunchDescription(
        PythonLaunchDescriptionSource([
            PathJoinSubstitution([
                FindPackageShare('nav2_bringup'),
                'launch',
                'navigation_launch.py'
            ])
        ]),
        launch_arguments={
            'use_sim_time': LaunchConfiguration('use_sim_time'),
            'namespace': LaunchConfiguration('namespace')
        }.items()
    )

    return LaunchDescription([
        use_sim_time,
        namespace,
        robot_controller,
        slam_toolbox,
        navigation
    ])

Key Takeaways

ROS 2 fundamentals include:

  1. Nodes: Independent, modular processes
  2. Topics: Publish/subscribe for streaming data
  3. Services: Request/response for commands
  4. Actions: Long-running tasks with feedback
  5. Launch files: Orchestrate multiple nodes
  6. DDS: Industry-standard middleware

Master these concepts, and you’ll be equipped to build sophisticated robotic systems.


Questions about ROS 2 or robotics development? Reach out through the contact page or connect on LinkedIn.

MD Furkanul Islam

MD Furkanul Islam

Data Engineer & AI/ML Specialist

9+ years building intelligent data systems at scale. Passionate about bridging the gap between data engineering, AI, and robotics.