37 #include <protobuf_comm/client.h> 39 #include <boost/lexical_cast.hpp> 57 ProtobufStreamClient::ProtobufStreamClient()
58 : resolver_(io_service_), socket_(io_service_), io_service_work_(io_service_)
61 own_message_register_ =
true;
63 outbound_active_ =
false;
65 frame_header_version_ = PB_FRAME_V2;
67 in_frame_header_ = malloc(in_frame_header_size_);
68 in_data_ = malloc(in_data_size_);
78 : resolver_(io_service_), socket_(io_service_), io_service_work_(io_service_)
81 own_message_register_ =
true;
83 outbound_active_ =
false;
85 in_data_ = malloc(in_data_size_);
86 frame_header_version_ = PB_FRAME_V2;
88 in_frame_header_ = malloc(in_frame_header_size_);
98 frame_header_version_t header_version)
99 : resolver_(io_service_), socket_(io_service_), io_service_work_(io_service_),
100 message_register_(mr), own_message_register_(false),
101 frame_header_version_(header_version)
104 outbound_active_ =
false;
105 in_data_size_ = 1024;
106 in_data_ = malloc(in_data_size_);
107 if (frame_header_version_ == PB_FRAME_V1) {
112 in_frame_header_ = malloc(in_frame_header_size_);
124 free(in_frame_header_);
125 if (own_message_register_) {
126 delete message_register_;
130 #if defined(__GNUC__) && (__GNUC__ < 4 || (__GNUC__ == 4 && __GNUC_MINOR__ < 6)) 131 static void run_asio_thread(boost::asio::io_service &io_service)
132 { io_service.run(); }
136 ProtobufStreamClient::run_asio()
138 #if defined(__GNUC__) && (__GNUC__ < 4 || (__GNUC__ == 4 && __GNUC_MINOR__ < 6)) 139 asio_thread_ = std::thread(run_asio_thread, std::ref(io_service_));
141 asio_thread_ = std::thread([
this]() { this->io_service_.run(); });
156 ip::tcp::resolver::query query(host, boost::lexical_cast<std::string>(port));
157 resolver_.async_resolve(query,
158 boost::bind(&ProtobufStreamClient::handle_resolve,
this,
159 boost::asio::placeholders::error,
160 boost::asio::placeholders::iterator));
166 ProtobufStreamClient::handle_resolve(
const boost::system::error_code& err,
167 ip::tcp::resolver::iterator endpoint_iterator)
172 #if BOOST_ASIO_VERSION > 100409 173 boost::asio::async_connect(socket_, endpoint_iterator,
175 socket_.async_connect(*endpoint_iterator,
177 boost::bind(&ProtobufStreamClient::handle_connect,
this,
178 boost::asio::placeholders::error));
181 sig_disconnected_(err);
186 ProtobufStreamClient::handle_connect(
const boost::system::error_code &err)
189 #if BOOST_VERSION >= 105400 && BOOST_VERSION < 105500 194 boost::system::error_code ec;
195 socket_.remote_endpoint(ec);
196 if (ec == boost::system::errc::not_connected) {
198 sig_disconnected_(ec);
207 sig_disconnected_(err);
212 ProtobufStreamClient::disconnect_nosig()
214 boost::system::error_code err;
215 if (socket_.is_open()) {
216 socket_.shutdown(ip::tcp::socket::shutdown_both, err);
228 sig_disconnected_(boost::system::error_code());
233 ProtobufStreamClient::start_recv()
235 boost::asio::async_read(socket_,
236 boost::asio::buffer(in_frame_header_, in_frame_header_size_),
237 boost::bind(&ProtobufStreamClient::handle_read_header,
238 this, boost::asio::placeholders::error));
242 ProtobufStreamClient::handle_read_header(
const boost::system::error_code& error)
246 if (frame_header_version_ == PB_FRAME_V1) {
253 if (to_read > in_data_size_) {
254 void *new_data = realloc(in_data_, to_read);
256 in_data_size_ = to_read;
260 sig_disconnected_(errc::make_error_code(errc::not_enough_memory));
264 boost::asio::async_read(socket_,
265 boost::asio::buffer(in_data_, to_read),
266 boost::bind(&ProtobufStreamClient::handle_read_message,
267 this, boost::asio::placeholders::error));
270 sig_disconnected_(error);
275 ProtobufStreamClient::handle_read_message(
const boost::system::error_code& error)
282 if (frame_header_version_ == PB_FRAME_V1) {
285 frame_header.
cipher = PB_ENCRYPTION_NONE;
297 data = (
char *)in_data_ +
sizeof(message_header);
301 uint16_t msg_type = ntohs(message_header.
msg_type);
303 std::shared_ptr<google::protobuf::Message> m =
304 message_register_->
deserialize(frame_header, message_header, data);
306 sig_rcvd_(comp_id, msg_type, m);
307 }
catch (std::runtime_error &e) {
308 sig_recv_failed_(comp_id, msg_type, e.what());
314 sig_disconnected_(error);
319 ProtobufStreamClient::handle_write(
const boost::system::error_code& error,
326 std::lock_guard<std::mutex> lock(outbound_mutex_);
327 if (! outbound_queue_.empty()) {
329 outbound_queue_.pop();
330 boost::asio::async_write(socket_, entry->
buffers,
331 boost::bind(&ProtobufStreamClient::handle_write,
this,
332 boost::asio::placeholders::error,
333 boost::asio::placeholders::bytes_transferred,
336 outbound_active_ =
false;
340 sig_disconnected_(error);
352 google::protobuf::Message &m)
355 throw std::runtime_error(
"Cannot send while not connected");
359 message_register_->
serialize(component_id, msg_type, m,
363 if (frame_header_version_ == PB_FRAME_V1) {
370 entry->
buffers[1] = boost::asio::const_buffer();
377 std::lock_guard<std::mutex> lock(outbound_mutex_);
378 if (outbound_active_) {
379 outbound_queue_.push(entry);
381 outbound_active_ =
true;
382 boost::asio::async_write(socket_, entry->
buffers,
383 boost::bind(&ProtobufStreamClient::handle_write,
this,
384 boost::asio::placeholders::error,
385 boost::asio::placeholders::bytes_transferred,
// Derive the component ID and message type from the message itself, then
// forward to send(comp_id, msg_type, m). The message's descriptor is searched
// for an enum named "CompType" holding the values COMP_ID and MSG_TYPE; a
// std::logic_error is thrown when the lookup or the range validation fails.
// NOTE(review): this listing is an extraction with dropped lines (some guards
// and closing braces are not visible); code tokens are kept exactly as
// extracted, only the misspelled error message ("hs" -> "has") is corrected.
398 const google::protobuf::Descriptor *desc = m.GetDescriptor();
399 const google::protobuf::EnumDescriptor *enumdesc = desc->FindEnumTypeByName(
"CompType");
401 throw std::logic_error(
"Message does not have CompType enum");
// Look up the two enum values that carry the routing information.
403 const google::protobuf::EnumValueDescriptor *compdesc =
404 enumdesc->FindValueByName(
"COMP_ID");
405 const google::protobuf::EnumValueDescriptor *msgtdesc =
406 enumdesc->FindValueByName(
"MSG_TYPE");
407 if (! compdesc || ! msgtdesc) {
408 throw std::logic_error(
"Message CompType enum has no COMP_ID or MSG_TYPE value");
410 int comp_id = compdesc->number();
411 int msg_type = msgtdesc->number();
// Both numbers must be representable as uint16_t before being passed on.
412 if (comp_id < 0 || comp_id > std::numeric_limits<uint16_t>::max()) {
413 throw std::logic_error(
"Message has invalid COMP_ID");
415 if (msg_type < 0 || msg_type > std::numeric_limits<uint16_t>::max()) {
416 throw std::logic_error(
"Message has invalid MSG_TYPE");
419 send(comp_id, msg_type, m);
~ProtobufStreamClient()
Destructor.
Register to map msg type numbers to Protobuf messages.
std::shared_ptr< google::protobuf::Message > deserialize(frame_header_t &frame_header, message_header_t &message_header, void *data)
Deserialize message.
ProtobufStreamClient()
Constructor.
std::array< boost::asio::const_buffer, 3 > buffers
outgoing buffers
void disconnect()
Disconnect from remote host.
void serialize(uint16_t component_id, uint16_t msg_type, google::protobuf::Message &msg, frame_header_t &frame_header, message_header_t &message_header, std::string &data)
Serialize a message.
frame_header_t frame_header
Frame header (network byte order), never encrypted.
void send(uint16_t component_id, uint16_t msg_type, google::protobuf::Message &m)
Send a message to the server.
void async_connect(const char *host, unsigned short port)
Asynchronous connect.
frame_header_v1_t frame_header_v1
Version 1 frame header (network byte order), never encrypted.
message_header_t message_header
Message header (network byte order)
std::string serialized_message
serialized protobuf message