-
Notifications
You must be signed in to change notification settings - Fork 878
Description
Is there an already existing issue for this?
- I have searched the existing issues
Expected behavior
When two nodes communicate with each other using SHM, no warnings should appear after the connection is established and normal communication is completed.
Current behavior
When two nodes communicate with each other using SHM, a warning appears after the connection is established and normal communication is completed, with the content being "Buffer is being invalidated, segment_size may be insufficient"
Steps to reproduce
pub code:
#include <sys/time.h>
#include <unistd.h>
#include <fastdds/dds/domain/DomainParticipantFactory.hpp>
#include <fastdds/dds/domain/qos/DomainParticipantQos.hpp>
#include <fastdds/dds/publisher/qos/PublisherQos.hpp>
#include <fastdds/rtps/transport/TCPv4TransportDescriptor.hpp>
#include <fastdds/rtps/transport/UDPv4TransportDescriptor.hpp>
#include <fastdds/rtps/transport/shared_mem/SharedMemTransportDescriptor.hpp>
#include "pub.h"
namespace comm {
Publish::Publish(std::string _topic, uint32_t _domian_id = 0)
: participanter(nullptr)
, publisher(nullptr)
, topic(nullptr)
, writer(nullptr)
, type(new PubDataPubSubType())
, matched(0)
, topic_name(_topic)
, domian_id(_domian_id)
{
struct timeval tv;
gettimeofday(&tv, NULL);
pubData.index(0);
pubData.message("hello world");
pubData.time(tv.tv_usec);
}
Publish::~Publish()
{
if (nullptr != participanter)
{
// Delete DDS entities contained within the DomainParticipant
participanter->delete_contained_entities();
// Delete DomainParticipant
DomainParticipantFactory::get_instance()->delete_participant(participanter);
}
}
bool Publish::init()
{
// step 1, create participant
DomainParticipantQos participantQos;
participantQos.transport().use_builtin_transports = false;
auto udp_transport = std::make_shared<eprosima::fastdds::rtps::UDPv4TransportDescriptor>();
participantQos.transport().user_transports.push_back(udp_transport);
auto shm_transport = std::make_shared<eprosima::fastdds::rtps::SharedMemTransportDescriptor>();
shm_transport->segment_size(64*1024*1024);
shm_transport->max_message_size(512*1024);
shm_transport->port_queue_capacity(4096);
std::cout << "Segment size: " << shm_transport->segment_size() << " bytes" << std::endl;
participantQos.transport().user_transports.push_back(shm_transport);
participantQos.name("Participant_publisher");
participanter = DomainParticipantFactory::get_shared_instance()->create_participant(domian_id, participantQos);
if (nullptr == participanter)
{
return false;
}
// step 2, register type
type.register_type(participanter);
// step 3, create publisher
PublisherQos pubQos = PUBLISHER_QOS_DEFAULT;
participanter->get_default_publisher_qos(pubQos);
publisher = participanter->create_publisher(pubQos, nullptr);
if (nullptr == publisher)
{
return false;
}
// step 4, create topic
TopicQos topicQos = TOPIC_QOS_DEFAULT;
participanter->get_default_topic_qos(topicQos);
topic = participanter->create_topic(topic_name, type.get_type_name(), topicQos);
if (nullptr == topic)
{
return false;
}
// step 5, create data writer
DataWriterQos writerQos = DATAWRITER_QOS_DEFAULT;
publisher->get_default_datawriter_qos(writerQos);
writerQos.reliability().kind = RELIABLE_RELIABILITY_QOS;
writerQos.durability().kind = VOLATILE_DURABILITY_QOS;
//writerQos.durability().kind = TRANSIENT_LOCAL_DURABILITY_QOS;
writerQos.history().kind = KEEP_LAST_HISTORY_QOS;
writerQos.history().depth = 5;
writerQos.resource_limits().max_samples = 100;
writerQos.resource_limits().max_instances = 10;
writerQos.resource_limits().max_samples_per_instance = 10;
writer = publisher->create_datawriter(topic, writerQos, this, StatusMask::all());
if (nullptr == writer)
{
return false;
}
return true;
}
void Publish::on_publication_matched(DataWriter* writer, const PublicationMatchedStatus& info)
{
if (info.current_count_change == 1)
{
matched = static_cast<int16_t>(info.current_count);
std::cout << "Publisher matched." << std::endl;
}
else if (info.current_count_change == -1)
{
matched = static_cast<int16_t>(info.current_count);
std::cout << "Publisher unmatched." << std::endl;
}
else
{
std::cout << info.current_count_change
<< " is not a valid value for PublicationMatchedStatus current count change" << std::endl;
}
}
void Publish::pub()
{
if (matched <= 0)
return;
struct timeval tv;
gettimeofday(&tv, NULL);
pubData.index(pubData.index() + 1);
pubData.message("hello world");
pubData.time(tv.tv_usec);
writer->write(&pubData);
std::cout << "Message: '" << pubData.message()
<< "' with index: '" << pubData.index()
<< "' with time: '" << pubData.time()
<< "' SENT" << std::endl;
}
}
int main(int argc, char** argv)
{
Log::SetVerbosity(Log::Kind::Info);
comm::Publish puber("xugp");
puber.init();
while (true)
{
// puber.pub();
sleep(1);
}
return 0;
}
sub code:
#include "sub.h"
#include <sys/time.h>
#include <fastdds/dds/domain/DomainParticipantFactory.hpp>
#include <fastdds/rtps/transport/UDPv4TransportDescriptor.hpp>
#include <fastdds/rtps/transport/shared_mem/SharedMemTransportDescriptor.hpp>
namespace comm {
Subscripe::Subscripe(std::string _topic, uint32_t _domian_id = 0)
: participanter(nullptr)
, subscriber(nullptr)
, topic(nullptr)
, reader(nullptr)
, type(new PubDataPubSubType())
, topic_name(_topic)
, domian_id(_domian_id)
{
}
Subscripe::~Subscripe()
{
if (nullptr != participanter)
{
// Delete DDS entities contained within the DomainParticipant
participanter->delete_contained_entities();
// Delete DomainParticipant
DomainParticipantFactory::get_instance()->delete_participant(participanter);
}
}
bool Subscripe::init()
{
// step 1, create participant
DomainParticipantQos participantQos;
participantQos.transport().use_builtin_transports = false;
auto udp_transport = std::make_shared<eprosima::fastdds::rtps::UDPv4TransportDescriptor>();
participantQos.transport().user_transports.push_back(udp_transport);
auto shm_transport = std::make_shared<eprosima::fastdds::rtps::SharedMemTransportDescriptor>();
shm_transport->segment_size(64*1024*1024);
shm_transport->max_message_size(512*1024);
shm_transport->port_queue_capacity(4096);
std::cout << "Segment size: " << shm_transport->segment_size() << " bytes" << std::endl;
participantQos.transport().user_transports.push_back(shm_transport);
participantQos.name("Participant_subscriper");
participanter = DomainParticipantFactory::get_instance()->create_participant(domian_id, participantQos);
if (nullptr == participanter)
{
return false;
}
// step 2, register type
type.register_type(participanter);
// step 3, create subcriber
SubscriberQos subQos = SUBSCRIBER_QOS_DEFAULT;
participanter->get_default_subscriber_qos(subQos);
subscriber = participanter->create_subscriber(subQos, nullptr);
if (nullptr == subscriber)
{
return false;
}
// step 4, create topic
TopicQos topicQos = TOPIC_QOS_DEFAULT;
participanter->get_default_topic_qos(topicQos);
topic = participanter->create_topic(topic_name, type.get_type_name(), topicQos);
if (nullptr == topic)
{
return false;
}
// step 5, create data writer
DataReaderQos readerQos = DATAREADER_QOS_DEFAULT;
subscriber->get_default_datareader_qos(readerQos);
readerQos.reliability().kind = RELIABLE_RELIABILITY_QOS;
//readerQos.durability().kind = TRANSIENT_LOCAL_DURABILITY_QOS;
readerQos.durability().kind = VOLATILE_DURABILITY_QOS;
readerQos.history().kind = KEEP_LAST_HISTORY_QOS;
readerQos.history().depth = 5;
readerQos.resource_limits().max_samples = 100;
readerQos.resource_limits().max_instances = 10;
readerQos.resource_limits().max_samples_per_instance = 10;
reader = subscriber->create_datareader(topic, readerQos, this, StatusMask::all());
if (nullptr == reader)
{
return false;
}
return true;
}
void Subscripe::on_subscription_matched(DataReader* reader, const SubscriptionMatchedStatus& info)
{
if (info.current_count_change == 1)
{
std::cout << "Subscriber matched." << std::endl;
}
else if (info.current_count_change == -1)
{
std::cout << "Subscriber unmatched." << std::endl;
}
else
{
std::cout << info.current_count_change
<< " is not a valid value for SubscriptionMatchedStatus current count change" << std::endl;
}
}
void Subscripe::on_data_available(DataReader* reader)
{
struct timeval tv;
gettimeofday(&tv, NULL);
SampleInfo info;
while (RETCODE_OK == reader->take_next_sample(&subData, &info))
{
if ((info.instance_state == ALIVE_INSTANCE_STATE) && info.valid_data)
{
// Print Hello world message data
std::cout << "Message: '" << subData.message()
<< "' with index: '" << subData.index()
<< "' with time: '" << subData.time()
<< "' RECEIVED" << tv.tv_usec << std::endl;
}
}
}
}
int main(int argc, char** argv)
{
Log::SetVerbosity(Log::Kind::Info);
comm::Subscripe suber("xugp");
suber.init();
std::cin.get();
return 0;
}
Fast DDS version/commit
The version is V3.2.2.
Platform/Architecture
Ubuntu Focal 20.04 amd64
Transport layer
Shared Memory Transport (SHM)
Additional context
I read the source code and find that there is a special design in "SharedMemTransport.cpp"
bool SharedMemTransport::init(
const fastdds::rtps::PropertyPolicy*,
const uint32_t& max_msg_size_no_frag)
{
(void) max_msg_size_no_frag;
if (configuration_.segment_size() == 0)
{
configuration_.segment_size(shm_default_segment_size);
}
if (configuration_.segment_size() < configuration_.max_message_size())
{
EPROSIMA_LOG_ERROR(RTPS_MSG_OUT, "max_message_size cannot be greater than segment_size");
return false;
}
#ifdef ANDROID
if (access(BOOST_INTERPROCESS_SHARED_DIR_PATH, W_OK) != F_OK)
{
EPROSIMA_LOG_WARNING(RTPS_MSG_OUT,
"Unable to write on " << BOOST_INTERPROCESS_SHARED_DIR_PATH << ". SHM Transport not enabled");
return false;
}
#endif // ifdef ANDROID
try
{
shared_mem_manager_ = SharedMemManager::create(SHM_MANAGER_DOMAIN);
if (!shared_mem_manager_)
{
return false;
}
constexpr uint32_t mean_message_size =
shm_default_segment_size / SharedMemTransportDescriptor::shm_default_port_queue_capacity;
uint32_t max_allocations = configuration_.segment_size() / mean_message_size;
if (configuration_.port_queue_capacity() > max_allocations)
{
max_allocations = configuration_.port_queue_capacity();
}
shared_mem_segment_ = shared_mem_manager_->create_segment(configuration_.segment_size(), max_allocations);
// Memset the whole segment to zero in order to force physical map of the buffer
auto buffer = shared_mem_segment_->alloc_buffer(configuration_.segment_size(),
(std::chrono::steady_clock::now() + std::chrono::milliseconds(100)));
memset(buffer->data(), 0, configuration_.segment_size());
buffer.reset();
if (!configuration_.rtps_dump_file().empty())
{
auto packets_file_consumer = std::unique_ptr<SHMPacketFileConsumer>(
new SHMPacketFileConsumer(configuration_.rtps_dump_file()));
packet_logger_ = std::make_shared<PacketsLog<SHMPacketFileConsumer>>(0, configuration_.dump_thread());
packet_logger_->RegisterConsumer(std::move(packets_file_consumer));
}
}
catch (std::exception& e)
{
EPROSIMA_LOG_ERROR(RTPS_MSG_OUT, e.what());
return false;
}
return true;
}
The codes show that it will create a buffer to force physical map.
// Memset the whole segment to zero in order to force physical map of the buffer
auto buffer = shared_mem_segment_->alloc_buffer(configuration_.segment_size(),
(std::chrono::steady_clock::now() + std::chrono::milliseconds(100)));
memset(buffer->data(), 0, configuration_.segment_size());
buffer.reset();
It allocate all the segment_size to the buffer. However, it doesn't release the free_bytes_ in shared_mem_segment_. When memory is reallocated in the function copy_to_shared_buffer, the allocation will fail and generate the warning "Buffer is being invalidated, segment_size may be insufficient".
std::shared_ptr<SharedMemManager::Buffer> SharedMemTransport::copy_to_shared_buffer(
const std::vector<NetworkBuffer>& buffers,
uint32_t total_bytes,
const std::chrono::steady_clock::time_point& max_blocking_time_point)
{
using namespace eprosima::fastdds::statistics::rtps;
assert(shared_mem_segment_);
// Statistics submessage is always the last buffer to be added
// If statistics message is present, skip last buffer
auto it_end = remove_statistics_buffer(buffers.back(), total_bytes) ? std::prev(buffers.end()) : buffers.end();
std::shared_ptr<SharedMemManager::Buffer> shared_buffer =
shared_mem_segment_->alloc_buffer(total_bytes, max_blocking_time_point);
uint8_t* pos = static_cast<uint8_t*>(shared_buffer->data());
for (auto it = buffers.begin(); it != it_end; ++it)
{
// Direct copy from the const_buffer to the mutable shared_buffer
memcpy(pos, (it->buffer), it->size);
pos += it->size;
}
return shared_buffer;
}
When the physical memory is forced out, should the buffer be released to reset the free_bytes_ in shared_mem_segment_
XML configuration file
Relevant log output
Network traffic capture
No response