Eclipse SUMO - Simulation of Urban MObility
Connection.cpp
Go to the documentation of this file.
1 /****************************************************************************/
2 // Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.org/sumo
3 // Copyright (C) 2012-2022 German Aerospace Center (DLR) and others.
4 // This program and the accompanying materials are made available under the
5 // terms of the Eclipse Public License 2.0 which is available at
6 // https://www.eclipse.org/legal/epl-2.0/
7 // This Source Code may also be made available under the following Secondary
8 // Licenses when the conditions for such availability set forth in the Eclipse
9 // Public License 2.0 are satisfied: GNU General Public License, version 2
10 // or later which is available at
11 // https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
12 // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
13 /****************************************************************************/
21 // C++ TraCI client API implementation
22 /****************************************************************************/
23 #include <config.h>
24 
25 #include <thread>
26 #include <chrono>
27 #include <array>
28 #include <libsumo/StorageHelper.h>
29 #include <libsumo/TraCIDefs.h>
30 #include "Connection.h"
31 
32 
33 namespace libtraci {
34 // ===========================================================================
35 // static member initializations
36 // ===========================================================================
// Definition of the static pointer myActive; it starts out as nullptr.
// In this file it is deleted and reset to nullptr at the end of the close() body.
37 Connection* Connection::myActive = nullptr;
// Definition of the static label -> Connection* map; the close() body erases
// the entry stored under the closing connection's myLabel.
38 std::map<const std::string, Connection*> Connection::myConnections;
39 
40 
41 // ===========================================================================
42 // member method definitions
43 // ===========================================================================
44 #ifdef _MSC_VER
45 /* Disable "decorated name length exceeded, name was truncated" warnings for the whole file. */
46 #pragma warning(disable: 4503)
47 #endif
48 Connection::Connection(const std::string& host, int port, int numRetries, const std::string& label, FILE* const pipe) :
49  myLabel(label), myProcessPipe(pipe), myProcessReader(nullptr), mySocket(host, port) {
50  if (pipe != nullptr) {
51  myProcessReader = new std::thread(&Connection::readOutput, this);
52  }
53  for (int i = 0; i <= numRetries; i++) {
54  try {
55  mySocket.connect();
56  break;
57  } catch (tcpip::SocketException& e) {
58  if (i == numRetries) {
59  close();
60  throw;
61  }
62  std::cout << "Could not connect to TraCI server at " << host << ":" << port << " " << e.what() << std::endl;
63  std::cout << " Retrying in 1 second" << std::endl;
64  std::this_thread::sleep_for(std::chrono::seconds(1));
65  }
66  }
67 }
68 
69 
// Copies everything that can be read from myProcessPipe to this process'
// standard streams: text starting with "Error:" or "Warning:" goes to
// std::cerr, as do space-indented lines that directly follow such a line;
// everything else goes to std::cout.
// NOTE(review): the listing line holding the function header (71) is missing
// from this extract. The constructor starts a thread on &Connection::readOutput,
// so this is presumably that method's body - confirm against the repository.
70 void
72  std::array<char, 256> buffer;
// true while the previously printed line was routed to std::cerr
73  bool errout = false;
// fgets delivers at most buffer.size() - 1 characters per call, so an output
// line longer than that is processed (and printed) as several separate pieces
74  while (fgets(buffer.data(), (int)buffer.size(), myProcessPipe) != nullptr) {
75  std::stringstream result;
76  result << buffer.data();
77  std::string line;
78  while (std::getline(result, line)) {
79  if ((errout && line[0] == ' ') || line.compare(0, 6, "Error:") == 0 || line.compare(0, 8, "Warning:") == 0) {
80  std::cerr << line << std::endl;
81  errout = true;
82  } else {
83  std::cout << line << std::endl;
84  errout = false;
85  }
86  }
87  }
88 }
89 
90 
// Ends the simulation and closes the connection (wording of the declaration
// index at the end of this file, which places close() at listing line 92).
// NOTE(review): listing lines 92, 93 and 98 are missing from this extract, i.e.
// the function header, the line opening the block that is closed at listing
// line 105, and the statement after the "command id" comment. The comments
// below only describe what is visible.
91 void
94  tcpip::Storage outMsg;
95  // command length
96  outMsg.writeUnsignedByte(1 + 1);
97  // command id
99  mySocket.sendExact(outMsg);
100 
// wait for the status reply to CMD_CLOSE; the acknowledgement text is filled
// in but not used afterwards
101  tcpip::Storage inMsg;
102  std::string acknowledgement;
103  check_resultState(inMsg, libsumo::CMD_CLOSE, false, &acknowledgement);
104  mySocket.close();
105  }
// a reader thread only exists when the constructor was given a pipe; wait for
// it to finish before the pipe is closed
106  if (myProcessReader != nullptr) {
107  myProcessReader->join();
108  delete myProcessReader;
109  myProcessReader = nullptr;
110 #ifdef WIN32
111  _pclose(myProcessPipe);
112 #else
113  pclose(myProcessPipe);
114 #endif
115  }
// NOTE(review): the entry removed is the one for this object's label, but the
// object deleted is whatever myActive points to. If this connection is not the
// active one (e.g. close() called from the constructor after a failed
// connect), a different Connection is destroyed - confirm this is intended.
116  myConnections.erase(myLabel);
117  delete myActive;
118  myActive = nullptr;
119 }
120 
121 
// Sends a SimulationStep command (wording of the declaration index at the end
// of this file, which lists "void simulationStep(double time)" at listing
// line 123) and reads back the subscription results contained in the reply.
// NOTE(review): listing lines 123, 128, 134, 136 and 140-141 are missing from
// this extract (function header, the statement after "command id", two
// statements around the clear() call and the condition that selects between
// the two read* calls). The comments below only describe what is visible.
122 void
124  tcpip::Storage outMsg;
125  // command length
// 1 + 1 + 8: presumably length byte + command id byte + the 8-byte double time
126  outMsg.writeUnsignedByte(1 + 1 + 8);
127  // command id
129  outMsg.writeDouble(time);
130  // send request message
131  mySocket.sendExact(outMsg);
132 
133  tcpip::Storage inMsg;
// variable subscription results of the previous step are discarded here
135  mySubscriptionResults.clear();
// the reply announces how many subscription responses follow
137  int numSubs = inMsg.readInt();
138  while (numSubs-- > 0) {
// command id is not checked (ignoreCommandId = true) and no value type is
// validated (expectedType = -1); the call only yields the response id
139  const int responseID = check_commandGetResult(inMsg, 0, -1, true);
142  readVariableSubscription(responseID, inMsg);
143  } else {
144  readContextSubscription(responseID, inMsg);
145  }
146  }
147 }
148 
149 
// Sends a SetOrder command (wording of the declaration index at the end of this
// file, which lists "void setOrder(int order)" at listing line 151).
// NOTE(review): listing lines 151, 156 and 162 are missing from this extract
// (function header, the statement after "command id" and the statement that
// follows the declaration of inMsg). Only the visible part is described.
150 void
152  tcpip::Storage outMsg;
153  // command length
// 1 + 1 + 4: presumably length byte + command id byte + the 4-byte int order
154  outMsg.writeUnsignedByte(1 + 1 + 4);
155  // command id
157  // client index
158  outMsg.writeInt(order);
159  mySocket.sendExact(outMsg);
160 
161  tcpip::Storage inMsg;
163 }
164 
165 
// Assembles a command in the reusable output storage myOutput: a length
// field, then (when given) the object id string and the content of the
// additional storage. No send call is visible in this body.
// cmdID  - command id (the statement using it is not visible, see note)
// varID  - variable id; values below 0 mean "no variable and no object id"
// objID  - object id, may be nullptr; only considered when varID >= 0
// add    - optional storage whose content is appended, may be nullptr
// Throws libsumo::FatalTraCIError("Not connected.") under a condition whose
// line is not visible here.
// NOTE(review): listing lines 168, 186, 189 and 191 are missing from this
// extract (the condition guarding the throw, one statement in the else branch
// of the length handling, and two statements next to the varID check).
166 void
167 Connection::createCommand(int cmdID, int varID, const std::string* const objID, tcpip::Storage* add) const {
169  throw libsumo::FatalTraCIError("Not connected.");
170  }
171  myOutput.reset();
172  // command length
// start with 1 + 1 (presumably the length byte and the command id byte)
173  int length = 1 + 1;
174  if (varID >= 0) {
// one more byte for the variable id
175  length += 1;
176  if (objID != nullptr) {
// 4 + characters: presumably writeString() emits a 4-byte length prefix - confirm
177  length += 4 + (int)objID->length();
178  }
179  }
180  if (add != nullptr) {
181  length += (int)add->size();
182  }
// lengths up to 255 fit into the single length byte; anything longer is
// written as an int that also accounts for its own 4 bytes
183  if (length <= 255) {
184  myOutput.writeUnsignedByte(length);
185  } else {
187  myOutput.writeInt(length + 4);
188  }
190  if (varID >= 0) {
192  if (objID != nullptr) {
193  myOutput.writeString(*objID);
194  }
195  }
196  // additional values
197  if (add != nullptr) {
198  myOutput.writeStorage(*add);
199  }
200 }
201 
202 
// Sends a SubscribeContext or a SubscribeVariable request (wording of the
// declaration index at the end of this file) and reads the answer.
// domID      - command id of the subscription request
// objID      - id of the object to subscribe to
// beginTime, endTime - written to the request after the command id
// domain     - context domain; -1 selects a plain variable subscription
// range      - context range, only written for context subscriptions
// vars       - variable ids; the single entry -1 requests a default set
// params     - optional per-variable parameters, looked up by variable id
// Throws tcpip::SocketException("Socket is not initialised") under a condition
// whose line is not visible here; check_resultState / check_commandGetResult
// may throw libsumo::TraCIException.
// NOTE(review): listing lines 206, 223-224 and 229-233 are missing from this
// extract (the condition guarding the throw and the statements that write the
// default variable ids). Only the visible part is described.
203 void
204 Connection::subscribe(int domID, const std::string& objID, double beginTime, double endTime,
205  int domain, double range, const std::vector<int>& vars, const libsumo::TraCIResults& params) {
207  throw tcpip::SocketException("Socket is not initialised");
208  }
209  const bool isContext = domain != -1;
// the request body is built first so that its size is known for the header
210  tcpip::Storage outMsg;
211  outMsg.writeUnsignedByte(domID); // command id
212  outMsg.writeDouble(beginTime);
213  outMsg.writeDouble(endTime);
214  outMsg.writeString(objID);
215  if (isContext) {
216  outMsg.writeUnsignedByte(domain);
217  outMsg.writeDouble(range);
218  }
// a lone -1 in vars stands for "use the defaults"
219  if (vars.size() == 1 && vars.front() == -1) {
220  if (domID == libsumo::CMD_SUBSCRIBE_VEHICLE_VARIABLE && !isContext) {
221  // default for vehicles is edge id and lane position
222  outMsg.writeUnsignedByte(2);
225  } else {
226  // default for detectors is vehicle number, for all others (and contexts) id list
227  outMsg.writeUnsignedByte(1);
228  const bool isDetector = domID == libsumo::CMD_SUBSCRIBE_INDUCTIONLOOP_VARIABLE
234  }
235  } else {
// explicit list: count, then each variable id followed by its parameter
// storage when params has an entry for it
236  outMsg.writeUnsignedByte((int)vars.size());
237  for (const int v : vars) {
238  outMsg.writeUnsignedByte(v);
239  const auto& paramEntry = params.find(v);
240  if (paramEntry != params.end()) {
241  outMsg.writeStorage(*libsumo::StorageHelper::toStorage(*paramEntry->second));
242  }
243  }
244  }
// header: a 0 byte followed by the total length as int; the 5 covers exactly
// these two header fields (1 + 4 bytes)
245  tcpip::Storage complete;
246  complete.writeUnsignedByte(0);
247  complete.writeInt(5 + (int)outMsg.size());
248  complete.writeStorage(outMsg);
249  // send message
250  mySocket.sendExact(complete);
251 
252  tcpip::Storage inMsg;
253  check_resultState(inMsg, domID);
// subscription data is only read from the answer when variables were requested
254  if (!vars.empty()) {
255  const int responseID = check_commandGetResult(inMsg, domID);
256  if (isContext) {
257  readContextSubscription(responseID, inMsg);
258  } else {
259  readVariableSubscription(responseID, inMsg);
260  }
261  }
262 }
263 
264 
// Receives one message from the socket into inMsg and validates the status
// response at its start.
// inMsg            - storage that receives the message
// command          - id of the command the status is expected to answer
// ignoreCommandId  - when true, a differing command id in the status is accepted
// acknowledgement  - optional output; on RTYPE_OK it is set to a descriptive text
// Throws libsumo::TraCIException on a command id mismatch, on a read failure,
// on every result type other than RTYPE_OK and on a wrong status length.
// NOTE(review): listing line 288 is missing from this extract; it sits between
// the RTYPE_ERR throw and the "not implemented" throw and is presumably the
// case label for the latter (the declaration index lists RTYPE_NOTIMPLEMENTED).
265 void
266 Connection::check_resultState(tcpip::Storage& inMsg, int command, bool ignoreCommandId, std::string* acknowledgement) {
267  mySocket.receiveExact(inMsg);
268  int cmdLength;
269  int cmdId;
270  int resultType;
271  int cmdStart;
272  std::string msg;
273  try {
// remember where the status starts so that its length can be verified below
274  cmdStart = inMsg.position();
275  cmdLength = inMsg.readUnsignedByte();
276  cmdId = inMsg.readUnsignedByte();
277  if (command != cmdId && !ignoreCommandId) {
278  throw libsumo::TraCIException("#Error: received status response to command: " + toString(cmdId) + " but expected: " + toString(command));
279  }
280  resultType = inMsg.readUnsignedByte();
281  msg = inMsg.readString();
282  } catch (std::invalid_argument&) {
// std::invalid_argument raised while reading is reported as a TraCIException
283  throw libsumo::TraCIException("#Error: an exception was thrown while reading result state message");
284  }
285  switch (resultType) {
286  case libsumo::RTYPE_ERR:
287  throw libsumo::TraCIException(msg);
289  throw libsumo::TraCIException(".. Sent command is not implemented (" + toString(command) + "), [description: " + msg + "]");
290  case libsumo::RTYPE_OK:
291  if (acknowledgement != nullptr) {
292  (*acknowledgement) = ".. Command acknowledged (" + toString(command) + "), [description: " + msg + "]";
293  }
294  break;
295  default:
296  throw libsumo::TraCIException(".. Answered with unknown result code(" + toString(resultType) + ") to command(" + toString(command) + "), [description: " + msg + "]");
297  }
// the bytes consumed for the status must match the length it announced
298  if ((cmdStart + cmdLength) != (int) inMsg.position()) {
299  throw libsumo::TraCIException("#Error: command at position " + toString(cmdStart) + " has wrong length");
300  }
301 }
302 
303 
304 int
305 Connection::check_commandGetResult(tcpip::Storage& inMsg, int command, int expectedType, bool ignoreCommandId) const {
306  int length = inMsg.readUnsignedByte();
307  if (length == 0) {
308  length = inMsg.readInt();
309  }
310  int cmdId = inMsg.readUnsignedByte();
311  if (!ignoreCommandId && cmdId != (command + 0x10)) {
312  throw libsumo::TraCIException("#Error: received response with command id: " + toString(cmdId) + "but expected: " + toString(command + 0x10));
313  }
314  if (expectedType >= 0) {
315  // not called from the TraCITestClient but from within the Connection
316  inMsg.readUnsignedByte(); // variableID
317  inMsg.readString(); // objectID
318  int valueDataType = inMsg.readUnsignedByte();
319  if (valueDataType != expectedType) {
320  throw libsumo::TraCIException("Expected " + toString(expectedType) + " but got " + toString(valueDataType));
321  }
322  }
323  return cmdId;
324 }
325 
326 
// Builds the command via createCommand(), then receives and validates the
// status response into the reusable input storage myInput and returns it.
// The returned storage is the member myInput, which is reset at the start of
// every call, so its content only stays valid until myInput is reset again.
// NOTE(review): listing lines 327 and 330 are missing from this extract (the
// return type line - the declaration index gives "tcpip::Storage &" - and the
// statement between createCommand() and the reset; presumably the send).
328 Connection::doCommand(int command, int var, const std::string& id, tcpip::Storage* add) {
329  createCommand(command, var, &id, add);
331  myInput.reset();
332  check_resultState(myInput, command);
333  return myInput;
334 }
335 
336 
// NOTE(review): only fragments of this function survive in the extract; listing
// lines 338-340 and 342 are missing. The declaration index at the end of this
// file places "void addFilter(int var, tcpip::Storage *add=nullptr)" at listing
// line 338, so this is presumably its body - confirm against the repository.
// The one visible statement clears the reusable input storage myInput.
337 void
341  myInput.reset();
343 }
344 
345 
// Reads variableCount subscription values for one object from inMsg and stores
// each as a typed result under into[objectID][variableID].
// inMsg          - storage positioned at the first variable entry
// objectID       - key of the object the values belong to
// variableCount  - number of entries to read
// into           - result map that receives the values
// Throws libsumo::TraCIException for an entry whose status is not RTYPE_OK and
// for value types that have no case below.
// NOTE(review): listing lines 356, 359, 387 and 390 are missing from this
// extract; they are presumably the case labels of the branches that read a
// double, a string, an int and a string list respectively.
346 void
347 Connection::readVariables(tcpip::Storage& inMsg, const std::string& objectID, int variableCount, libsumo::SubscriptionResults& into) {
348  while (variableCount > 0) {
349 
// every entry starts with three bytes: variable id, status and value type
350  const int variableID = inMsg.readUnsignedByte();
351  const int status = inMsg.readUnsignedByte();
352  const int type = inMsg.readUnsignedByte();
353 
354  if (status == libsumo::RTYPE_OK) {
355  switch (type) {
357  into[objectID][variableID] = std::make_shared<libsumo::TraCIDouble>(inMsg.readDouble());
358  break;
360  into[objectID][variableID] = std::make_shared<libsumo::TraCIString>(inMsg.readString());
361  break;
362  case libsumo::POSITION_2D: {
// a 2D position carries x and y only; z is set to 0
363  auto p = std::make_shared<libsumo::TraCIPosition>();
364  p->x = inMsg.readDouble();
365  p->y = inMsg.readDouble();
366  p->z = 0.;
367  into[objectID][variableID] = p;
368  break;
369  }
370  case libsumo::POSITION_3D: {
371  auto p = std::make_shared<libsumo::TraCIPosition>();
372  p->x = inMsg.readDouble();
373  p->y = inMsg.readDouble();
374  p->z = inMsg.readDouble();
375  into[objectID][variableID] = p;
376  break;
377  }
378  case libsumo::TYPE_COLOR: {
// four bytes, read in the order r, g, b, a
379  auto c = std::make_shared<libsumo::TraCIColor>();
380  c->r = (unsigned char)inMsg.readUnsignedByte();
381  c->g = (unsigned char)inMsg.readUnsignedByte();
382  c->b = (unsigned char)inMsg.readUnsignedByte();
383  c->a = (unsigned char)inMsg.readUnsignedByte();
384  into[objectID][variableID] = c;
385  break;
386  }
388  into[objectID][variableID] = std::make_shared<libsumo::TraCIInt>(inMsg.readInt());
389  break;
// element count followed by that many strings
391  auto sl = std::make_shared<libsumo::TraCIStringList>();
392  int n = inMsg.readInt();
393  for (int i = 0; i < n; ++i) {
394  sl->value.push_back(inMsg.readString());
395  }
396  into[objectID][variableID] = sl;
397  }
398  break;
399  case libsumo::TYPE_COMPOUND: {
// Only compounds with exactly two items are decoded: a string followed by
// either a double (stored as road position) or a second string (stored as a
// two-element string list).
// NOTE(review): for any other compound nothing is stored and the rest of its
// payload is not consumed, so later reads would start inside it - confirm
// that such compounds cannot occur here.
400  int n = inMsg.readInt();
401  if (n == 2) {
// type byte of the first item is read and dropped; the item is read as a string
402  inMsg.readUnsignedByte();
403  const std::string s = inMsg.readString();
404  const int secondType = inMsg.readUnsignedByte();
405  if (secondType == libsumo::TYPE_DOUBLE) {
406  auto r = std::make_shared<libsumo::TraCIRoadPosition>();
407  r->edgeID = s;
408  r->pos = inMsg.readDouble();
409  into[objectID][variableID] = r;
410  } else if (secondType == libsumo::TYPE_STRING) {
411  auto sl = std::make_shared<libsumo::TraCIStringList>();
412  sl->value.push_back(s);
413  sl->value.push_back(inMsg.readString());
414  into[objectID][variableID] = sl;
415  }
416  }
417  }
418  break;
419 
420  // TODO Other data types
421 
422  default:
423  throw libsumo::TraCIException("Unimplemented subscription type: " + toString(type));
424  }
425  } else {
426  throw libsumo::TraCIException("Subscription response error: variableID=" + toString(variableID) + " status=" + toString(status));
427  }
428 
429  variableCount--;
430  }
431 }
432 
433 
// Reads one variable subscription response from inMsg - object id, number of
// variables, then the values via readVariables() - and stores the values in
// mySubscriptionResults[responseID].
// NOTE(review): the listing line holding the function header (435) is missing
// from this extract; the declaration index at the end of this file places
// "void readVariableSubscription(int responseID, tcpip::Storage &inMsg)" there.
434 void
436  const std::string objectID = inMsg.readString();
437  const int variableCount = inMsg.readUnsignedByte();
438  readVariables(inMsg, objectID, variableCount, mySubscriptionResults[responseID]);
439 }
440 
441 
// Reads one context subscription response from inMsg: context object id,
// context domain (skipped), variables per object and number of objects, then
// for every object its id and values via readVariables(). The values are
// stored in myContextSubscriptionResults[responseID][contextID].
// NOTE(review): the listing line holding the function header (443) is missing
// from this extract; the declaration index at the end of this file places
// "void readContextSubscription(int responseID, tcpip::Storage &inMsg)" there.
442 void
444  const std::string contextID = inMsg.readString();
445  inMsg.readUnsignedByte(); // context domain
446  const int variableCount = inMsg.readUnsignedByte();
447  int numObjects = inMsg.readInt();
448  // the following also instantiates the empty map to get comparable results with libsumo
449  // see also https://github.com/eclipse/sumo/issues/7288
450  libsumo::SubscriptionResults& results = myContextSubscriptionResults[responseID][contextID];
451  while (numObjects-- > 0) {
452  std::string objectID = inMsg.readString();
453  readVariables(inMsg, objectID, variableCount, results);
454  }
455 }
456 
457 
458 }
459 
460 
461 /****************************************************************************/
An error which is not recoverable.
Definition: TraCIDefs.h:149
static std::shared_ptr< tcpip::Storage > toStorage(const TraCIResult &v)
Definition: StorageHelper.h:33
An error after which execution can continue.
Definition: TraCIDefs.h:138
void simulationStep(double time)
Sends a SimulationStep command.
Definition: Connection.cpp:123
Connection(const std::string &host, int port, int numRetries, const std::string &label, FILE *const pipe)
Constructor, connects to the specified SUMO server.
Definition: Connection.cpp:48
void close()
ends the simulation and closes the connection
Definition: Connection.cpp:92
void createCommand(int cmdID, int varID, const std::string *const objID, tcpip::Storage *add=nullptr) const
Sends a GetVariable / SetVariable request if mySocket is connected. Otherwise writes to myOutput only...
Definition: Connection.cpp:167
int check_commandGetResult(tcpip::Storage &inMsg, int command, int expectedType=-1, bool ignoreCommandId=false) const
Validates the result state of a command.
Definition: Connection.cpp:305
void addFilter(int var, tcpip::Storage *add=nullptr)
Definition: Connection.cpp:338
void readVariableSubscription(int responseID, tcpip::Storage &inMsg)
Definition: Connection.cpp:435
tcpip::Socket mySocket
The socket.
Definition: Connection.h:167
std::map< int, libsumo::SubscriptionResults > mySubscriptionResults
Definition: Connection.h:173
void check_resultState(tcpip::Storage &inMsg, int command, bool ignoreCommandId=false, std::string *acknowledgement=0)
Validates the result state of a command.
Definition: Connection.cpp:266
tcpip::Storage myInput
The reusable input storage.
Definition: Connection.h:171
FILE *const myProcessPipe
Definition: Connection.h:164
void readVariables(tcpip::Storage &inMsg, const std::string &objectID, int variableCount, libsumo::SubscriptionResults &into)
Definition: Connection.cpp:347
std::map< int, libsumo::ContextSubscriptionResults > myContextSubscriptionResults
Definition: Connection.h:174
tcpip::Storage myOutput
The reusable output storage.
Definition: Connection.h:169
void setOrder(int order)
Sends a SetOrder command.
Definition: Connection.cpp:151
void subscribe(int domID, const std::string &objID, double beginTime, double endTime, int domain, double range, const std::vector< int > &vars, const libsumo::TraCIResults &params)
Sends a SubscribeContext or a SubscribeVariable request.
Definition: Connection.cpp:204
tcpip::Storage & doCommand(int command, int var, const std::string &id, tcpip::Storage *add=nullptr)
Definition: Connection.cpp:328
static std::map< const std::string, Connection * > myConnections
Definition: Connection.h:177
const std::string myLabel
Definition: Connection.h:163
void readContextSubscription(int responseID, tcpip::Storage &inMsg)
Definition: Connection.cpp:443
static Connection * myActive
Definition: Connection.h:176
static std::string toString(const T &t, std::streamsize accuracy=PRECISION)
Definition: Connection.h:145
std::thread * myProcessReader
Definition: Connection.h:165
bool receiveExact(Storage &)
Receive a complete TraCI message from Socket::socket_.
Definition: socket.cpp:536
void sendExact(const Storage &)
Definition: socket.cpp:439
bool has_client_connection() const
Definition: socket.cpp:568
void connect()
Connects to host_:port_.
Definition: socket.cpp:367
void close()
Definition: socket.cpp:391
virtual std::string readString()
Definition: storage.cpp:180
virtual void writeString(const std::string &s)
Definition: storage.cpp:197
virtual unsigned int position() const
Definition: storage.cpp:76
virtual void writeInt(int)
Definition: storage.cpp:321
virtual void writeDouble(double)
Definition: storage.cpp:354
virtual int readUnsignedByte()
Definition: storage.cpp:155
void reset()
Definition: storage.cpp:85
virtual void writeUnsignedByte(int)
Definition: storage.cpp:165
StorageType::size_type size() const
Definition: storage.h:119
virtual void writeStorage(tcpip::Storage &store)
Definition: storage.cpp:388
virtual double readDouble()
Definition: storage.cpp:362
virtual int readInt()
Definition: storage.cpp:311
TRACI_CONST int TYPE_COLOR
TRACI_CONST int LAST_STEP_VEHICLE_NUMBER
TRACI_CONST int POSITION_3D
TRACI_CONST int RTYPE_NOTIMPLEMENTED
TRACI_CONST int TRACI_ID_LIST
TRACI_CONST int VAR_ROAD_ID
TRACI_CONST int TYPE_COMPOUND
TRACI_CONST int RESPONSE_SUBSCRIBE_PARKINGAREA_VARIABLE
TRACI_CONST int RESPONSE_SUBSCRIBE_INDUCTIONLOOP_VARIABLE
TRACI_CONST int POSITION_2D
TRACI_CONST int RESPONSE_SUBSCRIBE_OVERHEADWIRE_VARIABLE
TRACI_CONST int CMD_CLOSE
TRACI_CONST int CMD_SETORDER
TRACI_CONST int TYPE_STRINGLIST
TRACI_CONST int TYPE_INTEGER
TRACI_CONST int RESPONSE_SUBSCRIBE_BUSSTOP_VARIABLE
TRACI_CONST int CMD_ADD_SUBSCRIPTION_FILTER
std::map< std::string, libsumo::TraCIResults > SubscriptionResults
{object->{variable->value}}
Definition: TraCIDefs.h:300
TRACI_CONST int VAR_LANEPOSITION
TRACI_CONST int CMD_SUBSCRIBE_VEHICLE_VARIABLE
TRACI_CONST int TYPE_DOUBLE
TRACI_CONST int CMD_SUBSCRIBE_LANEAREA_VARIABLE
TRACI_CONST int CMD_SUBSCRIBE_INDUCTIONLOOP_VARIABLE
TRACI_CONST int CMD_SUBSCRIBE_MULTIENTRYEXIT_VARIABLE
TRACI_CONST int RTYPE_ERR
TRACI_CONST int CMD_SIMSTEP
TRACI_CONST int CMD_SUBSCRIBE_LANE_VARIABLE
TRACI_CONST int RTYPE_OK
std::map< int, std::shared_ptr< libsumo::TraCIResult > > TraCIResults
{variable->value}
Definition: TraCIDefs.h:298
TRACI_CONST int CMD_SUBSCRIBE_EDGE_VARIABLE
TRACI_CONST int TYPE_STRING