iceoryx_doc  1.0.1
port_introspection.hpp
1 // Copyright (c) 2019 - 2020 by Robert Bosch GmbH. All rights reserved.
2 // Copyright (c) 2020 - 2021 by Apex.AI Inc. All rights reserved.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 // SPDX-License-Identifier: Apache-2.0
17 #ifndef IOX_POSH_ROUDI_INTROSPECTION_PORT_INTROSPECTION_HPP
18 #define IOX_POSH_ROUDI_INTROSPECTION_PORT_INTROSPECTION_HPP
19 
20 #include "fixed_size_container.hpp"
21 #include "iceoryx_posh/iceoryx_posh_types.hpp"
22 #include "iceoryx_posh/internal/popo/ports/publisher_port_data.hpp"
23 #include "iceoryx_posh/internal/popo/ports/publisher_port_user.hpp"
24 #include "iceoryx_posh/mepoo/chunk_header.hpp"
25 #include "iceoryx_posh/roudi/introspection_types.hpp"
26 #include "iceoryx_utils/cxx/helplets.hpp"
27 #include "iceoryx_utils/cxx/method_callback.hpp"
28 #include "iceoryx_utils/internal/concurrent/periodic_task.hpp"
29 
30 #include <atomic>
31 #include <mutex>
32 #include <vector>
33 
34 #include <map>
35 
36 namespace iox
37 {
38 namespace roudi
39 {
45 template <typename PublisherPort, typename SubscriberPort>
47 {
48  private:
49  enum class ConnectionState
50  {
51  DEFAULT,
52  SUB_REQUESTED,
53  CONNECTED
54  };
55 
58 
59  class PortData
60  {
61  private:
63 
64  struct ConnectionInfo;
65 
66  struct PublisherInfo
67  {
68  PublisherInfo() noexcept
69  {
70  }
71 
72  PublisherInfo(typename PublisherPort::MemberType_t& portData)
73  : portData(&portData)
74  , process(portData.m_runtimeName)
75  , service(portData.m_serviceDescription)
76  , node(portData.m_nodeName)
77  {
78  }
79 
80  typename PublisherPort::MemberType_t* portData{nullptr};
81  RuntimeName_t process;
83  NodeName_t node;
84 
85  using TimePointNs_t = mepoo::TimePointNs_t;
86  using DurationNs_t = mepoo::DurationNs_t;
87  TimePointNs_t m_sequenceNumberTimestamp{DurationNs_t(0)};
88  mepoo::SequenceNumber_t m_sequenceNumber{0U};
89 
91  std::map<int, ConnectionInfo*> connectionMap;
92  int index{-1};
93  };
94 
95  struct SubscriberInfo
96  {
97  SubscriberInfo() noexcept = default;
98 
99  SubscriberInfo(typename SubscriberPort::MemberType_t& portData)
100  : portData(&portData)
101  , process(portData.m_runtimeName)
102  , service(portData.m_serviceDescription)
103  , node(portData.m_nodeName)
104  {
105  }
106 
107  typename SubscriberPort::MemberType_t* portData{nullptr};
108  RuntimeName_t process;
110  NodeName_t node;
111  };
112 
113  struct ConnectionInfo
114  {
115  ConnectionInfo() noexcept = default;
116 
117  ConnectionInfo(typename SubscriberPort::MemberType_t& portData)
118  : subscriberInfo(portData)
119  , state(ConnectionState::DEFAULT)
120  {
121  }
122 
123  ConnectionInfo(SubscriberInfo& subscriberInfo) noexcept
124  : subscriberInfo(subscriberInfo)
125  , state(ConnectionState::DEFAULT)
126  {
127  }
128 
129  SubscriberInfo subscriberInfo;
130  PublisherInfo* publisherInfo{nullptr};
131  ConnectionState state{ConnectionState::DEFAULT};
132 
133  bool isConnected() const noexcept
134  {
135  return publisherInfo && state == ConnectionState::CONNECTED;
136  }
137  };
138 
139  public:
140  PortData() noexcept;
141 
145  bool addPublisher(typename PublisherPort::MemberType_t& port);
146 
150  bool addSubscriber(typename SubscriberPort::MemberType_t& portData);
151 
156  bool removePublisher(const PublisherPort& port);
157 
162  bool removeSubscriber(const SubscriberPort& port);
163 
168  bool updateConnectionState(const capro::CaproMessage& message) noexcept;
169 
178  bool updateSubscriberConnectionState(const capro::CaproMessage& message, const UniquePortId& id);
179 
182  void prepareTopic(PortIntrospectionTopic& topic) noexcept;
183 
184  void prepareTopic(PortThroughputIntrospectionTopic& topic) noexcept;
185 
186  void prepareTopic(SubscriberPortChangingIntrospectionFieldTopic& topic) noexcept;
187 
193  template <typename T, std::enable_if_t<std::is_same<T, iox::build::OneToManyPolicy>::value>* = nullptr>
194  PortIntrospection::ConnectionState getNextState(ConnectionState currentState,
195  capro::CaproMessageType messageType) noexcept;
196 
202  template <typename T, std::enable_if_t<std::is_same<T, iox::build::ManyToManyPolicy>::value>* = nullptr>
203  PortIntrospection::ConnectionState getNextState(ConnectionState currentState,
204  capro::CaproMessageType messageType) noexcept;
205 
208  bool isNew() const noexcept;
209 
212  void setNew(bool value) noexcept;
213 
214  private:
215  using PublisherContainer = FixedSizeContainer<PublisherInfo, MAX_PUBLISHERS>;
216  using ConnectionContainer = FixedSizeContainer<ConnectionInfo, MAX_SUBSCRIBERS>;
217 
219  std::map<capro::ServiceDescription, std::map<UniquePortId, typename PublisherContainer::Index_t>>
220  m_publisherMap;
221 
223  std::map<capro::ServiceDescription, std::map<UniquePortId, typename ConnectionContainer::Index_t>>
224  m_connectionMap;
225 
229  PublisherContainer m_publisherContainer;
230  ConnectionContainer m_connectionContainer;
231 
232  std::atomic<bool> m_newData;
233  std::mutex m_mutex;
234  };
235 
236  // end of helper classes
237 
238  public:
239  PortIntrospection() noexcept;
240 
241  ~PortIntrospection() noexcept;
242 
243  // delete copy constructor and assignment operator
244  PortIntrospection(PortIntrospection const&) = delete;
245  PortIntrospection& operator=(PortIntrospection const&) = delete;
246  // delete move constructor and assignment operator
248  PortIntrospection& operator=(PortIntrospection&&) = delete;
249 
253  bool addPublisher(typename PublisherPort::MemberType_t& port);
254 
258  bool addSubscriber(typename SubscriberPort::MemberType_t& port);
259 
264  bool removePublisher(const PublisherPort& port);
265 
270  bool removeSubscriber(const SubscriberPort& port);
271 
274  void reportMessage(const capro::CaproMessage& message) noexcept;
275 
282  void reportMessage(const capro::CaproMessage& message, const UniquePortId& id);
283 
287  bool registerPublisherPort(PublisherPort&& publisherPortGeneric,
288  PublisherPort&& publisherPortThroughput,
289  PublisherPort&& publisherPortSubscriberPortsData) noexcept;
290 
293  void setSendInterval(const units::Duration interval) noexcept;
294 
295 
297  void run() noexcept;
298 
300  void stop() noexcept;
301 
302  protected:
304  void sendPortData() noexcept;
305 
307  void sendThroughputData() noexcept;
308 
310  void sendSubscriberPortsData() noexcept;
311 
313  void send() noexcept;
314 
315  protected:
316  cxx::optional<PublisherPort> m_publisherPort;
317  cxx::optional<PublisherPort> m_publisherPortThroughput;
318  cxx::optional<PublisherPort> m_publisherPortSubscriberPortsData;
319 
320  private:
321  PortData m_portData;
322 
323  units::Duration m_sendInterval{units::Duration::fromSeconds(1U)};
324  concurrent::PeriodicTask<cxx::MethodCallback<void>> m_publishingTask{
325  concurrent::PeriodicTaskManualStart, "PortIntr", *this, &PortIntrospection::send};
326 };
327 
331 
332 } // namespace roudi
333 } // namespace iox
334 
335 #include "port_introspection.inl"
336 
337 #endif // IOX_POSH_ROUDI_INTROSPECTION_PORT_INTROSPECTION_HPP
C'tors for CaPro messages.
Definition: capro_message.hpp:60
class for the identification of a communication event including information on the service,...
Definition: service_description.hpp:86
This class handles the port introspection for RouDi. It is recommended to use the PortIntrospectionTy...
Definition: port_introspection.hpp:47
bool removePublisher(const PublisherPort &port)
remove a publisher port from introspection
Definition: port_introspection.inl:648
void run() noexcept
start the internal send thread
Definition: port_introspection.inl:75
void stop() noexcept
stop the internal send thread
Definition: port_introspection.inl:172
void sendThroughputData() noexcept
sends the throughput data; this is used from the unittests
Definition: port_introspection.inl:122
void setSendInterval(const units::Duration interval) noexcept
set the time interval used to send new introspection data
Definition: port_introspection.inl:161
void sendSubscriberPortsData() noexcept
sends the subscriberport changing data, this is used from the unittests
Definition: port_introspection.inl:141
bool addSubscriber(typename SubscriberPort::MemberType_t &port)
add a subscriber port to be tracked by introspection
Definition: port_introspection.inl:642
void send() noexcept
calls the three specific send functions from above, this is used from the periodic task
Definition: port_introspection.inl:93
bool removeSubscriber(const SubscriberPort &port)
remove a subscriber port from introspection
Definition: port_introspection.inl:654
void reportMessage(const capro::CaproMessage &message) noexcept
report a capro message to introspection (since this could change the state of active connections)
Definition: port_introspection.inl:44
bool addPublisher(typename PublisherPort::MemberType_t &port)
add a publisher port to be tracked by introspection
Definition: port_introspection.inl:636
bool registerPublisherPort(PublisherPort &&publisherPortGeneric, PublisherPort &&publisherPortThroughput, PublisherPort &&publisherPortSubscriberPortsData) noexcept
register publisher port used to send introspection
Definition: port_introspection.inl:57
void sendPortData() noexcept
sends the port data; this is used from the unittests
Definition: port_introspection.inl:104
Definition: service_description.hpp:29
the topic for the port introspection that a user can subscribe to
Definition: introspection_types.hpp:87
the topic for the port throughput that a user can subscribe to
Definition: introspection_types.hpp:107
Definition: introspection_types.hpp:124