-
Notifications
You must be signed in to change notification settings - Fork 58
Expand file tree
/
Copy pathNodeSharedPrivate.hh
More file actions
206 lines (164 loc) · 7.61 KB
/
NodeSharedPrivate.hh
File metadata and controls
206 lines (164 loc) · 7.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
/*
* Copyright (C) 2017 Open Source Robotics Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef GZ_TRANSPORT_NODESHAREDPRIVATE_HH_
#define GZ_TRANSPORT_NODESHAREDPRIVATE_HH_
#include <zmq.hpp>
#include <atomic>
#include <map>
#include <memory>
#include <queue>
#include <string>
#include <vector>
#include "gz/transport/Discovery.hh"
#include "gz/transport/Node.hh"
namespace gz
{
namespace transport
{
// Inline bracket to help doxygen filtering.
inline namespace GZ_TRANSPORT_VERSION_NAMESPACE {
//
/// \brief Metadata for a publication. This is sent as part of the ZMQ
/// message for topic statistics.
class PublicationMetadata
{
/// \brief Publication timestamp.
public: uint64_t stamp = 0;
/// \brief Sequence number, used to detect dropped messages.
public: uint64_t seq = 0;
};
//
// Private data class for NodeShared.
class NodeSharedPrivate
{
// Constructor
public: NodeSharedPrivate() :
context(new zmq::context_t(1)),
publisher(new zmq::socket_t(*context, ZMQ_PUB)),
subscriber(new zmq::socket_t(*context, ZMQ_SUB)),
requester(new zmq::socket_t(*context, ZMQ_ROUTER)),
responseReceiver(new zmq::socket_t(*context, ZMQ_ROUTER)),
replier(new zmq::socket_t(*context, ZMQ_ROUTER))
{
}
/// \brief Initialize security
public: void SecurityInit();
/// \brief Handle new secure connections
public: void SecurityOnNewConnection();
/// \brief Access control handler for plain security.
/// This function is designed to be run in a thread.
public: void AccessControlHandler();
/// \brief Get and validate a non-negative environment variable.
/// \param[in] _envVar The name of the environment variable to get.
/// \param[in] _defaultValue The default value returned in case the
/// environment variable is invalid (e.g.: invalid number,
/// negative number).
/// \return The value read from the environment variable or the default
/// value if the validation wasn't succeed.
public: int NonNegativeEnvVar(const std::string &_envVar,
int _defaultValue) const;
//////////////////////////////////////////////////
/////// Declare here the ZMQ Context ///////
//////////////////////////////////////////////////
/// \brief 0MQ context. Always declare this object before any ZMQ socket
/// to make sure that the context is destroyed after all sockets.
public: std::unique_ptr<zmq::context_t> context;
//////////////////////////////////////////////////
/////// Declare here all ZMQ sockets ///////
//////////////////////////////////////////////////
/// \brief ZMQ socket to send topic updates.
public: std::unique_ptr<zmq::socket_t> publisher;
/// \brief ZMQ socket to receive topic updates.
public: std::unique_ptr<zmq::socket_t> subscriber;
/// \brief ZMQ socket for sending service call requests.
public: std::unique_ptr<zmq::socket_t> requester;
/// \brief ZMQ socket for receiving service call responses.
public: std::unique_ptr<zmq::socket_t> responseReceiver;
/// \brief ZMQ socket to receive service call requests.
public: std::unique_ptr<zmq::socket_t> replier;
/// \brief Thread the handle access control
public: std::thread accessControlThread;
//////////////////////////////////////////////////
/////// Declare here the discovery object ///////
//////////////////////////////////////////////////
/// \brief Discovery service (messages).
public: std::unique_ptr<MsgDiscovery> msgDiscovery;
/// \brief Discovery service (services).
public: std::unique_ptr<SrvDiscovery> srvDiscovery;
//////////////////////////////////////////////////
/////// Other private member variables ///////
//////////////////////////////////////////////////
/// \brief When true, the reception thread will finish.
public: std::atomic<bool> exit = false;
/// \brief Timeout used for receiving messages (ms.).
public: inline static const int Timeout = 250;
////////////////////////////////////////////////////////////////
/////// The following is for asynchronous publication of ///////
/////// messages to local subscribers. ///////
////////////////////////////////////////////////////////////////
/// \brief Encapsulates information needed to publish a message. An
/// instance of this class is pushed onto a publish queue, pubQueue, when
/// a message is published through Node::Publisher::Publish.
/// The pubThread processes the pubQueue in the
/// NodeSharedPrivate::PublishThread function.
///
/// A producer-consumer mechanism is used to send messages so that
/// Node::Publisher::Publish function does not block while executing
/// local subscriber callbacks.
public: struct PublishMsgDetails
{
/// \brief All the local subscription handlers.
public: std::vector<ISubscriptionHandlerPtr> localHandlers;
/// \brief All the raw handlers.
public: std::vector<RawSubscriptionHandlerPtr> rawHandlers;
/// \brief Buffer for the raw handlers.
public: std::unique_ptr<char[]> sharedBuffer = nullptr;
/// \brief Msg copy for the local handlers.
public: std::unique_ptr<ProtoMsg> msgCopy = nullptr;
/// \brief Message size.
// cppcheck-suppress unusedStructMember
public: std::size_t msgSize = 0;
/// \brief Information about the topic and type.
public: MessageInfo info;
};
/// \brief Publish thread used to process the pubQueue.
public: std::thread pubThread;
/// \brief Mutex to protect the pubThread and pubQueue.
public: std::mutex pubThreadMutex;
/// \brief Queue onto which new messages are pushed. The pubThread
/// will pop off the messages and send them to local subscribers.
public: std::queue<std::unique_ptr<PublishMsgDetails>> pubQueue;
/// \brief used to signal when new work is available
public: std::condition_variable signalNewPub;
/// \brief Handles local publication of messages on the pubQueue.
public: void PublishThread();
/// \brief Topic publication sequence numbers.
public: std::map<std::string, uint64_t> topicPubSeq;
/// \brief True if topic statistics have been enabled.
public: bool topicStatsEnabled = false;
/// \brief Statistics for a topic. The key in the map is the topic
/// name and the value contains the topic statistics.
public: std::map<std::string, TopicStatistics> topicStats;
/// \brief Set of topics that have statistics enabled.
public: std::map<std::string,
std::function<void(const TopicStatistics &_stats)>>
enabledTopicStatistics;
};
}
}
}
#endif