Fawkes API  Fawkes Development Version
mjpeg_reply.cpp
1 
2 /***************************************************************************
3  * mjpeg_reply.cpp - Web request MJPEG stream reply
4  *
5  * Created: Wed Feb 05 17:55:41 2014
6  * Copyright 2006-2014 Tim Niemueller [www.niemueller.de]
7  ****************************************************************************/
8 
9 /* This program is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17  * GNU Library General Public License for more details.
18  *
19  * Read the full text in the LICENSE.GPL file in the doc directory.
20  */
21 
22 #include "mjpeg_reply.h"
23 
24 #include <core/exceptions/system.h>
25 #include <core/threading/mutex.h>
26 #include <core/threading/wait_condition.h>
27 #include <fvutils/compression/jpeg_compressor.h>
28 #include <fvcams/shmem.h>
29 
30 #include <cerrno>
31 #include <unistd.h>
32 #include <cstring>
33 
34 using namespace firevision;
35 
36 namespace fawkes {
37 #if 0 /* just to make Emacs auto-indent happy */
38 }
39 #endif
40 
41 /** @class DynamicMJPEGStreamWebReply <webview/file_reply.h>
42  * Dynamic raw file transfer reply.
43  * This dynamic file transfer reply transmits the given file with a mime type
44  * determined with libmagic.
45  * @author Tim Niemueller
46  */
47 
48 /** Constructor.
49  * @param stream_producer stream producer to query for JPEG buffers
50  */
51 DynamicMJPEGStreamWebReply::DynamicMJPEGStreamWebReply(WebviewJpegStreamProducer *stream_producer)
52  : DynamicWebReply(WebReply::HTTP_OK)
53 {
54  next_buffer_mutex_ = new fawkes::Mutex();
55  next_buffer_waitcond_ = new fawkes::WaitCondition(next_buffer_mutex_);
56  next_frame_ = true;
57 
58  add_header("Content-type", "multipart/x-mixed-replace;boundary=MJPEG-next-frame");
59  stream_producer_ = stream_producer;
60  stream_producer_->add_subscriber(this);
61 }
62 
63 /** Destructor. */
65 {
66  stream_producer_->remove_subscriber(this);
67  delete next_buffer_mutex_;
68  delete next_buffer_waitcond_;
69 }
70 
71 size_t
73 {
74  return -1;
75 }
76 
77 void
79 {
80  next_buffer_mutex_->lock();
81  next_buffer_ = buffer;
82  next_buffer_waitcond_->wake_all();
83  next_buffer_mutex_->unlock();
84 }
85 
86 size_t
87 DynamicMJPEGStreamWebReply::next_chunk(size_t pos, char *buffer, size_t buf_max_size)
88 {
89  if (buf_max_size == 0) return 0;
90 
91  size_t written = 0;
92 
93  if (next_frame_) {
94  next_buffer_mutex_->lock();
95  while (! next_buffer_) {
96  next_buffer_waitcond_->wait();
97  }
98  buffer_ = next_buffer_;
99  next_buffer_.clear();
100  next_buffer_mutex_->unlock();
101 
102  char *header;
103  if (asprintf(&header,
104  "--MJPEG-next-frame\r\n"
105  "Content-type: image/jpeg\r\n"
106  "Content-length: %zu\r\n"
107  "\r\n", buffer_->size()) == -1) {
108  return -2;
109  }
110  size_t header_len = strlen(header);
111  memcpy(buffer, header, header_len);
112  buffer += header_len;
113  buf_max_size -= header_len;
114  written += header_len;
115 
116  buffer_bytes_written_ = 0;
117  next_frame_ = false;
118  }
119 
120  size_t remaining = buffer_->size() - buffer_bytes_written_;
121  if (remaining <= buf_max_size) {
122  memcpy(buffer, buffer_->data() + buffer_bytes_written_, remaining);
123  next_frame_ = true;
124  written += remaining;
125  } else {
126  memcpy(buffer, buffer_->data() + buffer_bytes_written_, buf_max_size);
127  buffer_bytes_written_ += buf_max_size;
128  written += buf_max_size;
129  }
130 
131  return written;
132 }
133 
134 } // end namespace fawkes
Wait until a given condition holds.
virtual ~DynamicMJPEGStreamWebReply()
Destructor.
Definition: mjpeg_reply.cpp:64
Fawkes library namespace.
void unlock()
Unlock the mutex.
Definition: mutex.cpp:135
void wake_all()
Wake up all waiting threads.
void add_header(std::string header, std::string content)
Add a HTTP header.
Definition: reply.cpp:97
virtual void handle_buffer(RefPtr< WebviewJpegStreamProducer::Buffer > buffer)
Notification if a new buffer is available.
Definition: mjpeg_reply.cpp:78
virtual size_t size()
Total size of the web reply.
Definition: mjpeg_reply.cpp:72
Dynamic web reply.
Definition: reply.h:123
void wait()
Wait for the condition forever.
Basic web reply.
Definition: reply.h:36
RefPtr<> is a reference-counting shared smartpointer.
Definition: refptr.h:49
void remove_subscriber(Subscriber *subscriber)
Remove a subscriber.
virtual size_t next_chunk(size_t pos, char *buffer, size_t buf_max_size)
Get data of next chunk.
Definition: mjpeg_reply.cpp:87
void lock()
Lock this mutex.
Definition: mutex.cpp:89
Mutex mutual exclusion lock.
Definition: mutex.h:32
void add_subscriber(Subscriber *subscriber)
Add a subscriber.