m_raw_channel.cc
Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #include "m_raw_channel.h"
00024 #include "semaphore.h"
00025 #include "data.h"
00026 #include "protocol.h"
00027 #include "protostructs.h"
00028 #include "packet.h"
00029 #include "endian.h"
00030 #include "error.h"
00031 #include "usbwrap.h"
00032 #include "controller.h"
00033 #include <stdexcept>
00034 #include <sstream>
00035 #include <cstring>
00036 #include <string>
00037 #include "protostructs.h"
00038
00039 #include "debug.h"
00040
00041 namespace Barry { namespace Mode {
00042
00043
00044
00045 class RawChannelSocketHandler: public SocketRoutingQueue::SocketDataHandler
00046 {
00047 RawChannel &m_raw_channel;
00048 public:
00049 RawChannelSocketHandler(RawChannel &raw_channel)
00050 : m_raw_channel(raw_channel)
00051 {}
00052 virtual void DataReceived(Data &data)
00053 {
00054 m_raw_channel.HandleReceivedData(data);
00055 }
00056 virtual void Error(Barry::Error &error)
00057 {
00058 SocketDataHandler::Error(error);
00059 m_raw_channel.HandleError(error);
00060 }
00061 virtual ~RawChannelSocketHandler()
00062 {}
00063 };
00064
00065
00066
00067 class RawChannelZeroSocketHandler: public SocketRoutingQueue::SocketDataHandler
00068 {
00069 RawChannel &m_raw_channel;
00070 public:
00071 RawChannelZeroSocketHandler(RawChannel &raw_channel)
00072 : m_raw_channel(raw_channel)
00073 {}
00074 virtual void DataReceived(Data &data)
00075 {
00076 m_raw_channel.HandleReceivedZeroPacket(data);
00077 }
00078 virtual void Error(Barry::Error &error)
00079 {
00080 SocketDataHandler::Error(error);
00081 m_raw_channel.HandleError(error);
00082 }
00083 virtual ~RawChannelZeroSocketHandler()
00084 {}
00085 };
00086
00087
00088
00089
00090 RawChannel::RawChannel(Controller &con, RawChannelDataCallback &callback)
00091 : Mode(con, Controller::RawChannel)
00092 , m_mutex_valid(false)
00093 , m_cv_valid(false)
00094 , m_semaphore(NULL)
00095 , m_callback(&callback)
00096 , m_send_buffer(NULL)
00097 , m_zero_registered(false)
00098 , m_pending_error(NULL)
00099 {
00100 CheckQueueAvailable();
00101 InitBuffer();
00102 InitSemaphore();
00103 }
00104
00105 RawChannel::RawChannel(Controller &con)
00106 : Mode(con, Controller::RawChannel)
00107 , m_mutex_valid(false)
00108 , m_cv_valid(false)
00109 , m_semaphore(NULL)
00110 , m_callback(NULL)
00111 , m_send_buffer(NULL)
00112 , m_zero_registered(false)
00113 , m_pending_error(NULL)
00114 {
00115 CheckQueueAvailable();
00116 InitBuffer();
00117 InitSemaphore();
00118 }
00119
00120 void RawChannel::CheckQueueAvailable()
00121 {
00122 if( !m_con.m_queue ) {
00123 throw Barry::Error("RawChannel: No routing queue set in controller");
00124 }
00125 }
00126
00127 void RawChannel::InitBuffer()
00128 {
00129 m_send_buffer = new unsigned char[SB_CHANNELPACKET_HEADER_SIZE + SB_CHANNELPACKET_MAX_DATA_SIZE];
00130 }
00131
00132 void RawChannel::InitSemaphore()
00133 {
00134
00135 if( pthread_mutex_init(&m_mutex, NULL) ) {
00136 throw Barry::Error("Failed to create mutex");
00137 }
00138 m_mutex_valid = true;
00139 if( pthread_cond_init(&m_cv, NULL) ) {
00140 throw Barry::Error("Failed to create condvar");
00141 }
00142 m_cv_valid = true;
00143 m_semaphore = new semaphore(m_mutex, m_cv);
00144 }
00145
00146 RawChannel::~RawChannel()
00147 {
00148 UnregisterZeroSocketInterest();
00149
00150 delete[] m_send_buffer;
00151
00152 if( m_mutex_valid ) {
00153 pthread_mutex_destroy(&m_mutex);
00154 }
00155 if( m_cv_valid ) {
00156 pthread_cond_destroy(&m_cv);
00157 }
00158 delete m_semaphore;
00159 delete m_pending_error;
00160 }
00161
00162 void RawChannel::OnOpen()
00163 {
00164
00165
00166 m_zero_registered = true;
00167 m_socket->HideSequencePacket(false);
00168 SocketRoutingQueue::SocketDataHandlerPtr zeroCallback;
00169 zeroCallback.reset(new RawChannelZeroSocketHandler(*this));
00170 m_con.m_queue->RegisterInterest(0, zeroCallback);
00171
00172
00173
00174 if( m_callback ) {
00175 SocketRoutingQueue::SocketDataHandlerPtr callback;
00176 callback.reset(new RawChannelSocketHandler(*this));
00177 m_socket->RegisterInterest(callback);
00178 }
00179 else {
00180 SocketRoutingQueue::SocketDataHandlerPtr nullCallback;
00181 m_socket->RegisterInterest(nullCallback);
00182 }
00183 }
00184
00185
00186 void RawChannel::HandleReceivedZeroPacket(Data &data)
00187 {
00188 Protocol::CheckSize(data, SB_PACKET_HEADER_SIZE);
00189 MAKE_PACKETPTR_BUF(packet, data.GetData());
00190
00191 if( packet->socket != 0 ) {
00192 UnregisterZeroSocketInterest();
00193 SetPendingError("RawChannel: Got packet not for socket-zero");
00194 m_semaphore->Signal();
00195 }
00196
00197 switch( btohs(packet->command) )
00198 {
00199 case SB_COMMAND_SEQUENCE_HANDSHAKE:
00200 m_semaphore->Signal();
00201 break;
00202 case SB_COMMAND_CLOSE_SOCKET:
00203 case SB_COMMAND_REMOTE_CLOSE_SOCKET:
00204
00205
00206 UnregisterZeroSocketInterest();
00207 if( m_callback ) {
00208 m_callback->ChannelClose();
00209 }
00210
00211 m_semaphore->Signal();
00212 break;
00213 default:
00214 UnregisterZeroSocketInterest();
00215 if( m_callback ) {
00216 m_callback->ChannelError("RawChannel: Got unexpected socket zero packet");
00217 }
00218 else {
00219 SetPendingError("RawChannel: Got unexpected socket zero packet");
00220 }
00221 m_semaphore->Signal();
00222 break;
00223 }
00224
00225 }
00226
00227 void RawChannel::HandleReceivedData(Data &data)
00228 {
00229
00230 ValidateDataPacket(data);
00231 MAKE_CHANNELPACKETPTR_BUF(packet, data.GetData());
00232
00233
00234 Data partial(packet->u.data, data.GetSize() - SB_CHANNELPACKET_HEADER_SIZE);
00235 if( m_callback ) {
00236 m_callback->DataReceived(partial);
00237 }
00238 else {
00239 SetPendingError("RawChannel: Received data to handle when in non-callback mode");
00240 }
00241 }
00242
00243 void RawChannel::HandleError(Barry::Error &error)
00244 {
00245 std::ostringstream errorOss;
00246 errorOss << "RawChannel: Socket error received, what: " << error.what();
00247
00248 if( m_callback ) {
00249 m_callback->ChannelError(errorOss.str().c_str());
00250 }
00251 else {
00252 SetPendingError(errorOss.str().c_str());
00253 }
00254 m_semaphore->Signal();
00255 }
00256
00257 void RawChannel::UnregisterZeroSocketInterest()
00258 {
00259 if( m_zero_registered ) {
00260 m_con.m_queue->UnregisterInterest(0);
00261 m_socket->HideSequencePacket(true);
00262 m_zero_registered = false;
00263 }
00264 }
00265
00266 void RawChannel::SetPendingError(const char *msg)
00267 {
00268 if( !m_pending_error ) {
00269 m_pending_error = new std::string(msg);
00270 }
00271 }
00272
00273
00274
00275
00276 void RawChannel::Send(Data &data, int timeout)
00277 {
00278 size_t packetSize = SB_CHANNELPACKET_HEADER_SIZE + data.GetSize();
00279
00280 if( packetSize > SB_CHANNELPACKET_HEADER_SIZE + SB_CHANNELPACKET_MAX_DATA_SIZE ) {
00281 throw Barry::Error("RawChannel: send data size larger than MaximumPacketSize");
00282 }
00283
00284
00285 MAKE_CHANNELPACKETPTR_BUF(packet, m_send_buffer);
00286 packet->socket = htobs(m_socket->GetSocket());
00287 packet->size = htobs(packetSize);
00288 std::memcpy(packet->u.data, data.GetData(), data.GetSize());
00289
00290 Data toSend(m_send_buffer, packetSize);
00291 m_socket->Send(toSend, timeout);
00292 m_semaphore->WaitForSignal();
00293 if( m_pending_error ) {
00294 throw Barry::Error(*m_pending_error);
00295 }
00296 }
00297
00298 void RawChannel::Receive(Data &data,int timeout)
00299 {
00300 if( m_callback ) {
00301 throw std::logic_error("RawChannel: Receive called when channel was created with a callback");
00302 }
00303
00304 m_socket->Receive(m_receive_data, timeout);
00305
00306 ValidateDataPacket(m_receive_data);
00307 MAKE_CHANNELPACKETPTR_BUF(packet, m_receive_data.GetData());
00308
00309 size_t len = packet->size - SB_CHANNELPACKET_HEADER_SIZE;
00310 memcpy(data.GetBuffer(), packet->u.data, len);
00311 data.ReleaseBuffer(len);
00312
00313 }
00314
00315 void RawChannel::ValidateDataPacket(Data &data)
00316 {
00317 Protocol::CheckSize(data, SB_CHANNELPACKET_HEADER_SIZE);
00318 MAKE_CHANNELPACKETPTR_BUF(packet, data.GetData());
00319 if( packet->size != data.GetSize() ) {
00320
00321 throw std::logic_error("RawChannel: Data size doesn't match packet size");
00322 }
00323 }
00324
00325 size_t RawChannel::MaximumSendSize()
00326 {
00327 return SB_CHANNELPACKET_MAX_DATA_SIZE;
00328 }
00329
00330 }}