// Session constructor: stores the client ID and the owning server, creates the
// socket on the given io_service, allocates the receive buffer and records
// that no outbound write is currently in flight.
37 #include <protobuf_comm/server.h> 62 ProtobufStreamServer::Session::Session(ClientID
id, ProtobufStreamServer *parent,
63 boost::asio::io_service& io_service)
64 : id_(id), parent_(parent), socket_(io_service)
// Raw malloc'd buffer of in_data_size_ bytes; handle_read_header grows it
// with realloc when a larger payload is announced.
67 in_data_ = malloc(in_data_size_);
68 outbound_active_ =
false;
// Session destructor: if the socket is still open, shuts down both the send
// and receive directions.
72 ProtobufStreamServer::Session::~Session()
// The error_code overload of shutdown() is used, so a failure is stored in
// `err` instead of being thrown.
74 boost::system::error_code err;
75 if (socket_.is_open()) {
76 socket_.shutdown(ip::tcp::socket::shutdown_both, err);
// Caches the socket's remote endpoint in remote_endpoint_.
85 ProtobufStreamServer::Session::start_session()
// NOTE(review): this remote_endpoint() overload takes no error_code, so a
// failure surfaces as an exception — confirm callers expect that.
87 remote_endpoint_ = socket_.remote_endpoint();
// Starts an asynchronous read of one frame header: exactly
// sizeof(frame_header_t) bytes are read into in_frame_header_, then
// handle_read_header is invoked with the resulting error code.
95 ProtobufStreamServer::Session::start_read()
97 boost::asio::async_read(socket_,
98 boost::asio::buffer(&in_frame_header_,
sizeof(frame_header_t)),
99 boost::bind(&ProtobufStreamServer::Session::handle_read_header,
// Binding shared_from_this() stores a shared owner of this Session in the
// pending handler.
100 shared_from_this(), boost::asio::placeholders::error));
// Serializes a message and sends it to this session's client, or queues it
// when another write is already in progress.
//
// component_id / msg_type: identifiers handed to the message register's
//   serialize() together with the message.
// m: the protobuf message to serialize.
110 ProtobufStreamServer::Session::send(uint16_t component_id, uint16_t msg_type,
111 google::protobuf::Message &m)
// The parent's message register fills the entry's frame header, message
// header and serialized payload.
114 parent_->message_register().serialize(component_id, msg_type, m,
115 entry->frame_header, entry->message_header,
116 entry->serialized_message);
// Three buffers are written in order: frame header, message header, payload.
// They reference memory inside `entry`.
118 entry->buffers[0] = boost::asio::buffer(&entry->frame_header,
sizeof(frame_header_t));
119 entry->buffers[1] = boost::asio::buffer(&entry->message_header,
sizeof(message_header_t));
120 entry->buffers[2] = boost::asio::buffer(entry->serialized_message);
// outbound_mutex_ guards outbound_active_ and outbound_queue_.
122 std::lock_guard<std::mutex> lock(outbound_mutex_);
// A write is already in flight: queue the entry instead of writing now.
123 if (outbound_active_) {
124 outbound_queue_.push(entry);
// Mark a write as in flight and start it; handle_write is the completion
// handler (presumably the else branch of the check above — TODO confirm).
126 outbound_active_ =
true;
127 boost::asio::async_write(socket_, entry->buffers,
128 boost::bind(&ProtobufStreamServer::Session::handle_write,
130 boost::asio::placeholders::error,
131 boost::asio::placeholders::bytes_transferred,
// Disconnects this session: if the socket is open, shuts down both
// directions.
139 ProtobufStreamServer::Session::disconnect()
// Failures of shutdown() are captured in `err` rather than thrown.
141 boost::system::error_code err;
142 if (socket_.is_open()) {
143 socket_.shutdown(ip::tcp::socket::shutdown_both, err);
// Completion handler for the async_write calls issued by send() and by this
// handler itself.
151 ProtobufStreamServer::Session::handle_write(
const boost::system::error_code& error,
// outbound_mutex_ guards outbound_queue_ and outbound_active_.
158 std::lock_guard<std::mutex> lock(outbound_mutex_);
// More entries are waiting: pop one and start writing it, with this handler
// as the completion callback again.
159 if (! outbound_queue_.empty()) {
161 outbound_queue_.pop();
162 boost::asio::async_write(socket_, entry->buffers,
163 boost::bind(&ProtobufStreamServer::Session::handle_write,
165 boost::asio::placeholders::error,
166 boost::asio::placeholders::bytes_transferred,
// Clearing the flag lets the next send() start a write directly
// (presumably reached when the queue is empty — TODO confirm).
169 outbound_active_ =
false;
// Reports this session to the parent as disconnected, passing the error
// (presumably the error path — TODO confirm).
172 parent_->disconnected(shared_from_this(), error);
// Completion handler for the frame-header read started by start_read().
// Sizes the receive buffer for the announced payload and starts reading it.
184 ProtobufStreamServer::Session::handle_read_header(
const boost::system::error_code& error)
// payload_size is converted from network byte order with ntohl.
// NOTE(review): the size comes from the peer and no upper bound is visible
// before it is used as an allocation size — confirm a limit is enforced.
187 size_t to_read = ntohl(in_frame_header_.payload_size);
// Grow the receive buffer when the payload does not fit.
188 if (to_read > in_data_size_) {
189 void *new_data = realloc(in_data_, to_read);
191 in_data_size_ = to_read;
// Reports the session as disconnected with not_enough_memory
// (presumably when realloc failed — TODO confirm).
194 parent_->disconnected(shared_from_this(),
195 errc::make_error_code(errc::not_enough_memory));
// Read exactly to_read payload bytes into in_data_, then continue in
// handle_read_message.
199 boost::asio::async_read(socket_,
200 boost::asio::buffer(in_data_, to_read),
201 boost::bind(&ProtobufStreamServer::Session::handle_read_message,
202 shared_from_this(), boost::asio::placeholders::error));
// Reports the session as disconnected with the read error
// (presumably the error path — TODO confirm).
204 parent_->disconnected(shared_from_this(), error);
// Completion handler for the payload read started by handle_read_header().
// Decodes the message header, deserializes the message and emits the
// receive signal, or the receive-failed signal when deserialization throws.
217 ProtobufStreamServer::Session::handle_read_message(
const boost::system::error_code& error)
// The payload starts with a message_header_t.
// NOTE(review): no check that the payload is at least
// sizeof(message_header_t) bytes is visible here — confirm.
220 message_header_t *
message_header =
static_cast<message_header_t *
>(in_data_);
// Header fields are converted from network byte order with ntohs.
222 uint16_t comp_id = ntohs(message_header->component_id);
223 uint16_t msg_type = ntohs(message_header->msg_type);
// The serialized message starts right after the message header.
225 std::shared_ptr<google::protobuf::Message> m =
226 parent_->message_register().deserialize(in_frame_header_, *message_header,
227 (
char *)in_data_ +
sizeof(message_header_t));
228 parent_->sig_rcvd_(id_, comp_id, msg_type, m);
229 }
catch (std::runtime_error &e) {
// The failure is reported through sig_recv_failed_ with the exception text.
231 parent_->sig_recv_failed_(id_, comp_id, msg_type, e.what());
// Reports the session as disconnected with the read error
// (presumably the error path — TODO confirm).
235 parent_->disconnected(shared_from_this(), error);
// Constructor: acceptor_ is constructed on an IPv6 TCP endpoint at `port`.
253 acceptor_(io_service_, ip::tcp::endpoint(ip::tcp::v6(), port))
// The server owns message_register_ here; the destructor deletes it when
// this flag is set.
256 own_message_register_ =
true;
259 acceptor_.set_option(socket_base::reuse_address(
true));
// run_asio() executes on its own thread.
262 asio_thread_ = std::thread(&ProtobufStreamServer::run_asio,
this);
// Constructor overload taking proto_path, a list of strings (presumably
// search paths for proto files — TODO confirm). The visible setup is:
// acceptor on an IPv6 TCP endpoint at `port`, owned message register,
// reuse_address, and run_asio() on its own thread.
273 std::vector<std::string> &proto_path)
275 acceptor_(io_service_, ip::tcp::endpoint(ip::tcp::v6(), port))
// The destructor deletes message_register_ when this flag is set.
278 own_message_register_ =
true;
281 acceptor_.set_option(socket_base::reuse_address(
true));
284 asio_thread_ = std::thread(&ProtobufStreamServer::run_asio,
this);
// Constructor overload using an externally supplied message register `mr`.
294 acceptor_(io_service_, ip::tcp::endpoint(ip::tcp::v6(), port)),
// The register is not owned, so the destructor will not delete it.
295 message_register_(mr), own_message_register_(false)
299 acceptor_.set_option(socket_base::reuse_address(
true));
// run_asio() executes on its own thread.
302 asio_thread_ = std::thread(&ProtobufStreamServer::run_asio,
this);
// Deletes the message register only when this server owns it; a register
// supplied by the caller is left untouched.
311 if (own_message_register_) {
312 delete message_register_;
// Sends message m to one client.
// Throws std::runtime_error("Client does not exist") when the client ID is
// not present in sessions_.
325 google::protobuf::Message &m)
327 if (sessions_.find(client) == sessions_.end()) {
328 throw std::runtime_error(
"Client does not exist");
// Forwards to the client's Session::send.
// NOTE(review): no lock around sessions_ is visible here — confirm how
// access is synchronized with the asio thread.
331 sessions_[client]->send(component_id, msg_type, m);
// Overload taking the message as a shared_ptr; dereferences it and forwards
// to the reference-taking send().
// NOTE(review): m is dereferenced without a visible null check — confirm
// callers never pass an empty pointer.
343 std::shared_ptr<google::protobuf::Message> m)
345 send(client, component_id, msg_type, *m);
// Overload that derives the component ID and message type from the message
// itself: it looks up the enum "CompType" on the message's descriptor and
// reads the numbers of its COMP_ID and MSG_TYPE values.
// Throws std::logic_error when the enum or either value is missing, or when
// a number does not fit into uint16_t.
356 const google::protobuf::Descriptor *desc = m.GetDescriptor();
357 const google::protobuf::EnumDescriptor *enumdesc = desc->FindEnumTypeByName(
"CompType");
359 throw std::logic_error(
"Message does not have CompType enum");
361 const google::protobuf::EnumValueDescriptor *compdesc =
362 enumdesc->FindValueByName(
"COMP_ID");
363 const google::protobuf::EnumValueDescriptor *msgtdesc =
364 enumdesc->FindValueByName(
"MSG_TYPE");
365 if (! compdesc || ! msgtdesc) {
// NOTE(review): the message text below contains the typo "hs" (for "has").
366 throw std::logic_error(
"Message CompType enum hs no COMP_ID or MSG_TYPE value");
368 int comp_id = compdesc->number();
369 int msg_type = msgtdesc->number();
// Both numbers must lie in [0, 65535] to fit the uint16_t parameters of send().
370 if (comp_id < 0 || comp_id > std::numeric_limits<uint16_t>::max()) {
371 throw std::logic_error(
"Message has invalid COMP_ID");
373 if (msg_type < 0 || msg_type > std::numeric_limits<uint16_t>::max()) {
374 throw std::logic_error(
"Message has invalid MSG_TYPE");
377 send(client, comp_id, msg_type, m);
// Sends message m to every client: iterates sessions_ and calls the
// per-client send() with each client ID.
398 google::protobuf::Message &m)
400 std::map<ClientID, boost::shared_ptr<Session>>::iterator s;
401 for (s = sessions_.begin(); s != sessions_.end(); ++s) {
402 send(s->first, component_id, msg_type, m);
// shared_ptr overload of send_to_all: iterates sessions_ and calls the
// per-client send() with each client ID and the shared message pointer.
413 std::shared_ptr<google::protobuf::Message> m)
415 std::map<ClientID, boost::shared_ptr<Session>>::iterator s;
416 for (s = sessions_.begin(); s != sessions_.end(); ++s) {
417 send(s->first, component_id, msg_type, m);
427 std::map<ClientID, boost::shared_ptr<Session>>::iterator s;
428 for (s = sessions_.begin(); s != sessions_.end(); ++s) {
439 std::map<ClientID, boost::shared_ptr<Session>>::iterator s;
440 for (s = sessions_.begin(); s != sessions_.end(); ++s) {
// Disconnects the given client if it has a session; unknown client IDs are
// ignored.
452 if (sessions_.find(client) != sessions_.end()) {
// A shared_ptr copy is taken before calling disconnect().
453 boost::shared_ptr<Session> session = sessions_[client];
454 session->disconnect();
// Creates a Session for the next incoming connection and starts an
// asynchronous accept on its socket; handle_accept is the completion handler.
460 ProtobufStreamServer::start_accept()
// On GCC older than 4.7 the increment of next_cid_ is guarded by
// next_cid_mutex_.
462 #if defined(__GNUC__) && (__GNUC__ < 4 || (__GNUC__ == 4 && __GNUC_MINOR__ < 7)) 463 std::lock_guard<std::mutex> lock(next_cid_mutex_);
// The new session receives the current value of next_cid_, which is then
// incremented.
465 Session::Ptr new_session(
new Session(next_cid_++,
this, io_service_));
466 acceptor_.async_accept(new_session->socket(),
467 boost::bind(&ProtobufStreamServer::handle_accept,
this,
468 new_session, boost::asio::placeholders::error));
// Called by a Session when its connection ended: removes the session from
// sessions_ and emits sig_disconnected_ with the client ID and error.
// `session` is passed by value, so this call holds its own reference to the
// Session after the map entry is erased.
472 ProtobufStreamServer::disconnected(boost::shared_ptr<Session> session,
473 const boost::system::error_code &error)
// NOTE(review): no lock around sessions_ is visible here — confirm how
// access is synchronized with callers of send()/disconnect().
475 sessions_.erase(session->id());
476 sig_disconnected_(session->id(), error);
// Completion handler for async_accept: starts the session, registers it in
// sessions_ under its client ID, emits sig_connected_ with the ID and remote
// endpoint, and starts reading from the client.
480 ProtobufStreamServer::handle_accept(Session::Ptr new_session,
481 const boost::system::error_code& error)
// start_session() caches the remote endpoint that sig_connected_ reports below.
484 new_session->start_session();
485 sessions_[new_session->id()] = new_session;
486 sig_connected_(new_session->id(), new_session->remote_endpoint());
487 new_session->start_read();
// Entry point of asio_thread_ (the constructors start it with this function).
495 ProtobufStreamServer::run_asio()
// With Boost.Asio newer than 1.4.9 the body loops until io_service_ reports
// that it has been stopped.
497 #if BOOST_ASIO_VERSION > 100409 498 while (! io_service_.stopped()) {
503 #if BOOST_ASIO_VERSION > 100409 unsigned int ClientID
ID to identify connected clients.
Register to map msg type numbers to Protobuf messages.
void disconnect(ClientID client)
Disconnect specific client.
void send_to_all(uint16_t component_id, uint16_t msg_type, google::protobuf::Message &m)
Send a message to all clients.
ProtobufStreamServer(unsigned short port)
Constructor.
~ProtobufStreamServer()
Destructor.
void send(ClientID client, uint16_t component_id, uint16_t msg_type, google::protobuf::Message &m)
Send a message to the given client.
message_header_t message_header
Message header (network byte order)