Lab 08 - Tworzenie węzłów i przetwarzanie wiadomości

Tworzenie węzłów i przetwarzanie wiadomości

Konfiguracja środowiska

Przed rozpoczęciem pracy z ROS, w każdym nowo otwartym terminalu należy wywołać komendę:

source install/setup.bash
source /opt/ros/humble/setup.bash

Wprowadzenie

W tej instrukcji będziemy tworzyć węzły subskrybujące i publikujące różne typy wiadomości. Dodatkowo zintegrujemy bibliotekę OpenCV do pracy z ROS'em i wykorzystamy ją do sterowania robotem mobilnym.

Tworzenie węzłów

Utworzenie paczki

Przejdź do katalogu ~/ros2_ws/src. Utwórz paczkę o nazwie camera_subscriber o zależnościach cv_bridge python3-opencv sensor_msgs geometry_msgs. Jako --build-type podaj ament_python.

ros2 pkg create camera_subscriber --build-type ament_python --dependencies cv_bridge python3-opencv sensor_msgs geometry_msgs

Tworzenie węzła

Węzły mogą być tworzone wraz z paczką komendą ros2 pkg create, jednak w tym miejscu utworzymy węzeł manualnie. Ta metoda sprawdzi się również, gdy będziesz chciał dodać węzeł do już istniejącej paczki.

  1. Przejdź do katalogu ~/ros2_ws/src/camera_subscriber/camera_subscriber.
  2. Utwórz skrypt poleceniem touch camera_node.py.
  3. Wklej do skryptu poniższy kod.
#!/usr/bin/env python3

import rclpy
from rclpy.node import Node

def main(args=None):
    rclpy.init(args=args)
    node = Node('camera_node')
    node.get_logger().info('Camera node started')
    rclpy.spin(node)
    rclpy.shutdown()

if __name__ == '__main__':
    main()
  1. W pliku setup.py przeprowadź następującą modyfikację
    entry_points={
        'console_scripts': [
            'camera_node = camera_subscriber.camera_node:main',
        ],
    },
  1. Zbuduj środowisko poleceniem colcon build będąc w głównym katalogu.
  2. Uruchom węzeł poleceniem:
ros2 run camera_subscriber camera_node

W powyższym procesie istotne jest prawidłowe umiejscowienie skryptu (punkty 1,2) oraz dodanie pliku wykonywalnego w entry_points (punkt 4). Ważną uwagą jest również konieczność zbudowania środowiska, aby zaobserwować zmiany w działaniu programu (punkt 5) - niezależnie czy programujemy w Pythonie, czy w języku kompilowanym (np. C++).

Biblioteka rclpy (ROS Client Library for the Python) zawiera implementacje konceptów związanych z ROS'em. Dokumentacja jest dostępna tutaj.

Dodawanie subskrybenta

Otwórz plik camera_node.py i wklej do niego poniższą zawartość.

#!/usr/bin/env python3

import rclpy # Python Client Library for ROS 2
from rclpy.node import Node # Handles the creation of nodes
from sensor_msgs.msg import Image # Image is the message type
from cv_bridge import CvBridge # ROS2 package to convert between ROS and OpenCV Images
import cv2 # Python OpenCV library


def listener_callback(image_data):
    # Convert ROS Image message to OpenCV image
    cv_image = CvBridge().imgmsg_to_cv2(image_data,"bgr8")
    # Display image
    cv2.imshow("camera", cv_image)
    # Stop to show the image
    cv2.waitKey(1)

def main(args=None):
    rclpy.init(args=args)
    # Create the node
    node = Node('camera_node')
    # Log information into the console
    node.get_logger().info('Hello node')
    # Create the subscriber. This subscriber will receive an Image
    # from the image_raw topic. The queue size is 10 messages.
    subscription = node.create_subscription(Image,'image_raw',listener_callback,10)
    # Spin the node so the callback function is called.
    rclpy.spin(node)
    # Spin the node so the callback function is called.
    rclpy.shutdown()

if __name__ == '__main__':
    main()

💡 Uwaga! W przypadku braku wymaganych zależności do uruchomienia projektu (python3-opencv lub ros-humble-cv-bridge) skorzystaj z rosdep:

rosdep install --from-paths src -y --ignore-src --rosdistro humble

Zależności te zostały zdefiniowane w momencie tworzenia paczki.

Przedstawiony kod tworzy subskrybenta tematu /image_raw, który może być uzyskany np. przy pomocy paczki usb_cam. W wywołaniu zwrotnym subskrybenta, funkcji listener_callback, dokonywana jest konwersja obrazu z ROS'owego typu Image na typ obsługiwany przez bibliotekę OpenCV oraz wyświetlanie klatek z kamery. Dokumentacja metody do tworzenia subskrybentów (create_subscription) znajduje się tutaj.

Węzły w wersji obiektowej oraz węzeł publikujący

Dokumentacja ROS2 dotycząca tworzenia węzłów wskazuje metodę obiektową jako domyślną dla tworzenia węzłów. Tworzenie kodu skalowalnego i modułowego, często w zespole, jest domeną programowania obiektowego. Poniżej przedstawiono schemat kodu, który można wykorzystać do tworzenia węzłów obiektowo.

#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
 
class MyCustomNode(Node): 
    def __init__(self):
        super().__init__("node_name")
 
def main(args=None):
    rclpy.init(args=args)
    node = MyCustomNode()
    rclpy.spin(node)
    rclpy.shutdown()
 
 
if __name__ == "__main__":
    main()

Rozbudowywanie funkcjonalności węzła odbywa się poprzez dodawanie funkcjonalności w klasie MyCustomNode.

Przykład węzła publikującego

#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from std_msgs.msg import String

class MinimalPublisher(Node):
    def __init__(self):
        super().__init__('minimal_publisher')
        self.publisher_ = self.create_publisher(String, 'topic_name', 10)
        timer_period = 0.5  # seconds
        self.timer = self.create_timer(timer_period, self.timer_callback)
        self.i = 0

    def timer_callback(self):
        msg = String()
        msg.data = 'Hello World: %d' % self.i
        self.publisher_.publish(msg)
        self.get_logger().info('Publishing: "%s"' % msg.data)
        self.i += 1

def main(args=None):
    rclpy.init(args=args)
    minimal_publisher = MinimalPublisher()
    rclpy.spin(minimal_publisher)
    rclpy.shutdown()

if __name__ == '__main__':
    main()

Rozpoczynając od definicji konstruktora klasy, super().__init__ wewnętrznie wywołuje konstruktor klasy Node oraz nadaje nazwę węzłowi, w tym przypadku minimal_publisher. Metoda create_publisher tworzy obiekt publikujący wiadomości typu String (importowane z biblioteki std_msgs.msg) na temacie topic_name. Dodatkowo wartość 10 podana jako argument tej funkcji oznacza, tak jak w przykładzie z subskrybentem, rozmiar bufora równy tej wartości. W przypadku, gdy subskrybent nie nadąża z przetwarzaniem wiadomości, będą one odrzucane po przekroczeniu rozmiaru bufora.

Następnie tworzony jest timer z wywołaniem zwrotnym wykonującym się co 0.5 sekundy. Zmienna self.i jest inkrementowanym licznikiem. Funkcja timer_callback tworzy wiadomość zawierającą stan licznika, wyświetla go w konsoli przy pomocy metody get_logger().info i publikuje na temacie.

💡 Uwaga! Węzeł może jednocześnie zawierać obiekt publikujący i subskrybujący.

Przykład węzła subskrybującego znajduje się w dokumentacji.

Węzły w różnych językach programistycznych

ROS2 Client Libraries to interfejs API, który, umożliwia użytkownikom implementację kodu ROS. Korzystając z Client Libraries, deweloperzy kodu uzyskują dostęp do konceptów, takich jak węzły, tematy, serwisy itp. Client Libraries są dostępne w różnych językach programowania, dzięki czemu użytkownicy mogą pisać ROS'owy kod w języku, który najlepiej pasuje do ich aplikacji. Na przykład możesz chcieć napisać narzędzia do wizualizacji w Pythonie, ponieważ przyspiesza to prototypowanie, podczas gdy wydajne węzły mogą być zaimplementowane w C++.

Przykładowy szablon dla tworzenia węzła obiektowego dla języka C++:

#include "rclcpp/rclcpp.hpp"
 
class MyCustomNode : public rclcpp::Node
{
public:
    MyCustomNode() : Node("node_name")
    {
    }
 
private:
};
 
int main(int argc, char **argv)
{
    rclcpp::init(argc, argv);
    auto node = std::make_shared<MyCustomNode>();
    rclcpp::spin(node);
    rclcpp::shutdown();
    return 0;
}

Węzły napisane w różnych językach programistycznych nadal mogą się komunikować, ponieważ są budowane na wspólnym interfejcie rcl (ROS2 Client Libraries).

Funkcjonalności ROS'a dla C++(rclcpp) i Pythona (rclpy) są zarządzane i wspierane przez autorów i zespoły ROS'a. Natomiast są również inne języki, dla których funkcjonalności tworzy społeczność ROS'a (np. Rust, Node.js, C, Android). Więcej informacji tutaj.

Parametryzacja węzłów obiektowych

Dodawanie parametrów do węzła

Tworzenie parametrów w węźle odbywa się z wykorzystaniem metody self.declare_parameter('my_parameter', 'my_value'), gdzie my_parameter to nazwa parametru, a my_value to wartość domyślna. Typ parametru jest wnioskowany z wartości domyślnej, więc w tym przypadku byłby ustawiony na typ string.

Następnie, aby odczytać aktualną wartość parametru wykorzystuje się metodę self.get_parameter_value(). Zwracany obiekt jest typu rcl_interfaces.msg.ParameterValue, więc aby uzyskać wartość odwołujemy się do pola value.

Przykładowy szablon kodu:

#!/usr/bin/env python3
import rclpy
from rclpy.node import Node

class MinimalParam(Node):
    def __init__(self):
        super().__init__('minimal_param_node')
        self.declare_parameter('param_name', 'default_value')
        self.my_param = self.get_parameter('param_name').value
        print(self.my_param)

def main():
    rclpy.init()
    node = MinimalParam()
    rclpy.spin(node)

if __name__ == '__main__':
    main()

Wszystkie dostępne typy parametrów oraz metody ich pozyskiwania opisane są w poniższym linku.

Uruchamianie węzła z parametrami

Węzły mogą być uruchamiane wraz z parametrami, np.:

ros2 run package_name node_name --ros-args -p param_name:=param_value

na przykład:

ros2 run demo_nodes_cpp parameter_blackboard --ros-args -p some_int:=42 -p "a_string:=Hello world" -p "some_lists.some_integers:=[1, 2, 3, 4]" -p "some_lists.some_doubles:=[3.14, 2.718]"

Możliwe jest wczytywanie różnych typów parametrów jak i zbiorów (np. list), a także całych plików. Więcej o tym tutaj.

Dane wymieniane pomiędzy węzłami - wiadomości

Wiadomość (ang. message) to struktura danych wymieniana pomiędzy węzłami. Pliki o rozszerzeniu *.msg zawierają deklarację wiadomości. Custom'owe wiadomości przechowywane są w katalogu paczki o nazwie msg. Przykładowe dostępne wiadomości: std_msgs/String.msg, sensor_msgs/Image.msg, trajectory_msgs/msg/JointTrajectory.

Za każdym razem, gdy zamierzasz wykorzystać wiadomość powinieneś zdefiniować ją w zależnościach w pliku package.xml:

<depend>std_msgs</depend>
<depend>sensor_msgs</depend>
<depend>trajectory_msgs</depend>

W przypadku, gdy tworzysz własny typ wiadomości, proces przebiega następująco:

  1. W katalogu głównym paczki utwórz folder o nazwie msg.
  2. W katalogu msg utwórz plik o rozszerzeniu *.msg, np. MyMessage.msg. Wewnątrz pliku definiujemy strukturę według schematu:
fieldtype1 fieldname1
fieldtype2 fieldname2
fieldtype3 fieldname3

np.

int32 my_int
string[] my_string_array

lub też z wartościami domyślnymi:

int32 my_int 60
string[] my_string_array ["a","b","c"]

Wszystkie dostępne typy zmiennych są wymienione tutaj.

  1. W pliku CMakeLists.txt dodaj następujące linie dla wiadomości MyMessage.msg:
find_package(rosidl_default_generators REQUIRED)
rosidl_generate_interfaces(${PROJECT_NAME}
  "msg/MyMessage.msg"
 )
  1. W pliku package.xml dodaj następujące zależności:
<build_depend>rosidl_default_generators</build_depend>
<exec_depend>rosidl_default_runtime</exec_depend>
<member_of_group>rosidl_interface_packages</member_of_group>
  1. Zbuduj środowisko poleceniem colcon build.

💡 Uwaga, przedstawiony proces zakłada paczkę utworzoną z ament_cmake. W praktyce, można utworzyć osobną paczkę zawierającą jedynie definicje wiadomości, aby móc wykorzystać je w drugiej, Pythonowej, paczce. Kompromisem, pozwalającym tworzyć jednocześnie węzły Pythonowe oraz wiadomości, jest zastosowanie ament_cmake_python. Więcej szczegółów w następującym przykładzie oraz tu.

Symulacja robota mobilnego TurtleBot

Instalacja

Jeśli symulacja jeszcze nie jest zainstalowana na Twoim komputerze, możesz tego dokonać poniższą komendą:

sudo apt install ros-humble-turtlebot3*

Konfiguracja

Wybierz model robota poleceniem:

export TURTLEBOT3_MODEL=burger

Wskaż ścieżkę do modelu robota poleceniem:

export GAZEBO_MODEL_PATH=$GAZEBO_MODEL_PATH:`ros2 pkg \
prefix turtlebot3_gazebo \
`/share/turtlebot3_gazebo/models/

Uruchamianie:

ros2 launch turtlebot3_gazebo empty_world.launch.py

Możliwe jest również uruchomienie innego symulowanego otoczenia, np. turtlebot3_house.launch.py

Sterowanie robotem:

Sterowanie robotem odbywa się poprzez publikację wiadomości na temacie /cmd_vel.

Możesz to zrobić komendą:

ros2 topic pub /cmd_vel geometry_msgs/msg/Twist "linear:
  x: 2.0
  y: 0.0
  z: 0.0
angular:
  x: 0.0
  y: 0.0
  z: 2.0"

Istnieje również dedykowany węzeł do sterowania TurtleBotem przy pomocy klawiatury. Konieczne jest jednak najpierw ustawienie nazwy robota, jeśli uruchomiłeś nowy terminal:

export TURTLEBOT3_MODEL=burger

Następnie uruchamiamy węzeł:

ros2 run turtlebot3_teleop teleop_keyboard

Zadania

  1. Wykonaj następującą próbę. Edytuj przykładowy kod z podpunktu "Tworzenie węzła" komentując linię zawierającą rclpy.spin(node). Zaobserwuj efekt i zastanów się, czym jaki jest cel tej linii kodu.

  2. Bazując na przykładzie z podpunktu "Dodawanie subskrybenta" napisz subskrybenta obrazu z kamery w wersji obiektowej.

  3. Uruchom poniższy kod (możesz zastąpić camera_node). Korzystając z przedstawionego kodu dokonaj publikacji wiadomości Point, pamiętaj o tym aby go zaimportować. Publikuj na temacie o nazwie /point. Wykorzystanie timera nie jest konieczne. Dla chętnych, można przetestować w wersji z obrazem z kamery.

#!/usr/bin/env python3
import rclpy # Python Client Library for ROS 2
from rclpy.node import Node # Handles the creation of nodes
from sensor_msgs.msg import Image # Image is the message type
from cv_bridge import CvBridge # ROS2 package to convert between ROS and OpenCV Images
import cv2 # Python OpenCV library
import numpy as np


class MinimalSubscriber(Node):
    def __init__(self):
        super().__init__('minimal_subscriber')
        self.window_name = "camera"
        self.subscription = self.create_subscription(Image,'image_raw',self.listener_callback,10)
        self.subscription  # prevent unused variable warning
        self.point = None

    def listener_callback(self, image_data):
        cv_image = np.zeros((512,700,3), np.uint8)
        if(self.point is not None):
            cv2.rectangle(cv_image,self.point,(self.point[0]+200,self.point[1]+200),(0,255,0),3)
        cv2.imshow(self.window_name, cv_image)
        cv2.waitKey(25)
        cv2.setMouseCallback(self.window_name, self.draw_rectangle)

    def draw_rectangle(self, event, x, y, flags, param):
        if event == cv2.EVENT_LBUTTONDOWN: # check if mouse event is click
            self.point = (x,y)

def main(args=None):
    rclpy.init(args=args)
    minimal_subscriber = MinimalSubscriber()
    rclpy.spin(minimal_subscriber)
    # Destroy the node explicitly
    # (optional - otherwise it will be done automatically
    # when the garbage collector destroys the node object)
    minimal_subscriber.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()
  1. Do kodu z poprzedniego zadania dodaj parametr specyfikujący długość kwadratu (obecnie jest to 200). Wywołaj węzeł ze zmienioną wartością tego parametru.

  2. Napisz węzeł, który cyklicznie będzie publikował wiadomości na temacie /cmd_vel dla robota TurtleBot. Możesz przyjąć cel, aby robot kręcił się w kółko.

  3. Dokonaj rozbudowy programu z poprzedniego zadania. Wykorzystaj publikowany temat /point, aby robot jechał do przodu (dodania prędkość liniowa w osi x), gdy kliknięty punkt jest powyżej środka ekranu oraz zatrzymywał się w miejscu, gdy jest poniżej. Jako domyślną długość okna przyjmij tą z zadania 3 (512).