17 #ifndef IOX_POSH_ROUDI_INTROSPECTION_PORT_INTROSPECTION_HPP
18 #define IOX_POSH_ROUDI_INTROSPECTION_PORT_INTROSPECTION_HPP
20 #include "fixed_size_container.hpp"
21 #include "iceoryx_posh/iceoryx_posh_types.hpp"
22 #include "iceoryx_posh/internal/popo/ports/publisher_port_data.hpp"
23 #include "iceoryx_posh/internal/popo/ports/publisher_port_user.hpp"
24 #include "iceoryx_posh/mepoo/chunk_header.hpp"
25 #include "iceoryx_posh/roudi/introspection_types.hpp"
26 #include "iceoryx_utils/cxx/helplets.hpp"
27 #include "iceoryx_utils/cxx/method_callback.hpp"
28 #include "iceoryx_utils/internal/concurrent/periodic_task.hpp"
45 template <
typename PublisherPort,
typename SubscriberPort>
49 enum class ConnectionState
64 struct ConnectionInfo;
68 PublisherInfo() noexcept
72 PublisherInfo(
typename PublisherPort::MemberType_t& portData)
74 , process(portData.m_runtimeName)
75 , service(portData.m_serviceDescription)
76 , node(portData.m_nodeName)
80 typename PublisherPort::MemberType_t* portData{
nullptr};
81 RuntimeName_t process;
85 using TimePointNs_t = mepoo::TimePointNs_t;
86 using DurationNs_t = mepoo::DurationNs_t;
87 TimePointNs_t m_sequenceNumberTimestamp{DurationNs_t(0)};
88 mepoo::SequenceNumber_t m_sequenceNumber{0U};
91 std::map<int, ConnectionInfo*> connectionMap;
97 SubscriberInfo() noexcept =
default;
99 SubscriberInfo(
typename SubscriberPort::MemberType_t& portData)
100 : portData(&portData)
101 , process(portData.m_runtimeName)
102 , service(portData.m_serviceDescription)
103 , node(portData.m_nodeName)
107 typename SubscriberPort::MemberType_t* portData{
nullptr};
108 RuntimeName_t process;
113 struct ConnectionInfo
115 ConnectionInfo() noexcept =
default;
117 ConnectionInfo(
typename SubscriberPort::MemberType_t& portData)
118 : subscriberInfo(portData)
119 , state(ConnectionState::DEFAULT)
123 ConnectionInfo(SubscriberInfo& subscriberInfo) noexcept
124 : subscriberInfo(subscriberInfo)
125 , state(ConnectionState::DEFAULT)
129 SubscriberInfo subscriberInfo;
130 PublisherInfo* publisherInfo{
nullptr};
131 ConnectionState state{ConnectionState::DEFAULT};
133 bool isConnected()
const noexcept
135 return publisherInfo && state == ConnectionState::CONNECTED;
145 bool addPublisher(
typename PublisherPort::MemberType_t& port);
150 bool addSubscriber(
typename SubscriberPort::MemberType_t& portData);
193 template <typename T, std::enable_if_t<std::is_same<T, iox::build::OneToManyPolicy>::value>* =
nullptr>
194 PortIntrospection::ConnectionState getNextState(ConnectionState currentState,
195 capro::CaproMessageType messageType) noexcept;
202 template <typename T, std::enable_if_t<std::is_same<T, iox::build::ManyToManyPolicy>::value>* =
nullptr>
203 PortIntrospection::ConnectionState getNextState(ConnectionState currentState,
204 capro::CaproMessageType messageType) noexcept;
208 bool isNew()
const noexcept;
212 void setNew(
bool value) noexcept;
219 std::map<capro::ServiceDescription, std::map<UniquePortId, typename PublisherContainer::Index_t>>
223 std::map<capro::ServiceDescription, std::map<UniquePortId, typename ConnectionContainer::Index_t>>
229 PublisherContainer m_publisherContainer;
230 ConnectionContainer m_connectionContainer;
232 std::atomic<bool> m_newData;
/// @brief add a publisher port to be tracked by introspection
/// @param[in] port member data (PublisherPort::MemberType_t) of the publisher port to track
/// @return NOTE(review): presumably true if the port was added - confirm against port_introspection.inl
253 bool addPublisher(
typename PublisherPort::MemberType_t& port);
/// @brief add a subscriber port to be tracked by introspection
/// @param[in] port member data (SubscriberPort::MemberType_t) of the subscriber port to track
/// @return NOTE(review): presumably true if the port was added - confirm against port_introspection.inl
258 bool addSubscriber(
typename SubscriberPort::MemberType_t& port);
288 PublisherPort&& publisherPortThroughput,
289 PublisherPort&& publisherPortSubscriberPortsData) noexcept;
/// @brief stop the internal send thread
300 void stop() noexcept;
/// @brief calls the three specific send functions (sendPortData, sendThroughputData,
///        sendSubscriberPortsData); this is used by the periodic task
313 void send() noexcept;
316 cxx::optional<PublisherPort> m_publisherPort;
317 cxx::optional<PublisherPort> m_publisherPortThroughput;
318 cxx::optional<PublisherPort> m_publisherPortSubscriberPortsData;
323 units::Duration m_sendInterval{units::Duration::fromSeconds(1U)};
324 concurrent::PeriodicTask<cxx::MethodCallback<void>> m_publishingTask{
335 #include "port_introspection.inl"
Constructors for CaPro messages.
Definition: capro_message.hpp:60
class for the identification of a communication event including information on the service,...
Definition: service_description.hpp:86
This class handles the port introspection for RouDi. It is recommended to use the PortIntrospectionTy...
Definition: port_introspection.hpp:47
bool removePublisher(const PublisherPort &port)
remove a publisher port from introspection
Definition: port_introspection.inl:648
void run() noexcept
start the internal send thread
Definition: port_introspection.inl:75
void stop() noexcept
stop the internal send thread
Definition: port_introspection.inl:172
void sendThroughputData() noexcept
sends the throughput data; this is used by the unit tests
Definition: port_introspection.inl:122
void setSendInterval(const units::Duration interval) noexcept
set the time interval used to send new introspection data
Definition: port_introspection.inl:161
void sendSubscriberPortsData() noexcept
sends the subscriber port change data; this is used by the unit tests
Definition: port_introspection.inl:141
bool addSubscriber(typename SubscriberPort::MemberType_t &port)
add a subscriber port to be tracked by introspection
Definition: port_introspection.inl:642
void send() noexcept
calls the three specific send functions (sendPortData, sendThroughputData and sendSubscriberPortsData); this is used by the periodic task
Definition: port_introspection.inl:93
bool removeSubscriber(const SubscriberPort &port)
remove a subscriber port from introspection
Definition: port_introspection.inl:654
void reportMessage(const capro::CaproMessage &message) noexcept
report a capro message to introspection (since this could change the state of active connections)
Definition: port_introspection.inl:44
bool addPublisher(typename PublisherPort::MemberType_t &port)
add a publisher port to be tracked by introspection
Definition: port_introspection.inl:636
bool registerPublisherPort(PublisherPort &&publisherPortGeneric, PublisherPort &&publisherPortThroughput, PublisherPort &&publisherPortSubscriberPortsData) noexcept
register publisher port used to send introspection
Definition: port_introspection.inl:57
void sendPortData() noexcept
sends the port data; this is used from the unittests
Definition: port_introspection.inl:104
Definition: service_description.hpp:29
the topic for the port introspection that a user can subscribe to
Definition: introspection_types.hpp:87
the topic for the port throughput that a user can subscribe to
Definition: introspection_types.hpp:107
Definition: introspection_types.hpp:124