7 #define CRYPTOPP_TRACE_NETWORK 0
9 NAMESPACE_BEGIN(CryptoPP)
11 #ifdef HIGHRES_TIMER_AVAILABLE
13 lword LimitedBandwidth::ComputeCurrentTransceiveLimit()
15 if (!m_maxBytesPerSecond)
18 double curTime = GetCurTimeAndCleanUp();
20 for (OpQueue::size_type i=0; i!=m_ops.size(); ++i)
21 total += m_ops[i].second;
22 return SaturatingSubtract(m_maxBytesPerSecond, total);
25 double LimitedBandwidth::TimeToNextTransceive()
27 if (!m_maxBytesPerSecond)
30 if (!m_nextTransceiveTime)
31 ComputeNextTransceiveTime();
33 return SaturatingSubtract(m_nextTransceiveTime, m_timer.ElapsedTimeAsDouble());
36 void LimitedBandwidth::NoteTransceive(lword size)
38 if (m_maxBytesPerSecond)
40 double curTime = GetCurTimeAndCleanUp();
41 m_ops.push_back(std::make_pair(curTime, size));
42 m_nextTransceiveTime = 0;
46 void LimitedBandwidth::ComputeNextTransceiveTime()
48 double curTime = GetCurTimeAndCleanUp();
50 for (
unsigned int i=0; i!=m_ops.size(); ++i)
51 total += m_ops[i].second;
52 m_nextTransceiveTime =
53 (total < m_maxBytesPerSecond) ? curTime : m_ops.front().first + 1000;
56 double LimitedBandwidth::GetCurTimeAndCleanUp()
58 if (!m_maxBytesPerSecond)
61 double curTime = m_timer.ElapsedTimeAsDouble();
62 while (m_ops.size() && (m_ops.front().first + 1000 < curTime))
69 double nextTransceiveTime = TimeToNextTransceive();
70 if (nextTransceiveTime)
71 container.ScheduleEvent(nextTransceiveTime,
CallStack(
"LimitedBandwidth::GetWaitObjects()", &callStack));
77 lword& byteCount,
bool blockingOutput,
78 unsigned long maxTime,
bool checkDelimiter, byte delimiter)
80 m_blockedBySpeedLimit =
false;
82 if (!GetMaxBytesPerSecond())
84 size_t ret = DoPump(byteCount, blockingOutput, maxTime, checkDelimiter, delimiter);
85 m_doPumpBlocked = (ret != 0);
90 unsigned long timeToGo = maxTime;
91 Timer timer(Timer::MILLISECONDS, forever);
92 lword maxSize = byteCount;
99 lword curMaxSize = UnsignedMin(ComputeCurrentTransceiveLimit(), maxSize - byteCount);
101 if (curMaxSize || m_doPumpBlocked)
103 if (!forever) timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime());
104 size_t ret = DoPump(curMaxSize, blockingOutput, timeToGo, checkDelimiter, delimiter);
105 m_doPumpBlocked = (ret != 0);
108 NoteTransceive(curMaxSize);
109 byteCount += curMaxSize;
115 if (maxSize != ULONG_MAX && byteCount >= maxSize)
120 timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime());
125 double waitTime = TimeToNextTransceive();
126 if (!forever && waitTime > timeToGo)
128 m_blockedBySpeedLimit =
true;
133 LimitedBandwidth::GetWaitObjects(container,
CallStack(
"NonblockingSource::GeneralPump2() - speed limit", 0));
134 container.Wait((
unsigned long)waitTime);
140 size_t NonblockingSource::PumpMessages2(
unsigned int &messageCount,
bool blocking)
142 if (messageCount == 0)
149 byteCount = LWORD_MAX;
150 RETURN_IF_NONZERO(Pump2(byteCount, blocking));
151 }
while(byteCount == LWORD_MAX);
153 if (!m_messageEndSent && SourceExhausted())
156 m_messageEndSent =
true;
164 m_blockedBySpeedLimit =
false;
166 size_t curBufSize = GetCurrentBufferSize();
167 if (curBufSize <= targetSize && (targetSize || !EofPending()))
170 if (!GetMaxBytesPerSecond())
171 return DoFlush(maxTime, targetSize);
174 unsigned long timeToGo = maxTime;
175 Timer timer(Timer::MILLISECONDS, forever);
176 lword totalFlushed = 0;
182 size_t flushSize = UnsignedMin(curBufSize - targetSize, ComputeCurrentTransceiveLimit());
183 if (flushSize || EofPending())
185 if (!forever) timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime());
186 size_t ret = (size_t)DoFlush(timeToGo, curBufSize - flushSize);
195 if (curBufSize <= targetSize && (targetSize || !EofPending()))
200 timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime());
205 double waitTime = TimeToNextTransceive();
206 if (!forever && waitTime > timeToGo)
208 m_blockedBySpeedLimit =
true;
213 LimitedBandwidth::GetWaitObjects(container,
CallStack(
"NonblockingSink::TimedFlush() - speed limit", 0));
214 container.Wait((
unsigned long)waitTime);
220 bool NonblockingSink::IsolatedFlush(
bool hardFlush,
bool blocking)
223 return hardFlush && (!!GetCurrentBufferSize() || EofPending());
230 , m_waitingForResult(false), m_outputBlocked(false)
231 , m_dataBegin(0), m_dataEnd(0)
244 if (BlockedBySpeedLimit())
245 LimitedBandwidth::GetWaitObjects(container,
CallStack(
"NetworkSource::GetWaitObjects() - speed limit", &callStack));
246 else if (!m_outputBlocked)
248 if (m_dataBegin == m_dataEnd)
249 AccessReceiver().
GetWaitObjects(container,
CallStack(
"NetworkSource::GetWaitObjects() - no data", &callStack));
251 container.SetNoWait(
CallStack(
"NetworkSource::GetWaitObjects() - have data", &callStack));
257 size_t NetworkSource::DoPump(lword &byteCount,
bool blockingOutput,
unsigned long maxTime,
bool checkDelimiter, byte delimiter)
261 lword maxSize = byteCount;
264 Timer timer(Timer::MILLISECONDS, forever);
272 if (m_dataBegin == m_dataEnd)
274 if (receiver.EofReceived())
277 if (m_waitingForResult)
279 if (receiver.MustWaitForResult() &&
280 !receiver.
Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()),
281 CallStack(
"NetworkSource::DoPump() - wait receive result", 0)))
284 unsigned int recvResult = receiver.GetReceiveResult();
285 #if CRYPTOPP_TRACE_NETWORK
286 OutputDebugString((IntToString((
unsigned int)
this) +
": Received " + IntToString(recvResult) +
" bytes\n").c_str());
288 m_dataEnd += recvResult;
289 m_waitingForResult =
false;
291 if (!receiver.MustWaitToReceive() && !receiver.EofReceived() && m_dataEnd != m_buf.size())
296 m_dataEnd = m_dataBegin = 0;
298 if (receiver.MustWaitToReceive())
300 if (!receiver.
Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()),
301 CallStack(
"NetworkSource::DoPump() - wait receive", 0)))
304 receiver.
Receive(m_buf+m_dataEnd, m_buf.size()-m_dataEnd);
305 m_waitingForResult =
true;
310 m_waitingForResult =
true;
313 #if CRYPTOPP_TRACE_NETWORK
314 OutputDebugString((IntToString((
unsigned int)
this) +
": Receiving " + IntToString(m_buf.size()-m_dataEnd) +
" bytes\n").c_str());
316 while (receiver.
Receive(m_buf+m_dataEnd, m_buf.size()-m_dataEnd))
318 unsigned int recvResult = receiver.GetReceiveResult();
319 #if CRYPTOPP_TRACE_NETWORK
320 OutputDebugString((IntToString((
unsigned int)
this) +
": Received " + IntToString(recvResult) +
" bytes\n").c_str());
322 m_dataEnd += recvResult;
323 if (receiver.EofReceived() || m_dataEnd > m_buf.size() /2)
325 m_waitingForResult =
false;
334 m_putSize = UnsignedMin(m_dataEnd - m_dataBegin, maxSize - byteCount);
337 m_putSize = std::find(m_buf+m_dataBegin, m_buf+m_dataBegin+m_putSize, delimiter) - (m_buf+m_dataBegin);
340 size_t result = t->
PutModifiable2(m_buf+m_dataBegin, m_putSize, 0, forever || blockingOutput);
343 if (t->
Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()),
344 CallStack(
"NetworkSource::DoPump() - wait attachment", 0)))
348 m_outputBlocked =
true;
352 m_outputBlocked =
false;
354 byteCount += m_putSize;
355 m_dataBegin += m_putSize;
356 if (checkDelimiter && m_dataBegin < m_dataEnd && m_buf[m_dataBegin] == delimiter)
358 if (maxSize != ULONG_MAX && byteCount == maxSize)
363 if (maxTime > 0 && timer.ElapsedTime() > maxTime)
373 NetworkSink::NetworkSink(
unsigned int maxBufferSize,
unsigned int autoFlushBound)
374 : m_maxBufferSize(maxBufferSize), m_autoFlushBound(autoFlushBound)
375 , m_needSendResult(false), m_wasBlocked(false), m_eofState(EOF_NONE)
376 , m_buffer(STDMIN(16U*1024U+256, maxBufferSize)), m_skipBytes(0)
377 , m_speedTimer(
Timer::MILLISECONDS), m_byteCountSinceLastTimerReset(0)
378 , m_currentSpeed(0), m_maxObservedSpeed(0)
384 if (m_speedTimer.ElapsedTime() > 1000)
386 m_currentSpeed = m_byteCountSinceLastTimerReset * 1000 / m_speedTimer.ElapsedTime();
387 m_maxObservedSpeed = STDMAX(m_currentSpeed, m_maxObservedSpeed * 0.98f);
388 m_byteCountSinceLastTimerReset = 0;
389 m_speedTimer.StartTimer();
392 return m_currentSpeed;
397 lword m = GetMaxBytesPerSecond();
398 return m ? STDMIN(m_maxObservedSpeed,
float(CRYPTOPP_VC6_INT64 m)) : m_maxObservedSpeed;
408 if (BlockedBySpeedLimit())
409 LimitedBandwidth::GetWaitObjects(container,
CallStack(
"NetworkSink::GetWaitObjects() - speed limit", &callStack));
410 else if (m_wasBlocked)
411 AccessSender().
GetWaitObjects(container,
CallStack(
"NetworkSink::GetWaitObjects() - was blocked", &callStack));
412 else if (!m_buffer.IsEmpty())
413 AccessSender().
GetWaitObjects(container,
CallStack(
"NetworkSink::GetWaitObjects() - buffer not empty", &callStack));
414 else if (EofPending())
415 AccessSender().
GetWaitObjects(container,
CallStack(
"NetworkSink::GetWaitObjects() - EOF pending", &callStack));
420 if (m_eofState == EOF_DONE)
422 if (length || messageEnd)
428 if (m_eofState > EOF_NONE)
434 assert(length >= m_skipBytes);
435 inString += m_skipBytes;
436 length -= m_skipBytes;
439 m_buffer.
Put(inString, length);
441 if (!blocking || m_buffer.CurrentSize() > m_autoFlushBound)
444 size_t targetSize = messageEnd ? 0 : m_maxBufferSize;
448 if (m_buffer.CurrentSize() > targetSize)
452 m_skipBytes += length;
453 size_t blockedBytes = UnsignedMin(length, m_buffer.CurrentSize() - targetSize);
454 return STDMAX<size_t>(blockedBytes, 1);
457 m_wasBlocked =
false;
463 m_eofState = EOF_PENDING_SEND;
467 if (m_eofState != EOF_DONE)
474 lword NetworkSink::DoFlush(
unsigned long maxTime,
size_t targetSize)
479 Timer timer(Timer::MILLISECONDS, forever);
480 unsigned int totalFlushSize = 0;
484 if (m_buffer.CurrentSize() <= targetSize)
487 if (m_needSendResult)
489 if (sender.MustWaitForResult() &&
490 !sender.
Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()),
491 CallStack(
"NetworkSink::DoFlush() - wait send result", 0)))
494 unsigned int sendResult = sender.GetSendResult();
495 #if CRYPTOPP_TRACE_NETWORK
496 OutputDebugString((IntToString((
unsigned int)
this) +
": Sent " + IntToString(sendResult) +
" bytes\n").c_str());
498 m_buffer.
Skip(sendResult);
499 totalFlushSize += sendResult;
500 m_needSendResult =
false;
506 unsigned long timeOut = maxTime ? SaturatingSubtract(maxTime, timer.ElapsedTime()) : 0;
507 if (sender.MustWaitToSend() && !sender.
Wait(timeOut,
CallStack(
"NetworkSink::DoFlush() - wait send", 0)))
510 size_t contiguousSize = 0;
511 const byte *block = m_buffer.Spy(contiguousSize);
513 #if CRYPTOPP_TRACE_NETWORK
514 OutputDebugString((IntToString((
unsigned int)
this) +
": Sending " + IntToString(contiguousSize) +
" bytes\n").c_str());
516 sender.Send(block, contiguousSize);
517 m_needSendResult =
true;
519 if (maxTime > 0 && timeOut == 0)
523 m_byteCountSinceLastTimerReset += totalFlushSize;
526 if (m_buffer.IsEmpty() && !m_needSendResult)
528 if (m_eofState == EOF_PENDING_SEND)
531 m_eofState = sender.MustWaitForEof() ? EOF_PENDING_DELIVERY : EOF_DONE;
534 while (m_eofState == EOF_PENDING_DELIVERY)
536 unsigned long timeOut = maxTime ? SaturatingSubtract(maxTime, timer.ElapsedTime()) : 0;
537 if (!sender.
Wait(timeOut,
CallStack(
"NetworkSink::DoFlush() - wait EOF", 0)))
540 if (sender.EofSent())
541 m_eofState = EOF_DONE;
545 return totalFlushSize;
548 #endif // #ifdef HIGHRES_TIMER_AVAILABLE
base class for all exceptions thrown by Crypto++
container of wait objects
float GetMaxObservedSpeed() const
get the maximum observed speed of this sink in bytes per second
lword TimedFlush(unsigned long maxTime, size_t targetSize=0)
flush to device for no more than maxTime milliseconds
some error not belong to any of the above categories
void GetWaitObjects(WaitObjectContainer &container, CallStack const &callStack)
put wait objects into container
size_t Put2(const byte *inString, size_t length, int messageEnd, bool blocking)
input multiple bytes for blocking or non-blocking processing
unsigned int GetMaxWaitObjectCount() const
maximum number of wait objects that this object can return
virtual unsigned int GetMaxWaitObjectCount() const =0
maximum number of wait objects that this object can return
virtual void GetWaitObjects(WaitObjectContainer &container, CallStack const &callStack)=0
put wait objects into container
size_t GeneralPump2(lword &byteCount, bool blockingOutput=true, unsigned long maxTime=INFINITE_TIME, bool checkDelimiter=false, byte delimiter='\n')
pump up to maxSize bytes using at most maxTime milliseconds
BufferedTransformation * AttachedTransformation()
returns the object immediately attached to this object or NULL for no attachment
float ComputeCurrentSpeed()
compute the current speed of this sink in bytes per second
const unsigned long INFINITE_TIME
used to represent infinite time
unsigned int GetMaxWaitObjectCount() const
bool Wait(unsigned long milliseconds, CallStack const &callStack)
wait on this object
bool AnyRetrievable() const
returns whether any bytes are currently ready for retrieval
void GetWaitObjects(WaitObjectContainer &container, CallStack const &callStack)
put wait objects into container
unsigned int GetMaxWaitObjectCount() const
maximum number of wait objects that this object can return
virtual bool Receive(byte *buf, size_t bufLen)=0
receive data from network source, returns whether result is immediately available ...
a Source class that can pump from a device for a specified amount of time.