bes  Updated for version 3.20.8
Chunk.cc
1 // -*- mode: c++; c-basic-offset:4 -*-
2 
3 // This file is part of the BES
4 
5 // Copyright (c) 2016 OPeNDAP, Inc.
6 // Author: Nathan Potter <ndp@opendap.org>
7 //
8 // This library is free software; you can redistribute it and/or
9 // modify it under the terms of the GNU Lesser General Public
10 // License as published by the Free Software Foundation; either
11 // version 2.1 of the License, or (at your option) any later version.
12 //
13 // This library is distributed in the hope that it will be useful,
14 // but WITHOUT ANY WARRANTY; without even the implied warranty of
15 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 // Lesser General Public License for more details.
17 //
18 // You should have received a copy of the GNU Lesser General Public
19 // License along with this library; if not, write to the Free Software
20 // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
21 //
22 // You can contact OPeNDAP, Inc. at PO Box 112, Saunderstown, RI. 02874-0112.
23 
24 #include "config.h"
25 
26 #include <sstream>
27 #include <cstring>
28 #include <cassert>
29 
30 #include <zlib.h>
31 
32 #include <BESDebug.h>
33 #include <BESLog.h>
34 #include <TheBESKeys.h>
35 #include <BESInternalError.h>
36 #include <BESSyntaxUserError.h>
37 #include <BESForbiddenError.h>
38 #include <BESContextManager.h>
39 #include <url_impl.h>
40 
41 #include "xml2json/include/xml2json.hpp"
42 
43 #include "Chunk.h"
44 #include "CurlUtils.h"
45 #include "CurlHandlePool.h"
46 #include "EffectiveUrlCache.h"
47 #include "DmrppRequestHandler.h"
48 #include "DmrppNames.h"
49 
50 using namespace std;
52 
53 #define prolog std::string("Chunk::").append(__func__).append("() - ")
54 
55 namespace dmrpp {
56 
57 
70 size_t chunk_header_callback(char *buffer, size_t /*size*/, size_t nitems, void *data) {
71  // received header is nitems * size long in 'buffer' NOT ZERO TERMINATED
72  // 'userdata' is set with CURLOPT_HEADERDATA
73  // 'size' is always 1
74 
75  // -2 strips of the CRLF at the end of the header
76  string header(buffer, buffer + nitems - 2);
77 
78  // Look for the content type header and store its value in the Chunk
79  string::size_type pos;
80  if ((pos = header.find("Content-Type")) != string::npos) {
81  // Header format 'Content-Type: <value>'
82  auto c_ptr = reinterpret_cast<Chunk *>(data);
83  c_ptr->set_response_content_type(header.substr(header.find_last_of(' ') + 1));
84  }
85 
86  return nitems;
87 }
88 
102 size_t chunk_write_data(void *buffer, size_t size, size_t nmemb, void *data) {
103  size_t nbytes = size * nmemb;
104  auto chunk = reinterpret_cast<Chunk *>(data);
105 
106  BESDEBUG(MODULE, prolog << "BEGIN chunk->get_response_content_type():" << chunk->get_response_content_type()
107  << " chunk->get_data_url(): " << chunk->get_data_url() << endl);
108 
109  // When Content-Type is 'application/xml,' that's an error. jhrg 6/9/20
110  if (chunk->get_response_content_type().find("application/xml") != string::npos) {
111  // At this point we no longer care about great performance - error msg readability
112  // is more important. jhrg 12/30/19
113  string xml_message = reinterpret_cast<const char *>(buffer);
114  xml_message.erase(xml_message.find_last_not_of("\t\n\v\f\r 0") + 1);
115  // Decode the AWS XML error message. In some cases this will fail because pub keys,
116  // which maybe in this error text, may have < or > chars in them. the XML parser
117  // will be sad if that happens. jhrg 12/30/19
118  try {
119  string json_message = xml2json(xml_message.c_str());
120  stringstream aws_msg;
121  aws_msg << prolog << "AWS S3 Access Error:" << json_message;
122  BESDEBUG(MODULE, aws_msg.str() << endl);
123  VERBOSE(aws_msg.str() << endl);
124 
126  d.Parse(json_message.c_str());
127  rapidjson::Value &message = d["Error"]["Message"];
128  rapidjson::Value &code = d["Error"]["Code"];
129 
130  // We might want to get the "Code" from the "Error" if these text messages
131  // are not good enough. But the "Code" is not really suitable for normal humans...
132  // jhrg 12/31/19
133  stringstream msg;
134  msg << prolog << "Error accessing object store data. (Tried: " << chunk->get_data_url() << ")" <<
135  " Message " << message.GetString();
136  BESDEBUG(MODULE, msg.str() << endl);
137  if (string(code.GetString()) == "AccessDenied") {
138  throw BESForbiddenError(msg.str(), __FILE__, __LINE__);
139  }
140  else {
141  throw BESInternalError(msg.str(), __FILE__, __LINE__);
142  }
143  }
144  catch (BESError) {
145  // re-throw any BESError - added for the future if we make BESError a child
146  // of std::exception as it should be. jhrg 12/30/19
147  throw;
148  }
149  catch (std::exception &e) {
150  stringstream msg;
151  msg << prolog << "Error accessing object store data. (Tried: " << chunk->get_data_url() << ")" <<
152  " Message " << e.what();
153  BESDEBUG(MODULE, msg.str() << endl);
154  throw BESSyntaxUserError(msg.str(), __FILE__, __LINE__);
155  }
156  }
157 
158  // rbuf: |******++++++++++----------------------|
159  // ^ ^ bytes_read + nbytes
160  // | bytes_read
161 
162  unsigned long long bytes_read = chunk->get_bytes_read();
163 
164  // If this fails, the code will write beyond the buffer.
165  if (bytes_read + nbytes > chunk->get_rbuf_size()) {
166  stringstream msg;
167  msg << prolog << "ERROR! The number of bytes_read: " << bytes_read << " plus the number of bytes to read: "
168  << nbytes << " is larger than the target buffer size: " << chunk->get_rbuf_size();
169  BESDEBUG(MODULE, msg.str() << endl);
170  DmrppRequestHandler::curl_handle_pool->release_all_handles();
171  throw BESInternalError(msg.str(), __FILE__, __LINE__);
172  }
173 
174  memcpy(chunk->get_rbuf() + bytes_read, buffer, nbytes);
175  chunk->set_bytes_read(bytes_read + nbytes);
176 
177  BESDEBUG(MODULE, prolog << "END" << endl);
178 
179  return nbytes;
180 }
181 
192 void inflate(char *dest, unsigned int dest_len, char *src, unsigned int src_len) {
193  /* Sanity check */
194  assert(src_len > 0);
195  assert(src);
196  assert(dest_len > 0);
197  assert(dest);
198 
199  /* Input; uncompress */
200  z_stream z_strm; /* zlib parameters */
201 
202  /* Set the uncompression parameters */
203  memset(&z_strm, 0, sizeof(z_strm));
204  z_strm.next_in = (Bytef *) src;
205  z_strm.avail_in = src_len;
206  z_strm.next_out = (Bytef *) dest;
207  z_strm.avail_out = dest_len;
208 
209  /* Initialize the uncompression routines */
210  if (Z_OK != inflateInit(&z_strm))
211  throw BESError("Failed to initialize inflate software.", BES_INTERNAL_ERROR, __FILE__, __LINE__);
212 
213  /* Loop to uncompress the buffer */
214  int status = Z_OK;
215  do {
216  /* Uncompress some data */
217  status = inflate(&z_strm, Z_SYNC_FLUSH);
218 
219  /* Check if we are done uncompressing data */
220  if (Z_STREAM_END == status) break; /*done*/
221 
222  /* Check for error */
223  if (Z_OK != status) {
224  (void) inflateEnd(&z_strm);
225  throw BESError("Failed to inflate data chunk.", BES_INTERNAL_ERROR, __FILE__, __LINE__);
226  }
227  else {
228  /* If we're not done and just ran out of buffer space, it's an error.
229  * The HDF5 library code would extend the buffer as needed, but for
230  * this handler, we always know the size of the uncompressed chunk.
231  */
232  if (0 == z_strm.avail_out) {
233  throw BESError("Data buffer is not big enough for uncompressed data.", BES_INTERNAL_ERROR, __FILE__,
234  __LINE__);
235 #if 0
236  /* Here's how to extend the buffer if needed. This might be useful some day... */
237  void *new_outbuf; /* Pointer to new output buffer */
238 
239  /* Allocate a buffer twice as big */
240  nalloc *= 2;
241  if (NULL == (new_outbuf = H5MM_realloc(outbuf, nalloc))) {
242  (void) inflateEnd(&z_strm);
243  HGOTO_ERROR(H5E_RESOURCE, H5E_NOSPACE, 0, "memory allocation failed for inflate decompression")
244  } /* end if */
245  outbuf = new_outbuf;
246 
247  /* Update pointers to buffer for next set of uncompressed data */
248  z_strm.next_out = (unsigned char*) outbuf + z_strm.total_out;
249  z_strm.avail_out = (uInt) (nalloc - z_strm.total_out);
250 #endif
251  } /* end if */
252  } /* end else */
253  } while (status == Z_OK);
254 
255  /* Finish uncompressing the stream */
256  (void) inflateEnd(&z_strm);
257 
258 
259 }
260 
// #define this to enable the duff's device loop unrolling code.
// jhrg 1/19/17
#define DUFFS_DEVICE

/**
 * @brief Reverse a byte shuffle: regroup the bytes of 'src' into 'width'-byte elements.
 *
 * With elems = src_size / width, the source holds byte 0 of every element, then
 * byte 1 of every element, and so on. This writes them back element by element,
 * i.e. dest[j * width + i] = src[i * elems + j]. Any bytes left over when
 * src_size is not a multiple of width are copied unchanged to the end of 'dest'.
 *
 * When there is nothing to regroup (width is 0 or 1, or there are fewer than two
 * whole elements) the source is copied to the destination as is.
 *
 * @param dest Destination buffer; must hold at least src_size bytes.
 * @param src The shuffled bytes.
 * @param src_size Number of bytes in 'src'.
 * @param width Number of bytes per element.
 */
void unshuffle(char *dest, const char *src, unsigned int src_size, unsigned int width) {
    // A width of zero would divide by zero; treat it like the other
    // "nothing to unshuffle" cases below.
    const unsigned int elems = (width > 0) ? src_size / width : 0; // int division rounds down

    /* Don't do anything for 1-byte elements, or "fractional" elements */
    if (!(width > 1 && elems > 1)) {
        memcpy(dest, src, src_size);
        return;
    }

    const char *in = src;   // Read cursor; walks the source front to back
    char *out = nullptr;    // Write cursor; set at the top of every pass

    /* Input; unshuffle. Pass i writes byte i of every element. */
    for (unsigned int i = 0; i < width; i++) {
        out = dest + i;
#ifndef DUFFS_DEVICE
        for (size_t j = elems; j > 0; --j) {
            *out = *in++;
            out += width;
        }
#else /* DUFFS_DEVICE */
        {
            // This macro saves repeating the same line 8 times
#define DUFF_GUTS *out = *in++; out += width;

            size_t duffs_index = (elems + 7) / 8; /* Counting index for Duff's device */
            switch (elems % 8) {
            default:
                assert(0 && "This Should never be executed!");
                break;
            case 0:
                do {
                    DUFF_GUTS
            case 7:
                    DUFF_GUTS
            case 6:
                    DUFF_GUTS
            case 5:
                    DUFF_GUTS
            case 4:
                    DUFF_GUTS
            case 3:
                    DUFF_GUTS
            case 2:
                    DUFF_GUTS
            case 1:
                    DUFF_GUTS
                } while (--duffs_index > 0);
            } /* end switch */

            // Keep the helper macro from leaking into the rest of the file.
#undef DUFF_GUTS
        } /* end block */
#endif /* DUFFS_DEVICE */
    } /* end for i = 0 to width*/

    /* Compute the leftover bytes if there are any */
    const size_t leftover = src_size % width;

    /* Add leftover to the end of data */
    if (leftover > 0) {
        /* Adjust back to end of shuffled bytes: after the last pass 'out' is
         * (width - 1) bytes beyond the end of the last whole element. */
        out -= (width - 1);
        memcpy(out, in, leftover);
    }
}
354 
355 
356 void Chunk::parse_chunk_position_in_array_string(const string &pia, vector<unsigned int> &cpia_vect){
357  if (pia.empty()) return;
358 
359  if (!cpia_vect.empty()) cpia_vect.clear();
360 
361  // Assume input is [x,y,...,z] where x, ..., are integers; modest syntax checking
362  // [1] is a minimal 'position in array' string.
363  if (pia.find('[') == string::npos || pia.find(']') == string::npos || pia.length() < 3)
364  throw BESInternalError("while parsing a DMR++, chunk position string malformed", __FILE__, __LINE__);
365 
366  if (pia.find_first_not_of("[]1234567890,") != string::npos)
367  throw BESInternalError("while parsing a DMR++, chunk position string illegal character(s)", __FILE__, __LINE__);
368 
369  // strip off []; iss holds x,y,...,z
370  istringstream iss(pia.substr(1, pia.length() - 2));
371 
372  char c;
373  unsigned int i;
374  while (!iss.eof()) {
375  iss >> i; // read an integer
376  cpia_vect.push_back(i);
377  iss >> c; // read a separator (,)
378  }
379 }
380 
381 
395 void Chunk::set_position_in_array(const string &pia) {
396 #if 0
397  if (pia.empty()) return;
398 
399  if (d_chunk_position_in_array.size()) d_chunk_position_in_array.clear();
400 
401  // Assume input is [x,y,...,z] where x, ..., are integers; modest syntax checking
402  // [1] is a minimal 'position in array' string.
403  if (pia.find('[') == string::npos || pia.find(']') == string::npos || pia.length() < 3)
404  throw BESInternalError("while parsing a DMR++, chunk position string malformed", __FILE__, __LINE__);
405 
406  if (pia.find_first_not_of("[]1234567890,") != string::npos)
407  throw BESInternalError("while parsing a DMR++, chunk position string illegal character(s)", __FILE__, __LINE__);
408 
409  // strip off []; iss holds x,y,...,z
410  istringstream iss(pia.substr(1, pia.length() - 2));
411 
412  char c;
413  unsigned int i;
414  while (!iss.eof()) {
415  iss >> i; // read an integer
416  d_chunk_position_in_array.push_back(i);
417  iss >> c; // read a separator (,)
418  }
419 #endif
420  parse_chunk_position_in_array_string(pia,d_chunk_position_in_array);
421 }
422 
431 void Chunk::set_position_in_array(const std::vector<unsigned int> &pia) {
432  if (pia.empty()) return;
433 
434  if (!d_chunk_position_in_array.empty()) d_chunk_position_in_array.clear();
435 
436  d_chunk_position_in_array = pia;
437 }
438 
446 string Chunk::get_curl_range_arg_string() {
447  return curl::get_range_arg_string(d_offset, d_size);
448 }
449 
461 void Chunk::add_tracking_query_param() {
476  string aws_s3_url_https("https://s3.amazonaws.com/");
477  string aws_s3_url_http("http://s3.amazonaws.com/");
478 
479  // Is it an AWS S3 access? (y.find(x) returns 0 when y starts with x)
480  if (d_data_url.find(aws_s3_url_https) == 0 || d_data_url.find(aws_s3_url_http) == 0) {
481  // Yup, headed to S3.
482  bool found = false;
483  string cloudydap_context_value = BESContextManager::TheManager()->get_context(S3_TRACKING_CONTEXT, found);
484  if (found) {
485  d_query_marker.append("?").append(S3_TRACKING_CONTEXT).append("=").append(cloudydap_context_value);
486  }
487  }
488 }
489 
490 #if 0
// Disabled code (compiled out by the enclosing '#if 0').
// Thread entry point: 'arg_list' is cast to an inflate_chunk_args, whose fields are
// passed to Chunk::inflate_chunk(). The args object is deleted on every path and
// the thread exits through pthread_exit(): with NULL on success, or with a
// heap-allocated copy of the BESError when inflate_chunk() throws one.
502 void *inflate_chunk(void *arg_list)
503 {
504  inflate_chunk_args *args = reinterpret_cast<inflate_chunk_args*>(arg_list);
505 
506  try {
507  args->chunk->inflate_chunk(args->deflate, args->shuffle, args->chunk_size, args->elem_width);
508  }
509  catch (BESError &error) {
510  delete args;
511  pthread_exit(new BESError(error));
512  }
513 
514  delete args;
515  pthread_exit(NULL);
516 }
517 #endif
518 
530 void Chunk::inflate_chunk(bool deflate, bool shuffle, unsigned int chunk_size, unsigned int elem_width) {
531  // This code is pretty naive - there are apparently a number of
532  // different ways HDF5 can compress data, and it does also use a scheme
533  // where several algorithms can be applied in sequence. For now, get
534  // simple zlib deflate working.jhrg 1/15/17
535  // Added support for shuffle. Assuming unshuffle always is applied _after_
536  // inflating the data (reversing the shuffle --> deflate process). It is
537  // possible that data could just be deflated or shuffled (because we
538  // have test data are use only shuffle). jhrg 1/20/17
539  // The file that implements the deflate filter is H5Zdeflate.c in the hdf5 source.
540  // The file that implements the shuffle filter is H5Zshuffle.c.
541 
542  if (d_is_inflated)
543  return;
544 
545  chunk_size *= elem_width;
546 
547  if (deflate) {
548  char *dest = new char[chunk_size];
549  try {
550  inflate(dest, chunk_size, get_rbuf(), get_rbuf_size());
551  // This replaces (and deletes) the original read_buffer with dest.
552 #if USE_SUPER_CHUNKS
553  set_read_buffer(dest, chunk_size, chunk_size, true);
554 #else
555  set_rbuf(dest, chunk_size);
556 #endif
557  }
558  catch (...) {
559  delete[] dest;
560  throw;
561  }
562  }
563 
564  if (shuffle) {
565  // The internal buffer is chunk's full size at this point.
566  char *dest = new char[get_rbuf_size()];
567  try {
568  unshuffle(dest, get_rbuf(), get_rbuf_size(), elem_width);
569 #if USE_SUPER_CHUNKS
570  set_read_buffer(dest,get_rbuf_size(),get_rbuf_size(), true);
571 #else
572  set_rbuf(dest, get_rbuf_size());
573 #endif
574  }
575  catch (...) {
576  delete[] dest;
577  throw;
578  }
579  }
580 
581  d_is_inflated = true;
582 
583 #if 0 // This was handy during development for debugging. Keep it for awhile (year or two) before we drop it ndp - 01/18/17
584  if(BESDebug::IsSet(MODULE)) {
585  unsigned long long chunk_buf_size = get_rbuf_size();
586  dods_float32 *vals = (dods_float32 *) get_rbuf();
587  ostream *os = BESDebug::GetStrm();
588  (*os) << std::fixed << std::setfill('_') << std::setw(10) << std::setprecision(0);
589  (*os) << "DmrppArray::"<< __func__ <<"() - Chunk[" << i << "]: " << endl;
590  for(unsigned long long k=0; k< chunk_buf_size/prototype()->width(); k++) {
591  (*os) << vals[k] << ", " << ((k==0)|((k+1)%10)?"":"\n");
592  }
593  }
594 #endif
595 }
596 
606 void Chunk::read_chunk() {
607  if (d_is_read) {
608  BESDEBUG(MODULE, prolog << "Already been read! Returning." << endl);
609  return;
610  }
611 
612  set_rbuf_to_size();
613 
614  dmrpp_easy_handle *handle = DmrppRequestHandler::curl_handle_pool->get_easy_handle(this);
615  if (!handle)
616  throw BESInternalError(prolog + "No more libcurl handles.", __FILE__, __LINE__);
617 
618  try {
619  handle->read_data(); // throws if error
620  DmrppRequestHandler::curl_handle_pool->release_handle(handle);
621  }
622  catch(...) {
623  DmrppRequestHandler::curl_handle_pool->release_handle(handle);
624  throw;
625  }
626 
627  // If the expected byte count was not read, it's an error.
628  if (get_size() != get_bytes_read()) {
629  ostringstream oss;
630  oss << "Wrong number of bytes read for chunk; read: " << get_bytes_read() << ", expected: " << get_size();
631  throw BESInternalError(oss.str(), __FILE__, __LINE__);
632  }
633 
634  d_is_read = true;
635 }
636 
646 void Chunk::dump(ostream &oss) const {
647  oss << "Chunk";
648  oss << "[ptr='" << (void *) this << "']";
649  oss << "[data_url='" << d_data_url << "']";
650  oss << "[offset=" << d_offset << "]";
651  oss << "[size=" << d_size << "]";
652  oss << "[chunk_position_in_array=(";
653  for (unsigned long i = 0; i < d_chunk_position_in_array.size(); i++) {
654  if (i) oss << ",";
655  oss << d_chunk_position_in_array[i];
656  }
657  oss << ")]";
658  oss << "[is_read=" << d_is_read << "]";
659  oss << "[is_inflated=" << d_is_inflated << "]";
660 }
661 
662 string Chunk::to_string() const {
663  std::ostringstream oss;
664  dump(oss);
665  return oss.str();
666 }
667 
668 
669 std::string Chunk::get_data_url() const {
670 
671  string data_url = EffectiveUrlCache::TheCache()->get_effective_url(d_data_url);
672  BESDEBUG(MODULE, prolog << "Using data_url: " << data_url << endl);
673 
674  // A conditional call to void Chunk::add_tracking_query_param()
675  // here for the NASA cost model work THG's doing. jhrg 8/7/18
676  if (!d_query_marker.empty()) {
677  return data_url + d_query_marker;
678  }
679 
680  return data_url;
681 }
682 
683 } // namespace dmrpp
684 
virtual std::string get_context(const std::string &name, bool &found)
retrieve the value of the specified context from the BES
static std::ostream * GetStrm()
return the debug stream
Definition: BESDebug.h:179
static bool IsSet(const std::string &flagName)
see if the debug context flagName is set to true
Definition: BESDebug.h:160
Abstract exception class for the BES with basic string message.
Definition: BESError.h:58
error thrown if the BES is not allowed to access the resource requested
exception thrown if internal error encountered
error thrown if there is a user syntax error in the request or any other user error
Bundle a libcurl easy handle with other information.
void read_data()
This is the read_data() method for all transfers.
GenericValue< UTF8<> > Value
GenericValue with UTF8 encoding.
Definition: document.h:2189
GenericDocument< UTF8<> > Document
GenericDocument with UTF8 encoding.
Definition: document.h:2585