Skip to content

Question about “Buffer is being invalidated, segment_size may be insufficient” warning when using shm transport #6206

@XuBlack

Description

@XuBlack

Is there an already existing issue for this?

  • I have searched the existing issues

Expected behavior

When two nodes communicate with each other using SHM, no warnings should appear after the connection is established and normal communication is completed.

Current behavior

When two nodes communicate with each other using SHM, a warning appears after the connection is established and normal communication is completed, with the content being "Buffer is being invalidated, segment_size may be insufficient"

Steps to reproduce

pub code:

#include <sys/time.h>
#include <unistd.h>
#include <fastdds/dds/domain/DomainParticipantFactory.hpp>
#include <fastdds/dds/domain/qos/DomainParticipantQos.hpp>
#include <fastdds/dds/publisher/qos/PublisherQos.hpp>
#include <fastdds/rtps/transport/TCPv4TransportDescriptor.hpp>
#include <fastdds/rtps/transport/UDPv4TransportDescriptor.hpp>
#include <fastdds/rtps/transport/shared_mem/SharedMemTransportDescriptor.hpp>
#include "pub.h"

namespace comm {

Publish::Publish(std::string _topic, uint32_t _domian_id = 0)
    : participanter(nullptr)
    , publisher(nullptr)
    , topic(nullptr)
    , writer(nullptr)
    , type(new PubDataPubSubType())
    , matched(0)
    , topic_name(_topic)
    , domian_id(_domian_id)
{
    struct timeval tv;
    gettimeofday(&tv, NULL);

    pubData.index(0);
    pubData.message("hello world");
    pubData.time(tv.tv_usec);
}

Publish::~Publish()
{
    if (nullptr != participanter)
    {
        // Delete DDS entities contained within the DomainParticipant
        participanter->delete_contained_entities();

        // Delete DomainParticipant
        DomainParticipantFactory::get_instance()->delete_participant(participanter);
    }
}

bool Publish::init()
{
    // step 1, create participant
    DomainParticipantQos participantQos;
    participantQos.transport().use_builtin_transports = false;
    auto udp_transport = std::make_shared<eprosima::fastdds::rtps::UDPv4TransportDescriptor>();
    participantQos.transport().user_transports.push_back(udp_transport);

    auto shm_transport = std::make_shared<eprosima::fastdds::rtps::SharedMemTransportDescriptor>();
    shm_transport->segment_size(64*1024*1024);
    shm_transport->max_message_size(512*1024); 
    shm_transport->port_queue_capacity(4096);
    std::cout << "Segment size: " << shm_transport->segment_size() << " bytes" << std::endl;
    participantQos.transport().user_transports.push_back(shm_transport);

    participantQos.name("Participant_publisher");
    participanter = DomainParticipantFactory::get_shared_instance()->create_participant(domian_id, participantQos);
    if (nullptr == participanter)
    {
        return false;
    }
    // step 2, register type
    type.register_type(participanter);

    // step 3, create publisher
    PublisherQos pubQos = PUBLISHER_QOS_DEFAULT;
    participanter->get_default_publisher_qos(pubQos);
    publisher = participanter->create_publisher(pubQos, nullptr);
    if (nullptr == publisher)
    {
        return false;
    }

    // step 4, create topic
    TopicQos topicQos = TOPIC_QOS_DEFAULT;
    participanter->get_default_topic_qos(topicQos);
    topic = participanter->create_topic(topic_name, type.get_type_name(), topicQos);
    if (nullptr == topic)
    {
        return false;
    }

    // step 5, create data writer
    DataWriterQos writerQos = DATAWRITER_QOS_DEFAULT;
    publisher->get_default_datawriter_qos(writerQos);
    writerQos.reliability().kind = RELIABLE_RELIABILITY_QOS;
    writerQos.durability().kind = VOLATILE_DURABILITY_QOS;
    //writerQos.durability().kind = TRANSIENT_LOCAL_DURABILITY_QOS;
    writerQos.history().kind = KEEP_LAST_HISTORY_QOS;
    writerQos.history().depth = 5;
    writerQos.resource_limits().max_samples = 100;
    writerQos.resource_limits().max_instances = 10;
    writerQos.resource_limits().max_samples_per_instance = 10;
    writer = publisher->create_datawriter(topic, writerQos, this, StatusMask::all());
    if (nullptr == writer)
    {
        return false;
    }

    return true;
}   

void Publish::on_publication_matched(DataWriter* writer, const PublicationMatchedStatus& info)
{
     if (info.current_count_change == 1) 
    {
        matched = static_cast<int16_t>(info.current_count); 
        std::cout << "Publisher matched." << std::endl;
    }
    else if (info.current_count_change == -1)
    {
        matched = static_cast<int16_t>(info.current_count);
        std::cout << "Publisher unmatched." << std::endl;
    }
    else
    {
        std::cout << info.current_count_change
                  << " is not a valid value for PublicationMatchedStatus current count change" << std::endl;
    }
}

void Publish::pub()
{
    if (matched <= 0)
        return;
    
    struct timeval tv;
    gettimeofday(&tv, NULL);

    pubData.index(pubData.index() + 1);
    pubData.message("hello world");
    pubData.time(tv.tv_usec);

    writer->write(&pubData);

    std::cout << "Message: '" << pubData.message() 
              << "' with index: '" << pubData.index()
              << "' with time: '" << pubData.time()
              << "' SENT" << std::endl;
}

}

int main(int argc, char** argv) 
{
    Log::SetVerbosity(Log::Kind::Info);
    comm::Publish puber("xugp");
    puber.init();
    while (true)
    {
        // puber.pub();
        sleep(1);
    }
    return 0;
}

sub code:

#include "sub.h"
#include <sys/time.h>
#include <fastdds/dds/domain/DomainParticipantFactory.hpp>
#include <fastdds/rtps/transport/UDPv4TransportDescriptor.hpp>
#include <fastdds/rtps/transport/shared_mem/SharedMemTransportDescriptor.hpp>

namespace comm {

Subscripe::Subscripe(std::string _topic, uint32_t _domian_id = 0)
    : participanter(nullptr)
    , subscriber(nullptr)
    , topic(nullptr)
    , reader(nullptr)
    , type(new PubDataPubSubType())
    , topic_name(_topic)
    , domian_id(_domian_id)
{

}

Subscripe::~Subscripe()
{
    if (nullptr != participanter)
    {
        // Delete DDS entities contained within the DomainParticipant
        participanter->delete_contained_entities();

        // Delete DomainParticipant
        DomainParticipantFactory::get_instance()->delete_participant(participanter);
    }
}

bool Subscripe::init()
{
    // step 1, create participant
    DomainParticipantQos participantQos;
     participantQos.transport().use_builtin_transports = false;
    auto udp_transport = std::make_shared<eprosima::fastdds::rtps::UDPv4TransportDescriptor>();
    participantQos.transport().user_transports.push_back(udp_transport);

    auto shm_transport = std::make_shared<eprosima::fastdds::rtps::SharedMemTransportDescriptor>();
    shm_transport->segment_size(64*1024*1024);
    shm_transport->max_message_size(512*1024); 
    shm_transport->port_queue_capacity(4096);
    std::cout << "Segment size: " << shm_transport->segment_size() << " bytes" << std::endl;
    participantQos.transport().user_transports.push_back(shm_transport);

    participantQos.name("Participant_subscriper");
    participanter = DomainParticipantFactory::get_instance()->create_participant(domian_id, participantQos);
    if (nullptr == participanter)
    {
        return false;
    }
    // step 2, register type
    type.register_type(participanter);

    // step 3, create subcriber
    SubscriberQos subQos = SUBSCRIBER_QOS_DEFAULT;
    participanter->get_default_subscriber_qos(subQos);
    subscriber = participanter->create_subscriber(subQos, nullptr);
    if (nullptr == subscriber)
    {
        return false;
    }

    // step 4, create topic
    TopicQos topicQos = TOPIC_QOS_DEFAULT;
    participanter->get_default_topic_qos(topicQos);
    topic = participanter->create_topic(topic_name, type.get_type_name(), topicQos);
    if (nullptr == topic)
    {
        return false;
    }

    // step 5, create data writer
    DataReaderQos readerQos = DATAREADER_QOS_DEFAULT;
    subscriber->get_default_datareader_qos(readerQos);
    readerQos.reliability().kind = RELIABLE_RELIABILITY_QOS;
    //readerQos.durability().kind = TRANSIENT_LOCAL_DURABILITY_QOS;
    readerQos.durability().kind = VOLATILE_DURABILITY_QOS;
    readerQos.history().kind = KEEP_LAST_HISTORY_QOS;
    readerQos.history().depth = 5;
    readerQos.resource_limits().max_samples = 100;
    readerQos.resource_limits().max_instances = 10;
    readerQos.resource_limits().max_samples_per_instance = 10;
    reader = subscriber->create_datareader(topic, readerQos, this, StatusMask::all());
    if (nullptr == reader)
    {
        return false;
    }

    return true;
} 

void Subscripe::on_subscription_matched(DataReader* reader, const SubscriptionMatchedStatus& info)
{
     if (info.current_count_change == 1)
    {
        std::cout << "Subscriber matched." << std::endl;
    }
    else if (info.current_count_change == -1)
    {
        std::cout << "Subscriber unmatched." << std::endl;
    }
    else
    {
        std::cout << info.current_count_change
                  << " is not a valid value for SubscriptionMatchedStatus current count change" << std::endl;
    }
}

void Subscripe::on_data_available(DataReader* reader)
{
    struct timeval tv;
    gettimeofday(&tv, NULL);

    SampleInfo info;
    while (RETCODE_OK == reader->take_next_sample(&subData, &info))
    {
        if ((info.instance_state == ALIVE_INSTANCE_STATE) && info.valid_data)
        {
            // Print Hello world message data
            std::cout << "Message: '" << subData.message() 
                      << "' with index: '" << subData.index()
                      << "' with time: '" << subData.time()
                      << "' RECEIVED" << tv.tv_usec << std::endl;
        }
    }
}

}

int main(int argc, char** argv) 
{
    Log::SetVerbosity(Log::Kind::Info);
    comm::Subscripe suber("xugp");
    suber.init();
    std::cin.get();
    return 0;
}

Fast DDS version/commit

The version is V3.2.2.

Platform/Architecture

Ubuntu Focal 20.04 amd64

Transport layer

Shared Memory Transport (SHM)

Additional context

I read the source code and find that there is a special design in "SharedMemTransport.cpp"

bool SharedMemTransport::init(
        const fastdds::rtps::PropertyPolicy*,
        const uint32_t& max_msg_size_no_frag)
{
    (void) max_msg_size_no_frag;

    if (configuration_.segment_size() == 0)
    {
        configuration_.segment_size(shm_default_segment_size);
    }

    if (configuration_.segment_size() < configuration_.max_message_size())
    {
        EPROSIMA_LOG_ERROR(RTPS_MSG_OUT, "max_message_size cannot be greater than segment_size");
        return false;
    }

#ifdef ANDROID
    if (access(BOOST_INTERPROCESS_SHARED_DIR_PATH, W_OK) != F_OK)
    {
        EPROSIMA_LOG_WARNING(RTPS_MSG_OUT,
                "Unable to write on " << BOOST_INTERPROCESS_SHARED_DIR_PATH << ". SHM Transport not enabled");
        return false;
    }
#endif // ifdef ANDROID

    try
    {
        shared_mem_manager_ = SharedMemManager::create(SHM_MANAGER_DOMAIN);
        if (!shared_mem_manager_)
        {
            return false;
        }
        constexpr uint32_t mean_message_size =
                shm_default_segment_size / SharedMemTransportDescriptor::shm_default_port_queue_capacity;
        uint32_t max_allocations = configuration_.segment_size() / mean_message_size;
        if (configuration_.port_queue_capacity() > max_allocations)
        {
            max_allocations = configuration_.port_queue_capacity();
        }
        shared_mem_segment_ = shared_mem_manager_->create_segment(configuration_.segment_size(), max_allocations);

        // Memset the whole segment to zero in order to force physical map of the buffer
        auto buffer = shared_mem_segment_->alloc_buffer(configuration_.segment_size(),
                        (std::chrono::steady_clock::now() + std::chrono::milliseconds(100)));
        memset(buffer->data(), 0, configuration_.segment_size());
        buffer.reset();

        if (!configuration_.rtps_dump_file().empty())
        {
            auto packets_file_consumer = std::unique_ptr<SHMPacketFileConsumer>(
                new SHMPacketFileConsumer(configuration_.rtps_dump_file()));

            packet_logger_ = std::make_shared<PacketsLog<SHMPacketFileConsumer>>(0, configuration_.dump_thread());
            packet_logger_->RegisterConsumer(std::move(packets_file_consumer));
        }
    }
    catch (std::exception& e)
    {
        EPROSIMA_LOG_ERROR(RTPS_MSG_OUT, e.what());
        return false;
    }

    return true;
}

The codes show that it will create a buffer to force physical map.

// Memset the whole segment to zero in order to force physical map of the buffer
auto buffer = shared_mem_segment_->alloc_buffer(configuration_.segment_size(),
                (std::chrono::steady_clock::now() + std::chrono::milliseconds(100)));
memset(buffer->data(), 0, configuration_.segment_size());
buffer.reset();

It allocate all the segment_size to the buffer. However, it doesn't release the free_bytes_ in shared_mem_segment_. When memory is reallocated in the function copy_to_shared_buffer, the allocation will fail and generate the warning "Buffer is being invalidated, segment_size may be insufficient".

std::shared_ptr<SharedMemManager::Buffer> SharedMemTransport::copy_to_shared_buffer(
        const std::vector<NetworkBuffer>& buffers,
        uint32_t total_bytes,
        const std::chrono::steady_clock::time_point& max_blocking_time_point)
{
    using namespace eprosima::fastdds::statistics::rtps;

    assert(shared_mem_segment_);

    // Statistics submessage is always the last buffer to be added
    // If statistics message is present, skip last buffer
    auto it_end = remove_statistics_buffer(buffers.back(), total_bytes) ? std::prev(buffers.end()) : buffers.end();

    std::shared_ptr<SharedMemManager::Buffer> shared_buffer =
            shared_mem_segment_->alloc_buffer(total_bytes, max_blocking_time_point);
    uint8_t* pos = static_cast<uint8_t*>(shared_buffer->data());

    for (auto it = buffers.begin(); it != it_end; ++it)
    {
        // Direct copy from the const_buffer to the mutable shared_buffer
        memcpy(pos, (it->buffer), it->size);
        pos += it->size;
    }

    return shared_buffer;
}

When the physical memory is forced out, should the buffer be released to reset the free_bytes_ in shared_mem_segment_

XML configuration file

Relevant log output

Network traffic capture

No response

Metadata

Metadata

Assignees

No one assigned

    Labels

    triageIssue pending classification

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions