Libosmium  2.12.2
Fast and flexible C++ library for working with OpenStreetMap data
writer.hpp
Go to the documentation of this file.
1 #ifndef OSMIUM_IO_WRITER_HPP
2 #define OSMIUM_IO_WRITER_HPP
3 
4 /*
5 
6 This file is part of Osmium (http://osmcode.org/libosmium).
7 
8 Copyright 2013-2017 Jochen Topf <jochen@topf.org> and others (see README).
9 
10 Boost Software License - Version 1.0 - August 17th, 2003
11 
12 Permission is hereby granted, free of charge, to any person or organization
13 obtaining a copy of the software and accompanying documentation covered by
14 this license (the "Software") to use, reproduce, display, distribute,
15 execute, and transmit the Software, and to prepare derivative works of the
16 Software, and to permit third-parties to whom the Software is furnished to
17 do so, all subject to the following:
18 
19 The copyright notices in the Software and this entire statement, including
20 the above license grant, this restriction and the following disclaimer,
21 must be included in all copies of the Software, in whole or in part, and
22 all derivative works of the Software, unless such copies or derivative
23 works are solely in the form of machine-executable object code generated by
24 a source language processor.
25 
26 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
27 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
28 FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
29 SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
30 FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
31 ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
32 DEALINGS IN THE SOFTWARE.
33 
34 */
35 
36 #include <cassert>
37 #include <cstddef>
38 #include <exception>
39 #include <functional>
40 #include <future>
41 #include <initializer_list>
42 #include <memory>
43 #include <string>
44 #include <utility>
45 
47 #include <osmium/io/detail/output_format.hpp>
48 #include <osmium/io/detail/queue_util.hpp>
49 #include <osmium/io/detail/read_write.hpp>
50 #include <osmium/io/detail/write_thread.hpp>
51 #include <osmium/io/error.hpp>
52 #include <osmium/io/file.hpp>
53 #include <osmium/io/header.hpp>
55 #include <osmium/memory/buffer.hpp>
56 #include <osmium/thread/util.hpp>
57 #include <osmium/util/config.hpp>
58 #include <osmium/version.hpp>
59 
60 namespace osmium {
61 
62  namespace memory {
63  class Item;
64  } //namespace memory
65 
66  namespace io {
67 
68  namespace detail {
69 
70  inline size_t get_output_queue_size() noexcept {
71  size_t n = osmium::config::get_max_queue_size("OUTPUT", 20);
72  return n > 2 ? n : 2;
73  }
74 
75  } // namespace detail
76 
100  class Writer {
101 
102  static constexpr size_t default_buffer_size = 10 * 1024 * 1024;
103 
105 
106  detail::future_string_queue_type m_output_queue;
107 
108  std::unique_ptr<osmium::io::detail::OutputFormat> m_output;
109 
111 
113 
114  std::future<bool> m_write_future;
115 
117 
118  enum class status {
119  okay = 0, // normal writing
120  error = 1, // some error occurred while writing
121  closed = 2 // close() called successfully
122  } m_status;
123 
124  // This function will run in a separate thread.
125  static void write_thread(detail::future_string_queue_type& output_queue,
126  std::unique_ptr<osmium::io::Compressor>&& compressor,
127  std::promise<bool>&& write_promise) {
128  detail::WriteThread write_thread{output_queue,
129  std::move(compressor),
130  std::move(write_promise)};
131  write_thread();
132  }
133 
135  if (buffer && buffer.committed() > 0) {
136  m_output->write_buffer(std::move(buffer));
137  }
138  }
139 
140  void do_flush() {
141  osmium::thread::check_for_exception(m_write_future);
142  if (m_buffer && m_buffer.committed() > 0) {
143  osmium::memory::Buffer buffer{m_buffer_size,
145  using std::swap;
146  swap(m_buffer, buffer);
147 
148  m_output->write_buffer(std::move(buffer));
149  }
150  }
151 
152  template <typename TFunction, typename... TArgs>
153  void ensure_cleanup(TFunction func, TArgs&&... args) {
154  if (m_status != status::okay) {
155  throw io_error("Can not write to writer when in status 'closed' or 'error'");
156  }
157 
158  try {
159  func(std::forward<TArgs>(args)...);
160  } catch (...) {
161  m_status = status::error;
162  detail::add_to_queue(m_output_queue, std::current_exception());
163  detail::add_end_of_data_to_queue(m_output_queue);
164  throw;
165  }
166  }
167 
168  struct options_type {
170  overwrite allow_overwrite = overwrite::no;
171  fsync sync = fsync::no;
172  };
173 
174  static void set_option(options_type& options, const osmium::io::Header& header) {
175  options.header = header;
176  }
177 
178  static void set_option(options_type& options, overwrite value) {
179  options.allow_overwrite = value;
180  }
181 
182  static void set_option(options_type& options, fsync value) {
183  options.sync = value;
184  }
185 
186  void do_close() {
187  if (m_status == status::okay) {
188  ensure_cleanup([&](){
189  do_write(std::move(m_buffer));
190  m_output->write_end();
191  m_status = status::closed;
192  detail::add_end_of_data_to_queue(m_output_queue);
193  });
194  }
195  }
196 
197  public:
198 
222  template <typename... TArgs>
223  explicit Writer(const osmium::io::File& file, TArgs&&... args) :
224  m_file(file.check()),
225  m_output_queue(detail::get_output_queue_size(), "raw_output"),
226  m_output(osmium::io::detail::OutputFormatFactory::instance().create_output(m_file, m_output_queue)),
227  m_buffer(),
228  m_buffer_size(default_buffer_size),
229  m_write_future(),
230  m_thread(),
231  m_status(status::okay) {
232  assert(!m_file.buffer()); // XXX can't handle pseudo-files
233 
234  options_type options;
235  (void)std::initializer_list<int>{
236  (set_option(options, args), 0)...
237  };
238 
239  if (options.header.get("generator") == "") {
240  options.header.set("generator", "libosmium/" LIBOSMIUM_VERSION_STRING);
241  }
242 
243  std::unique_ptr<osmium::io::Compressor> compressor =
244  CompressionFactory::instance().create_compressor(file.compression(),
245  osmium::io::detail::open_for_writing(m_file.filename(), options.allow_overwrite),
246  options.sync);
247 
248  std::promise<bool> write_promise;
249  m_write_future = write_promise.get_future();
250  m_thread = osmium::thread::thread_handler{write_thread, std::ref(m_output_queue), std::move(compressor), std::move(write_promise)};
251 
252  ensure_cleanup([&](){
253  m_output->write_header(options.header);
254  });
255  }
256 
257  template <typename... TArgs>
258  explicit Writer(const std::string& filename, TArgs&&... args) :
259  Writer(osmium::io::File{filename}, std::forward<TArgs>(args)...) {
260  }
261 
262  template <typename... TArgs>
263  explicit Writer(const char* filename, TArgs&&... args) :
264  Writer(osmium::io::File{filename}, std::forward<TArgs>(args)...) {
265  }
266 
267  Writer(const Writer&) = delete;
268  Writer& operator=(const Writer&) = delete;
269 
270  Writer(Writer&&) = default;
271  Writer& operator=(Writer&&) = default;
272 
273  ~Writer() noexcept {
274  try {
275  do_close();
276  } catch (...) {
277  // Ignore any exceptions because destructor must not throw.
278  }
279  }
280 
284  size_t buffer_size() const noexcept {
285  return m_buffer_size;
286  }
287 
292  void set_buffer_size(size_t size) noexcept {
293  m_buffer_size = size;
294  }
295 
303  void flush() {
304  ensure_cleanup([&](){
305  do_flush();
306  });
307  }
308 
318  ensure_cleanup([&](){
319  do_flush();
320  do_write(std::move(buffer));
321  });
322  }
323 
331  void operator()(const osmium::memory::Item& item) {
332  ensure_cleanup([&](){
333  if (!m_buffer) {
334  m_buffer = osmium::memory::Buffer{m_buffer_size,
336  }
337  try {
338  m_buffer.push_back(item);
339  } catch (const osmium::buffer_is_full&) {
340  do_flush();
341  m_buffer.push_back(item);
342  }
343  });
344  }
345 
355  void close() {
356  do_close();
357 
358  if (m_write_future.valid()) {
359  m_write_future.get();
360  }
361  }
362 
363  }; // class Writer
364 
365  } // namespace io
366 
367 } // namespace osmium
368 
369 #endif // OSMIUM_IO_WRITER_HPP
fsync sync
Definition: writer.hpp:171
~Writer() noexcept
Definition: writer.hpp:273
Definition: writer.hpp:168
void do_write(osmium::memory::Buffer &&buffer)
Definition: writer.hpp:134
void do_flush()
Definition: writer.hpp:140
void swap(Buffer &lhs, Buffer &rhs)
Definition: buffer.hpp:765
void do_close()
Definition: writer.hpp:186
osmium::thread::thread_handler m_thread
Definition: writer.hpp:116
osmium::io::Header header
Definition: writer.hpp:169
osmium::memory::Buffer m_buffer
Definition: writer.hpp:110
std::unique_ptr< osmium::io::detail::OutputFormat > m_output
Definition: writer.hpp:108
void set_buffer_size(size_t size) noexcept
Definition: writer.hpp:292
Definition: file.hpp:72
Definition: item.hpp:105
static void write_thread(detail::future_string_queue_type &output_queue, std::unique_ptr< osmium::io::Compressor > &&compressor, std::promise< bool > &&write_promise)
Definition: writer.hpp:125
void operator()(osmium::memory::Buffer &&buffer)
Definition: writer.hpp:317
Namespace for everything in the Osmium library.
Definition: assembler.hpp:63
status
Definition: writer.hpp:118
#define LIBOSMIUM_VERSION_STRING
Definition: version.hpp:40
Definition: attr.hpp:333
size_t buffer_size() const noexcept
Definition: writer.hpp:284
fsync
Definition: writer_options.hpp:51
static void set_option(options_type &options, fsync value)
Definition: writer.hpp:182
void push_back(const osmium::memory::Item &item)
Definition: buffer.hpp:521
Definition: error.hpp:44
size_t m_buffer_size
Definition: writer.hpp:112
void close()
Definition: writer.hpp:355
void flush()
Definition: writer.hpp:303
osmium::io::File m_file
Definition: writer.hpp:104
size_t committed() const noexcept
Definition: buffer.hpp:263
size_t get_max_queue_size(const char *queue_name, size_t default_value) noexcept
Definition: config.hpp:69
Definition: buffer.hpp:97
Definition: buffer.hpp:58
const char * buffer() const noexcept
Definition: file.hpp:156
static void set_option(options_type &options, overwrite value)
Definition: writer.hpp:178
Definition: writer.hpp:100
Writer(const char *filename, TArgs &&... args)
Definition: writer.hpp:263
std::future< bool > m_write_future
Definition: writer.hpp:114
Writer(const osmium::io::File &file, TArgs &&... args)
Definition: writer.hpp:223
void check_for_exception(std::future< T > &future)
Definition: util.hpp:55
Definition: header.hpp:68
Writer(const std::string &filename, TArgs &&... args)
Definition: writer.hpp:258
overwrite allow_overwrite
Definition: writer.hpp:170
static void set_option(options_type &options, const osmium::io::Header &header)
Definition: writer.hpp:174
detail::future_string_queue_type m_output_queue
Definition: writer.hpp:106
file_compression compression() const noexcept
Definition: file.hpp:289
File & filename(const std::string &filename)
Definition: file.hpp:307
void operator()(const osmium::memory::Item &item)
Definition: writer.hpp:331
void ensure_cleanup(TFunction func, TArgs &&... args)
Definition: writer.hpp:153
Definition: util.hpp:85
void set(const std::string &key, const std::string &value)
Definition: options.hpp:87
overwrite
Definition: writer_options.hpp:43