Advanced Functionalities

This section covers slightly more advanced, but useful features that enrich your implementation.

Topics and Keys

The RTPS standard contemplates the use of keys to define multiple data sources/sinks within a single topic.

There are three ways of implementing keys into your topic:

  • Defining a @Key field in the IDL file when using FastRTPSGen (see the examples that come with the distribution).
  • Manually implementing and using a getKey() method.
  • Adding the attribute Key to the member and its parents when using dynamic types (see Dynamic Topic Types).

Publishers and Subscribers using topics with keys must be configured to use them, otherwise, they will have no effect:

C++
// Publisher-Subscriber Layer configuration.
publisher_attr.topic.topicKind = WITH_KEY;
XML
<publisher profile_name="publisher_profile_qos_key">
    <topic>
        <kind>WITH_KEY</kind>
    </topic>
</publisher>

The RTPS Layer requires you to call the getKey() method manually within your callbacks.

You can tweak the History to accommodate data from multiple keys based on your current configuration. This consist of defining a maximum number of data sinks and a maximum size for each sink:

C++
// Set the subscriber to remember and store up to 3 different keys.
subscriber_attr.topic.resourceLimitsQos.max_instances = 3;
// Hold a maximum of 20 samples per key.
subscriber_attr.topic.resourceLimitsQos.max_samples_per_instance = 20;
XML
<subscriber profile_name="subscriber_profile_qos_resourcelimit">
    <topic>
        <resourceLimitsQos>
            <max_instances>3</max_instances>
            <max_samples_per_instance>20</max_samples_per_instance>
        </resourceLimitsQos>
    </topic>
</subscriber>

Note that your History must be big enough to accommodate the maximum number of samples for each key. eProsima Fast RTPS will notify you if your History is too small.

Intra-process delivery

eProsima Fast RTPS allows to speed up Intra-process communications by avoiding any copy operation involved with the transport layer. This feature is disabled by default and must be enable using XML profiles. Currently the following options are available:

  • INTRAPROCESS_OFF. Default value, the feature is disabled.
  • INTRAPROCESS_USER_DATA_ONLY. Discovery metadata keeps using ordinary transport.
  • INTRAPROCESS_FULL. Both user data and discovery metadata using Intra-process delivery.

Transports

eProsima Fast RTPS implements an architecture of pluggable transports. Current version implements four transports: UDPv4, UDPv6, TCPv4 and TCPv6. By default, when a Participant is created, one built-in UDPv4 transport is configured. You can add custom transports using the attribute rtps.userTransports.

C++
//Create a descriptor for the new transport.
auto custom_transport = std::make_shared<UDPv4TransportDescriptor>();
    custom_transport->sendBufferSize = 9216;
    custom_transport->receiveBufferSize = 9216;

//Disable the built-in Transport Layer.
participant_attr.rtps.useBuiltinTransports = false;

//Link the Transport Layer to the Participant.
participant_attr.rtps.userTransports.push_back(custom_transport);
XML
<transport_descriptors>
    <transport_descriptor>
        <transport_id>my_transport</transport_id>
        <type>UDPv4</type>
        <sendBufferSize>9216</sendBufferSize>
        <receiveBufferSize>9216</receiveBufferSize>
    </transport_descriptor>
</transport_descriptors>

<participant profile_name="my_transport">
    <rtps>
        <userTransports>
            <transport_id>my_transport</transport_id>
        </userTransports>
        <useBuiltinTransports>false</useBuiltinTransports>
    </rtps>
</participant>

All Transport configuration options can be found in the section Transport descriptors.

TCP Transport

Unlike UDP, TCP transport is connection oriented and for that Fast-RTPS must establish a TCP connection before sending the RTPS messages. Therefore TCP transport can have two behaviors, acting as a server (TCP Server) or as a client (TCP Client). The server opens a TCP port listening for incoming connections and the client tries to connect to the server. The server and the client concepts are independent from the RTPS concepts: Publisher, Subscriber, Writer, and Reader. Any of them can operate as a TCP Server or a TCP Client because these entities are used only to establish the TCP connection and the RTPS protocol works over it.

To use TCP transports you need to define some more configurations:

You must create a new TCP transport descriptor, for example TCPv4. This transport descriptor has a field named listening_ports that indicates to Fast-RTPS in which physical TCP ports our participant will listen for input connections. If omitted, the participant will not be able to receive incoming connections but will be able to connect to other participants that have configured their listening ports. The transport must be added to the userTransports list of the participant attributes. The field wan_addr can be used to allow incoming connections using the public IP in a WAN environment or the Internet. See WAN or Internet Communication over TCP/IPv4 for more information about how to configure a TCP Transport to allow or connect to WAN connections.

C++
//Create a descriptor for the new transport.
auto tcp_transport = std::make_shared<TCPv4TransportDescriptor>();
tcp_transport->add_listener_port(5100);
tcp_transport->set_WAN_address("80.80.99.45");

//Disable the built-in Transport Layer.
participant_attr.rtps.useBuiltinTransports = false;

//Link the Transport Layer to the Participant.
participant_attr.rtps.userTransports.push_back(tcp_transport);
XML
<transport_descriptors>
    <transport_descriptor>
        <transport_id>tcp_transport</transport_id>
        <type>TCPv4</type>
        <listening_ports>
            <port>5100</port>
        </listening_ports>
        <wan_addr>80.80.99.45</wan_addr>
    </transport_descriptor>
</transport_descriptors>

<participant profile_name="TCPParticipant">
    <rtps>
        <userTransports>
            <transport_id>tcp_transport</transport_id>
        </userTransports>
        <useBuiltinTransports>false</useBuiltinTransports>
    </rtps>
</participant>

To configure the participant to connect to another node through TCP, you must configure and add a Locator to its initialPeersList that points to the remote listening port.

C++
auto tcp2_transport = std::make_shared<TCPv4TransportDescriptor>();

//Disable the built-in Transport Layer.
participant_attr.rtps.useBuiltinTransports = false;

//Set initial peers.
Locator_t initial_peer_locator;
initial_peer_locator.kind = LOCATOR_KIND_TCPv4;
IPLocator::setIPv4(initial_peer_locator, "80.80.99.45");
initial_peer_locator.port = 5100;
participant_attr.rtps.builtin.initialPeersList.push_back(initial_peer_locator);

//Link the Transport Layer to the Participant.
participant_attr.rtps.userTransports.push_back(tcp2_transport);
XML
<transport_descriptors>
    <transport_descriptor>
        <transport_id>tcp2_transport</transport_id>
        <type>TCPv4</type>
    </transport_descriptor>
</transport_descriptors>

<participant profile_name="TCP2Participant">
    <rtps>
        <userTransports>
            <transport_id>tcp2_transport</transport_id>
        </userTransports>
        <builtin>
            <initialPeersList>
                <locator>
                    <tcpv4>
                        <address>80.80.99.45</address>
                        <physical_port>5100</physical_port>
                    </tcpv4>
                </locator>
            </initialPeersList>
        </builtin>
        <useBuiltinTransports>false</useBuiltinTransports>
    </rtps>
</participant>

A TCP version of helloworld example can be found in this link.

WAN or Internet Communication over TCP/IPv4

Fast-RTPS is able to connect through the Internet or other WAN networks when configured properly. To achieve this kind of scenarios, the involved network devices such as routers and firewalls should add the rules to allow the communication.

For example, to allow incoming connections through our NAT, Fast-RTPS must be configured as a TCP Server listening to incoming TCP connections. To allow incoming connections through a WAN, the TCP descriptor associated must indicate its public IP through its field wan_addr.

C++
//Create a descriptor for the new transport.
auto tcp_transport = std::make_shared<TCPv4TransportDescriptor>();
tcp_transport->add_listener_port(5100);
tcp_transport->set_WAN_address("80.80.99.45");

//Disable the built-in Transport Layer.
participant_attr.rtps.useBuiltinTransports = false;

//Link the Transport Layer to the Participant.
participant_attr.rtps.userTransports.push_back(tcp_transport);
XML
<transport_descriptors>
    <transport_descriptor>
        <transport_id>tcp_transport</transport_id>
        <type>TCPv4</type>
        <listening_ports>
            <port>5100</port>
        </listening_ports>
        <wan_addr>80.80.99.45</wan_addr>
    </transport_descriptor>
</transport_descriptors>

<participant profile_name="TCPParticipant">
    <rtps>
        <userTransports>
            <transport_id>tcp_transport</transport_id>
        </userTransports>
        <useBuiltinTransports>false</useBuiltinTransports>
    </rtps>
</participant>

In this case, configuring the router (which public IP is 80.80.99.45) is mandatory to allow the incoming traffic to reach the TCP Server. Typically a NAT routing with the listening_port 5100 to our machine is enough. Any existing firewall should be configured as well.

In the client side, it is needed to specify the public IP of the TCP Server with its listening_port as initial_peer.

C++
auto tcp2_transport = std::make_shared<TCPv4TransportDescriptor>();

//Disable the built-in Transport Layer.
participant_attr.rtps.useBuiltinTransports = false;

//Set initial peers.
Locator_t initial_peer_locator;
initial_peer_locator.kind = LOCATOR_KIND_TCPv4;
IPLocator::setIPv4(initial_peer_locator, "80.80.99.45");
initial_peer_locator.port = 5100;
participant_attr.rtps.builtin.initialPeersList.push_back(initial_peer_locator);

//Link the Transport Layer to the Participant.
participant_attr.rtps.userTransports.push_back(tcp2_transport);
XML
<transport_descriptors>
    <transport_descriptor>
        <transport_id>tcp2_transport</transport_id>
        <type>TCPv4</type>
    </transport_descriptor>
</transport_descriptors>

<participant profile_name="TCP2Participant">
    <rtps>
        <userTransports>
            <transport_id>tcp2_transport</transport_id>
        </userTransports>
        <builtin>
            <initialPeersList>
                <locator>
                    <tcpv4>
                        <address>80.80.99.45</address>
                        <physical_port>5100</physical_port>
                    </tcpv4>
                </locator>
            </initialPeersList>
        </builtin>
        <useBuiltinTransports>false</useBuiltinTransports>
    </rtps>
</participant>

The combination of the above configurations in both TCP Server and TCP Client allows a scenario similar to the represented by the following image.

_images/TCP_WAN.png

IPLocator

IPLocator is an auxiliary static class that offers methods to ease the management of IP based locators, as UDP or TCP. In TCP, the port field of the locator is divided into physical and logical port. The physical port is the port used by the network device, the real port that the operating system understands. The logical port can be seen as RTPS port, or UDP’s equivalent port (physical ports of UDP, are logical ports in TCP). Logical ports normally are not necessary to manage explicitly, but you can do it through IPLocator class. Physical ports instead, must be set to explicitly use certain ports, to allow the communication through a NAT, for example.

Locator_t locator;
// Get & Set Physical Port
uint16_t physical_port = IPLocator::getPhysicalPort(locator);
IPLocator::setPhysicalPort(locator, 5555);

// Get & Set Logical Port
uint16_t logical_port = IPLocator::getLogicalPort(locator);
IPLocator::setLogicalPort(locator, 7400);

// Set WAN Address
IPLocator::setWan(locator, "80.88.75.55");

NOTE

TCP doesn’t support multicast scenarios, so you must plan carefully your network architecture.

TLS over TCP

Fast-RTPS allows configuring a TCP Transport to use TLS (Transport Layer Security) by setting up TCP Server and TCP Client properly.

TCP Server
C++
auto tls_transport = std::make_shared<TCPv4TransportDescriptor>();

using TLSOptions = TCPTransportDescriptor::TLSConfig::TLSOptions;
tls_transport->apply_security = true;
tls_transport->tls_config.password = "test";
tls_transport->tls_config.cert_chain_file = "server.pem";
tls_transport->tls_config.private_key_file = "serverkey.pem";
tls_transport->tls_config.tmp_dh_file = "dh2048.pem";
tls_transport->tls_config.add_option(TLSOptions::DEFAULT_WORKAROUNDS);
tls_transport->tls_config.add_option(TLSOptions::SINGLE_DH_USE);
tls_transport->tls_config.add_option(TLSOptions::NO_SSLV2);
XML
<transport_descriptors>
    <transport_descriptor>
        <transport_id>tls_transport_server</transport_id>
        <type>TCPv4</type>
        <tls>
            <password>test</password>
            <private_key_file>serverkey.pem</private_key_file>
            <cert_chain_file>server.pem</cert_chain_file>
            <tmp_dh_file>dh2048.pem</tmp_dh_file>
            <options>
                <option>DEFAULT_WORKAROUNDS</option>
                <option>SINGLE_DH_USE</option>
                <option>NO_SSLV2</option>
            </options>
        </tls>
    </transport_descriptor>
</transport_descriptors>
TCP Client
C++
auto tls_transport = std::make_shared<TCPv4TransportDescriptor>();

using TLSOptions = TCPTransportDescriptor::TLSConfig::TLSOptions;
using TLSVerifyMode = TCPTransportDescriptor::TLSConfig::TLSVerifyMode;
tls_transport->apply_security = true;
tls_transport->tls_config.verify_file = "ca.pem";
tls_transport->tls_config.verify_mode = TLSVerifyMode::VERIFY_PEER;
tls_transport->tls_config.add_option(TLSOptions::DEFAULT_WORKAROUNDS);
tls_transport->tls_config.add_option(TLSOptions::SINGLE_DH_USE);
tls_transport->tls_config.add_option(TLSOptions::NO_SSLV2);
XML
<transport_descriptors>
    <transport_descriptor>
        <transport_id>tls_transport_client</transport_id>
        <type>TCPv4</type>
        <tls>
            <verify_file>ca.pem</verify_file>
            <verify_mode>
                <verify>VERIFY_PEER</verify>
            </verify_mode>
            <options>
                <option>DEFAULT_WORKAROUNDS</option>
                <option>SINGLE_DH_USE</option>
                <option>NO_SSLV2</option>
            </options>
        </tls>
    </transport_descriptor>
</transport_descriptors>

More TLS related options can be found in the section Transport descriptors.

Listening locators

eProsima Fast RTPS divides listening locators into four categories:

  • Metatraffic Multicast Locators: these locators are used to receive metatraffic information using multicast. They usually are used by built-in endpoints, like the discovery of built-in endpoints. You can set your own locators using attribute rtps.builtin.metatrafficMulticastLocatorList.

    // This locator will open a socket to listen network messages on UDPv4 port 22222 over multicast address 239.255.0.1
    eprosima::fastrtps::rtps::Locator_t locator;
    IPLocator::setIPv4(locator, 239, 255, 0 , 1);
    locator.port = 22222;
    
    participant_attr.rtps.builtin.metatrafficMulticastLocatorList.push_back(locator);
    
  • Metatraffic Unicast Locators: these locators are used to receive metatraffic information using unicast. They usually are used by built-in endpoints, like the discovery of built-in endpoints. You can set your own locators using attribute rtps.builtin.metatrafficUnicastLocatorList.

    // This locator will open a socket to listen network messages on UDPv4 port 22223 over network interface 192.168.0.1
    eprosima::fastrtps::rtps::Locator_t locator;
    IPLocator::setIPv4(locator, 192, 168, 0 , 1);
    locator.port = 22223;
    
    participant_attr.rtps.builtin.metatrafficUnicastLocatorList.push_back(locator);
    
  • User Multicast Locators: these locators are used to receive user information using multicast. They are used by user endpoints. You can set your own locators using attribute rtps.defaultMulticastLocatorList.

    // This locator will open a socket to listen network messages on UDPv4 port 22224 over multicast address 239.255.0.1
    eprosima::fastrtps::rtps::Locator_t locator;
    IPLocator::setIPv4(locator, 239, 255, 0 , 1);
    locator.port = 22224;
    
    participant_attr.rtps.defaultMulticastLocatorList.push_back(locator);
    
  • User Unicast Locators: these locators are used to receive user information using unicast. They are used by user endpoints. You can set your own locators using attributes rtps.defaultUnicastLocatorList.

    // This locator will open a socket to listen network messages on UDPv4 port 22225 over network interface 192.168.0.1
    eprosima::fastrtps::rtps::Locator_t locator;
    IPLocator::setIPv4(locator, 192, 168, 0 , 1);
    locator.port = 22225;
    
    participant_attr.rtps.defaultUnicastLocatorList.push_back(locator);
    

By default eProsima Fast RTPS calculates the listening locators for the built-in UDPv4 network transport using well-known ports. These well-known ports are calculated using the following predefined rules:

Ports used
Traffic type Well-known port expression
Metatraffic multicast PB + DG * domainId + offsetd0
Metatraffic unicast PB + DG * domainId + offsetd1 + PG * participantId
User multicast PB + DG * domainId + offsetd2
User unicast PB + DG * domainId + offsetd3 + PG * participantId

These predefined rules use some values explained here:

  • DG: DomainId Gain. You can set this value using attribute rtps.port.domainIDGain. The default value is 250.
  • PG: ParticipantId Gain. You can set this value using attribute rtps.port.participantIDGain. The default value is 2.
  • PB: Port Base number. You can set this value using attribute rtps.port.portBase. The default value is 7400.
  • offsetd0, offsetd1, offsetd2, offsetd3: Additional offsets. You can set these values using attributes rtps.port.offsetdN. Default values are: offsetd0 = 0, offsetd1 = 10, offsetd2 = 1, offsetd3 = 11.

Both UDP and TCP unicast locators support to have a null address. In that case, eProsima Fast RTPS understands to get local network addresses and use them.

Both UDP and TCP locators support to have a zero port. In that case, eProsima Fast RTPS understands to calculate well-known port for that type of traffic.

Initial peers

These locators are used to know where to send initial discovery network messages. You can set your own locators using attribute rtps.builtin.initialPeersList. By default eProsima Fast RTPS uses as initial peers the Metatraffic Multicast Locators.

// This locator configures as initial peer the UDPv4 address 192.168.0.2:7600.
// Initial discovery network messages will send to this UDPv4 address.
eprosima::fastrtps::rtps::Locator_t locator;
IPLocator::setIPv4(locator, "192.168.0.2");
locator.port = 7600;

participant_attr.rtps.builtin.initialPeersList.push_back(locator);

Whitelist Interfaces

There could be situations where you want to block some network interfaces to avoid connections or sending data through them. This can be managed using the field interface whitelist in the transport descriptors, and with them, you can set the interfaces you want to use to send or receive packets. The values on this list should match the IPs of your machine in that networks. For example:

C++
UDPv4TransportDescriptor descriptor;
descriptor.interfaceWhiteList.emplace_back("127.0.0.1");
XML
<transport_descriptors>
    <transport_descriptor>
        <transport_id>CustomTransport</transport_id>
        <type>UDPv4</type>
        <interfaceWhiteList>
            <address>127.0.0.1</address>
        </interfaceWhiteList>
    </transport_descriptor>
<transport_descriptor>

Tips

Disabling all multicast traffic

C++
// Metatraffic Multicast Locator List will be empty.
// Metatraffic Unicast Locator List will contain one locator, with null address and null port.
// Then eProsima Fast RTPS will use all network interfaces to receive network messages using a well-known port.
Locator_t default_unicast_locator;
participant_attr.rtps.builtin.metatrafficUnicastLocatorList.push_back(default_unicast_locator);

// Initial peer will be UDPv4 addresss 192.168.0.1. The port will be a well-known port.
// Initial discovery network messages will be sent to this UDPv4 address.
Locator_t initial_peer;
IPLocator::setIPv4(initial_peer, 192, 168, 0, 1);
participant_attr.rtps.builtin.initialPeersList.push_back(initial_peer);
XML
<participant profile_name="disable_multicast" is_default_profile="true">
    <rtps>
        <builtin>
            <metatrafficUnicastLocatorList>
                <locator/>
            </metatrafficUnicastLocatorList>
            <initialPeersList>
                <locator>
                    <udpv4>
                        <address>192.168.0.1</address>
                    </udpv4>
                </locator>
            </initialPeersList>
        </builtin>
    </rtps>
</participant>

Non-blocking write on sockets

For UDP transport, it is possible to configure whether to use non-blocking write calls on the sockets.

C++
//Create a descriptor for the new transport.
auto non_blocking_UDP_transport = std::make_shared<UDPv4TransportDescriptor>();
non_blocking_UDP_transport->non_blocking_send = false;

//Disable the built-in Transport Layer.
participant_attr.rtps.useBuiltinTransports = false;

//Link the Transport Layer to the Participant.
participant_attr.rtps.userTransports.push_back(non_blocking_UDP_transport);
XML
<transport_descriptors>
    <transport_descriptor>
        <transport_id>non_blocking_transport</transport_id>
        <type>UDPv4</type>
        <non_blocking_send>false</non_blocking_send>
    </transport_descriptor>
</transport_descriptors>

<participant profile_name="non_blocking_transport">
    <rtps>
        <userTransports>
            <transport_id>non_blocking_transport</transport_id>
        </userTransports>
        <useBuiltinTransports>false</useBuiltinTransports>
    </rtps>
</participant>

XML Configuration

The XML profiles section contains the full information about how to setup Fast RTPS through an XML file.

Flow Controllers

eProsima Fast RTPS supports user configurable flow controllers on a Publisher and Participant level. These controllers can be used to limit the amount of data to be sent under certain conditions depending on the kind of controller implemented.

The current release implement throughput controllers, which can be used to limit the total message throughput to be sent over the network per time measurement unit. In order to use them, a descriptor must be passed into the Participant or Publisher Attributes.

C++
// Limit to 300kb per second.
ThroughputControllerDescriptor slowPublisherThroughputController{300000, 1000};
publisher_attr.throughputController = slowPublisherThroughputController;
XML
<publisher profile_name="publisher_profile_qos_flowcontroller">
    <throughputController>
        <bytesPerPeriod>300000</bytesPerPeriod>
        <periodMillisecs>1000</periodMillisecs>
    </throughputController>
</publisher>

In the Writer-Reader layer, the throughput controller is built-in and the descriptor defaults to infinite throughput. To change the values:

WriterAttributes writer_attr;
writer_attr.throughputController.bytesPerPeriod = 300000; //300kb
writer_attr.throughputController.periodMillisecs = 1000; //1000ms

//CONF-QOS-PUBLISHMODE
// Allows fragmentation.
publisher_attr.qos.m_publishMode.kind = ASYNCHRONOUS_PUBLISH_MODE;

Note that specifying a throughput controller with a size smaller than the socket size can cause messages to never become sent.

Sending large data

The default message size eProsima Fast RTPS uses is a conservative value of 65Kb. If your topic data is bigger, it must be fragmented.

Fragmented messages are sent over multiple packets, as understood by the particular transport layer. To make this possible, you must configure the Publisher to work in asynchronous mode.

C++
// Allows fragmentation.
publisher_attr.qos.m_publishMode.kind = ASYNCHRONOUS_PUBLISH_MODE;
XML
<publisher profile_name="publisher_profile_qos_publishmode">
    <qos>
        <publishMode>
            <kind>ASYNCHRONOUS</kind>
        </publishMode>
    </qos>
</publisher>

In the Writer-Subscriber layer, you have to configure the Writer:

WriterAttributes write_attr;
write_attr.mode = ASYNCHRONOUS_WRITER;    // Allows fragmentation

Note that in best-effort mode messages can be lost if you send big data too fast and the buffer is filled at a faster rate than what the client can process messages. On the other hand, in reliable mode, the existence of a lot of data fragments could decrease the frequency at which messages are received. If this happens, it can be resolved by increasing socket buffers size, as described in Increasing socket buffers size. It can also help to set a lower Heartbeat period in reliable mode, as stated in Tuning Reliable mode.

When you are sending large data, it is convenient to setup a flow controller to avoid a burst of messages in the network and increase performance. See Flow Controllers

Example: Sending a unique large file

This is a proposed example of how should the user configure its application in order to achieve the best performance. To make this example more tangible, it is going to be supposed that the file has a size of 9.9MB and the network in which the publisher and the subscriber are operating has a bandwidth of 100MB/s

First of all, the asynchronous mode has to be activated in the publisher parameters. Then, a suitable reliability mode has to be selected. In this case, it is important to make sure that all fragments of the message are received. The loss of a fragment means the loss of the entire message, so it would be best to choose the reliable mode.

The default message size of this fragments using the UDPv4 transport has a value of 65Kb (which includes the space reserved for the data and the message header). This means that the publisher would have to write at least about 1100 fragments.

This amount of fragment could slow down the transmission, so it could be interesting to decrease the heartbeat period in order to increase the reactivity of the publisher.

Another important consideration is the addition of a flow controller. Without a flow controller, the publisher can occupy the entire bandwidth. A reasonable flow controller for this application could be a limit of 5MB/s, which represents only 5% of the total bandwidth. Anyway, these values are highly dependent on the specific application and its desired behavior.

At last, there is another detail to have in mind: it is critical to check the size of the system UDP buffers. In Linux, buffers can be enlarged with

sysctl -w net.ipv4.udp_mem="102400 873800 16777216"
sysctl -w net.core.netdev_max_backlog="30000"
sysctl -w net.core.rmem_max="16777216"
sysctl -w net.core.wmem_max="16777216"

Example: Video streaming

In this example, the target application transmits video between a publisher and a subscriber. This video will have a resolution of 640x480 and a frequency of 50fps.

As in the previous example, since the application is sending data that requires fragmentation, the asynchronous mode has to be activated in the publisher parameters.

In audio or video transmissions, sometimes is better to have a stable and high datarate feed than a 100% lossless communication. Working with a frequency of 50Hz makes insignificant the loss of one or two samples each second. Thus, for a higher performance, it can be appropriate to configure the reliability mode to best-effort.

Discovery

Fast RTPS provides a discovery mechanism that allows matching automatically publishers and subscribers. The discovery mechanism is divided into two phases: Participant Discovery Phase and Endpoints Discovery Phase.

  • Participant Discovery Phase (PDP)
    Before discovering any entity of a remote participant, both participants have to meet between them. Participant Discovery Phase provides this step and is responsible for sending periodic information about itself. To know how to configure where to send this periodic information, see Initial peers. To know how to configure the announcement period and count, see Discovery Configuration. When both participants are met, is the turn of Endpoints Discovery Phase.
  • Endpoints Discovery Phase (EDP)
    This phase is responsible for sending entities information to the remote participant. Also, it has to process the entities information of the remote participant and check which entities can match between them.

There are several possible choices for the PDP strategy comprised into the enum DiscoveryProtocol_t:

  • NONE which disables the PDP discovery. This setting is only compatible with the RTPSDomain layer. User must manually match and unmatch RTPS endpoints using whatever external meta-information channel of its choice.
  • SIMPLE generates a standard participant with complete backward compatibility with any other RTPS implementation.
  • CLIENT generates a client participant, which relies on a server to be notified of other clients presence. This participant can create publishers and subscribers of any topic (static or dynamic) as ordinary participants do.
  • SERVER generates a server participant, which receives, manages and spreads its linked clients metatraffic assuring any single one is aware of the others. This participant can create publishers and subscribers of any topic (static or dynamic) as ordinary participants do. Servers can link to other servers in order to share its clients information.
  • BACKUP generates a server participant with additional functionality over SERVER. Specifically, it uses a database to backup its client information, so that if for whatever reason it disappears, it can be automatically restored and continue spreading metatraffic to late joiners. A SERVER in the same scenario ought to collect client information again, introducing a recovery delay.

For an extensive explanation of CLIENT, SERVER and BACKUP discovery strategies please refer to Discovery Server documentation. The non-SIMPLE PDP strategies were devised to cope with several scenarios where the standard PDP was unsuitable or plainly cannot be applied:

  • a high number of endpoint entities which are continuously entering and exiting a large network.
  • a network without multicasting capabilities.

Lack of multicast discovery mechanism is covered by providing one or several servers whose addresses are known beforehand by any other participant (clients). These servers centralize the distribution of meta-information (participant discovery information); thus, there is no longer need of participants exchanging discovery messages among them.

The basic mechanisms mimic to some extent the standard ones:

  • as in the standard, clients send periodically announcements of its own discovery data. But the recipients of these messages are only its servers, and only until servers don’t acknowledge clients announcements. Once a server acknowledges a client announcement, there is a reliable PDP connection established, and the client becomes a mere recipient of server’s discovery data.
  • as in the standard, clients make periodical participant liveliness announcements (lease duration). But these messages are only exchanged between a client and its servers. Thus:
  • other participants (clients or servers) demise by lease duration would be reported by the linked servers.
  • server demises by lease duration would automatically trigger the client’s announcement until the server communication is restored and acknowledges again client’s announcements.

Static Endpoints Discovery

Endpoints Discovery Phase can be replaced by a static version that doesn’t send any information. It is useful when you have a limited network bandwidth and a well-known schema of publishers and subscribers. Instead of receiving entities information for matching, this information is loaded from an XML file.

First of all, you have to disable the Endpoints Discovery Phase and enable the Static Endpoints Discovery. This can be done from the participant attributes.

participant_attr.rtps.builtin.discovery_config.use_SIMPLE_EndpointDiscoveryProtocol = false;
participant_attr.rtps.builtin.discovery_config.use_STATIC_EndpointDiscoveryProtocol = true;

Then, you will need to load the XML file containing the configuration of the remote participant. So, for example, if there is a remote participant with a subscriber which is waiting to receive samples from your publisher, you will need to load the configuration of this remote participant.

participant_attr.rtps.builtin.discovery_config.setStaticEndpointXMLFilename("ParticipantWithASubscriber.xml");

A basic XML configuration file for this remote participant would contain information like the name of the remote participant, the topic name and data type of the subscriber, and its entity and user-defined ID. All these values have to exactly match the parameter values used to configure the remote participant (through the class ParticipantAttributes) and its subscriber (through the class SubscriberAttributes). Missing elements will acquire default values. For example:

<staticdiscovery>
    <participant>
        <name>HelloWorldSubscriber</name>
        <reader>
            <userId>3</userId>
            <entityId>4</entityId>
            <topicName>HelloWorldTopic</topicName>
            <topicDataType>HelloWorld</topicDataType>
        </reader>
    </participant>
</staticdiscovery>

The XML that configures the participant on the other side (in this case, a subscriber) could look like this:

<staticdiscovery>
    <participant>
        <name>HelloWorldPublisher</name>
        <writer>
            <userId>1</userId>
            <entityId>2</entityId>
            <topicName>HelloWorldTopic</topicName>
            <topicDataType>HelloWorld</topicDataType>
        </writer>
    </participant>
</staticdiscovery>

You can find an example that uses Static Endpoint Discovery.

The complete list of fields for readers and writers includes the following parameters:

  • userId: numeric value.
  • entityID: numeric value.
  • expectsInlineQos: true or false. (only valid for readers)
  • topicName: text value.
  • topicDataType: text value.
  • topicKind: NO_KEY or WITH_KEY.
  • reliabilityQos: BEST_EFFORT_RELIABILITY_QOS or RELIABLE_RELIABILITY_QOS.
  • unicastLocator
    • address: text value.
    • port: numeric value.
  • multicastLocator
    • address: text value.
    • port: numeric value.
  • topic
    • name: text value.
    • data type: text value.
    • kind: text value.
  • durabilityQos: VOLATILE_DURABILITY_QOS, TRANSIENT_LOCAL_DURABILITY_QOS or TRANSIENT_DURABILITY_QOS.
  • ownershipQos
    • kind: SHARED_OWNERSHIP_QOS or EXCLUSIVE_OWNERSHIP_QOS.
  • partitionQos: text value.
  • livelinessQos
    • kind: AUTOMATIC_LIVELINESS_QOS, MANUAL_BY_PARTICIPANT_LIVELINESS_QOS or MANUAL_BY_TOPIC_LIVELINESS_QOS.
    • leaseDuration_ms: numeric value.

Subscribing to Discovery Topics

As specified in the Discovery section, the Participant or RTPS Participant has a series of meta-data endpoints for use during the discovery process. The participant listener interface includes methods which are called each time a Publisher or a Subscriber is discovered. This allows you to create your own network analysis tools.

Implementation of custom listener
class CustomParticipantListener : public eprosima::fastrtps::ParticipantListener
{
    /* Custom Listener onSubscriberDiscovery */
    void onSubscriberDiscovery(
            eprosima::fastrtps::Participant * participant,
            eprosima::fastrtps::rtps::ReaderDiscoveryInfo && info) override
    {
        (void)participant;
        switch(info.status) {
            case eprosima::fastrtps::rtps::ReaderDiscoveryInfo::DISCOVERED_READER:
                /* Process the case when a new subscriber was found in the domain */
                cout << "New subscriber for topic '" << info.info.topicName() << "' of type '" << info.info.typeName() << "' discovered";
                break;
            case eprosima::fastrtps::rtps::ReaderDiscoveryInfo::CHANGED_QOS_READER:
                /* Process the case when a subscriber changed its QOS */
                break;
            case eprosima::fastrtps::rtps::ReaderDiscoveryInfo::REMOVED_READER:
                /* Process the case when a subscriber was removed from the domain */
                cout << "Subscriber for topic '" << info.info.topicName() << "' of type '" << info.info.typeName() << "' left the domain.";
                break;
        }
    }

    /* Custom Listener onPublisherDiscovery */
    void onPublisherDiscovery(
            eprosima::fastrtps::Participant * participant,
            eprosima::fastrtps::rtps::WriterDiscoveryInfo  && info) override
    {
        (void)participant;
        switch(info.status) {
            case eprosima::fastrtps::rtps::WriterDiscoveryInfo ::DISCOVERED_WRITER:
                /* Process the case when a new publisher was found in the domain */
                cout << "New publisher for topic '" << info.info.topicName() << "' of type '" << info.info.typeName() << "' discovered";
                break;
            case eprosima::fastrtps::rtps::WriterDiscoveryInfo ::CHANGED_QOS_WRITER:
                /* Process the case when a publisher changed its QOS */
                break;
            case eprosima::fastrtps::rtps::WriterDiscoveryInfo ::REMOVED_WRITER:
                /* Process the case when a publisher was removed from the domain */
                cout << "publisher for topic '" << info.info.topicName() << "' of type '" << info.info.typeName() << "' left the domain.";
                break;
        }
    }
};
Setting the custom listener
// Create Custom user ParticipantListener (should inherit from eprosima::fastrtps::ParticipantListener.
CustomParticipantListener *listener = new CustomParticipantListener();
// Pass the listener on participant creation.
Participant* participant = Domain::createParticipant(participant_attr, listener);

The callbacks defined in the ReaderListener you attach to the EDP will execute for each data message after the built-in protocols have processed it.

Tuning

Taking advantage of multicast

For topics with several subscribers, it is recommendable to configure them to use multicast instead of unicast. By doing so, only one network package will be sent for each sample. This will improve both CPU and network usage. Multicast configuration is explained in Multicast locators.

Increasing socket buffers size

In high rate scenarios or large data scenarios, the bottleneck could be the size of the socket buffers. Network packages could be dropped because there is no space in the socket buffer. Using Reliable Reliability Fast RTPS will try to recover lost samples, but with the penalty of retransmission. Using Best-Effort Reliability samples will be definitely lost.

By default eProsima Fast RTPS creates socket buffers with the system default size, but you can modify it. sendSocketBufferSize attribute helps to increase the socket buffer used to send data. listenSocketBufferSize attribute helps to increase the socket buffer used to read data.

C++
participant_attr.rtps.sendSocketBufferSize = 1048576;
participant_attr.rtps.listenSocketBufferSize = 4194304;
XML
<participant profile_name="participant_xml_profile_qos_socketbuffers">
    <rtps>
        <sendSocketBufferSize>1048576</sendSocketBufferSize>
        <listenSocketBufferSize>4194304</listenSocketBufferSize>
    </rtps>
</participant>

Finding out system maximum values

Linux operating system sets a maximum value for socket buffer sizes. When you set in Fast RTPS a socket buffer size, your value cannot exceed the maximum value of the system.

To get these values you can use the command sysctl. Maximum buffer size value of socket buffers used to send data could be retrieved using this command:

$> sudo sysctl -a | grep net.core.wmem_max
net.core.wmem_max = 1048576

For socket buffers used to receive data the command is:

$> sudo sysctl -a | grep net.core.rmem_max
net.core.rmem_max = 4194304

If these default maximum values are not enough for you, you can also increase them.

$> echo 'net.core.wmem_max=12582912' >> /etc/sysctl.conf
$> echo 'net.core.rmem_max=12582912' >> /etc/sysctl.conf

Tuning Reliable mode

RTPS protocol can maintain reliable communication using special messages (Heartbeat and Ack/Nack messages). RTPS protocol can detect which samples are lost and re-sent them again.

You can modify the frequency these special submessages are exchanged by specifying a custom heartbeat period. The heartbeat period in the Publisher-Subscriber level is configured as part of the ParticipantAttributes:

publisher_attr.times.heartbeatPeriod.seconds = 0;
publisher_attr.times.heartbeatPeriod.nanosec = 500000000; //500 ms

In the Writer-Reader layer, this belongs to the WriterAttributes:

writer_attr.times.heartbeatPeriod.seconds = 0;
writer_attr.times.heartbeatPeriod.nanosec = 500000000; //500 ms

A smaller heartbeat period increases the number of overhead messages in the network, but speeds up the system response when a piece of data is lost.

Non-strict reliability

Using a strict reliability, configuring History kind as KEEP_ALL, determines all samples have to be received by all subscribers. This implicates a performance decrease in case a lot of samples are dropped. If you don’t need this strictness, use a non-strict reliability, i.e. configure History kind as KEEP_LAST.

Slow down sample rate

Sometimes publishers could send data in a too high rate for subscribers. This can end dropping samples. To avoid this you can slow down the rate using Flow Controllers.

Additional Quality of Service options

As a user, you can implement your own quality of service (QoS) restrictions in your application. eProsima Fast RTPS comes bundled with a set of examples of how to implement common client-wise QoS settings:

  • Ownership Strength: When multiple data sources come online, filter duplicates by focusing on the higher priority sources.
  • Filtering: Filter incoming messages based on content, time, or both.

These examples come with their own Readme.txt that explains how the implementations work.

Logging

Fast RTPS includes an extensible logging system with the following class hierarchy:

_images/logging.png

Log is the entry point of the Logging system. It exposes three macro definitions to ease its usage:

logInfo(INFO_MSG, "This is an info message");
logWarning(WARN_MSG, "This is a warning message");
logError(ERROR_MSG, "This is an error message");

In all cases, INFO_MSG, WARN_MSG and ERROR_MSG will be used as category for the log entry as a preprocessor string, so you can use define any category inline.

logInfo(NEW_CATEGORY, "This log message belong to NEW_CATEGORY category.");

You can control the verbosity of the log system and filter it by category:

Log::SetVerbosity(Log::Kind::Warning);
std::regex my_regex("NEW_CATEGORY");
Log::SetCategoryFilter(my_regex);

The possible verbosity levels are Log::Kind::Info, Log::Kind::Warning and Log::Kind::Error.

When selecting one of them, you also select the ones with more priority.

  • Selecting Log::Kind::Error, you will only receive error messages.
  • Selecting Log::Kind::Warning you select Log::Kind::Error too.
  • Selecting Log::Kind::Info will select all of them

To filter by category, you must provide a valid std::regex expression that will be applied to the category. The categories that matches the expression, will be logged.

By default, the verbosity is set to Log::Kind::Error and without category filtering.

There are some others configurable parameters:

//! Enables the reporting of filenames in log entries. Disabled by default.
RTPS_DllAPI static void ReportFilenames(bool);
//! Enables the reporting of function names in log entries. Enabled by default when supported.
RTPS_DllAPI static void ReportFunctions(bool);
//! Sets the verbosity level, allowing for messages equal or under that priority to be logged.
RTPS_DllAPI static void SetVerbosity(Log::Kind);
//! Returns the current verbosity level.
RTPS_DllAPI static Log::Kind GetVerbosity();
//! Sets a filter that will pattern-match against log categories, dropping any unmatched categories.
RTPS_DllAPI static void SetCategoryFilter    (const std::regex&);
//! Sets a filter that will pattern-match against filenames, dropping any unmatched categories.
RTPS_DllAPI static void SetFilenameFilter    (const std::regex&);
//! Sets a filter that will pattern-match against the provided error string, dropping any unmatched categories.
RTPS_DllAPI static void SetErrorStringFilter (const std::regex&);

LogConsumers

LogConsumers are classes that implement how to manage the log information. They must be registered into the Log system to be called with the log messages (after filtering).

Currently there are two LogConsumer implementations:

  • StdoutConsumer:

    Default consumer, it prints the logging messages to the standard output. It has no configuration available.

  • FileConsumer:

    It prints the logging messages to a file. It has two configuration parameters:

    • filename that defines the file where the consumer will write the log messages.
    • append that indicates to the consumer if the output file must be opened to append new content.

    By default, filename is output.log and append is equals to false.

If you want to add a consumer to manage the logs, you must call the RegisterConsumer method of the Log. To remove all consumers, including the default one, you should call the ClearConsumers method. If you want to reset the Log configuration to its defaults, including recovering the default consumer, you can call to its Reset method.

Log::ClearConsumers(); // Deactivate StdoutConsumer

// Add FileConsumer consumer
std::unique_ptr<FileConsumer> fileConsumer(new FileConsumer("append.log", true));
Log::RegisterConsumer(std::move(fileConsumer));

// Back to its defaults: StdoutConsumer will be enable and FileConsumer removed.
Log::Reset();

XML Log configuration

You can configure the logging system through xml with the tag <log> under the <dds> tag, or as an standalone file (without the <dds> tag, just <log> as root). You can set <use_default> and a set of <consumer>. Each <consumer> is defined by its <class> and a set of <property>.

<log>
    <use_default>FALSE</use_default>
    <consumer>
        <class>FileConsumer</class>
        <property>
            <name>filename</name>
            <value>test1.log</value>
        </property>
        <property>
            <name>append</name>
            <value>TRUE</value>
        </property>
    </consumer>
</log>

<use_default> indicates if we want to use the default consumer StdoutConsumer.

Each <consumer> defines a consumer that will be added to the consumers list of the Log. <class> indicates which consumer class to instantiate and the set of <property> configures it. StdoutConsumer has no properties to be configured, but FileConsumer has filename and append.

This marks the end of this document. We recommend you to take a look at the Doxygen API reference and the embedded examples that come with the distribution. If you need more help, send us an email to support@eprosima.com.