bes  Updated for version 3.20.8
DmrppArray.cc
1 // -*- mode: c++; c-basic-offset:4 -*-
2 
3 // This file is part of the BES
4 
5 // Copyright (c) 2016 OPeNDAP, Inc.
6 // Author: James Gallagher <jgallagher@opendap.org>
7 //
8 // This library is free software; you can redistribute it and/or
9 // modify it under the terms of the GNU Lesser General Public
10 // License as published by the Free Software Foundation; either
11 // version 2.1 of the License, or (at your option) any later version.
12 //
13 // This library is distributed in the hope that it will be useful,
14 // but WITHOUT ANY WARRANTY; without even the implied warranty of
15 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 // Lesser General Public License for more details.
17 //
18 // You should have received a copy of the GNU Lesser General Public
19 // License along with this library; if not, write to the Free Software
20 // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
21 //
22 // You can contact OPeNDAP, Inc. at PO Box 112, Saunderstown, RI. 02874-0112.
23 
24 #include "config.h"
25 
26 #include <string>
27 #include <sstream>
28 #include <vector>
29 #include <memory>
30 #include <queue>
31 #include <iterator>
32 #include <thread>
33 #include <future> // std::async, std::future
34 #include <chrono> // std::chrono::milliseconds
35 
36 #include <cstring>
37 #include <cassert>
38 #include <cerrno>
39 
40 #include <pthread.h>
41 #include <cmath>
42 
43 #include <unistd.h>
44 
45 #include <D4Enum.h>
46 #include <D4Attributes.h>
47 #include <D4Maps.h>
48 #include <D4Group.h>
49 
50 #include "BESInternalError.h"
51 #include "BESDebug.h"
52 #include "BESLog.h"
53 #include "BESStopWatch.h"
54 
55 #include "byteswap_compat.h"
56 #include "CurlHandlePool.h"
57 #include "Chunk.h"
58 #include "DmrppArray.h"
59 #include "DmrppRequestHandler.h"
60 #include "DmrppNames.h"
61 #include "Base64.h"
62 
63 // Used with BESDEBUG
64 #define dmrpp_3 "dmrpp:3"
65 #define dmrpp_4 "dmrpp:4"
66 
67 using namespace libdap;
68 using namespace std;
69 
70 #define MB (1024*1024)
71 #define prolog std::string("DmrppArray::").append(__func__).append("() - ")
72 #define WAIT_FOR_FUTURE_MS 1
73 
74 namespace dmrpp {
75 
76 // Forward Declarations
77 void *one_super_chunk_thread(void *arg_list);
78 void *one_super_chunk_unconstrained_thread(void *arg_list);
79 
80 // ThreadPool state variables.
81 std::mutex thread_pool_mtx; // mutex for critical section
82 atomic_uint thread_counter(0);
83 
84 
92 bool get_next_future(list<std::future<void *>> &futures, unsigned long timeout) {
93  bool joined = false;
94  bool done = false;
95  std::chrono::milliseconds timeout_ms (timeout);
96 
97  while(!done){
98  auto futr = futures.begin();
99  auto fend = futures.end();
100  while(!joined && futr != fend){
101  // FIXME What happens if wait_for() always returns future_status::timeout for a stuck thread?
102  if((*futr).wait_for(timeout_ms) != std::future_status::timeout){
103  (*futr).get();
104  joined = true;
105  BESDEBUG(dmrpp_3, prolog << "Called future::get() on a ready future." << endl);
106  }
107  else {
108  futr++;
109  BESDEBUG(dmrpp_3, prolog << "future::wait_for() timed out. (timeout: "<<
110  timeout << " ms) There are currently "<< futures.size() << " futures in process." << endl);
111  }
112  }
113  if (joined) {
114  futures.erase(futr);
115  thread_counter--;
116  BESDEBUG(dmrpp_3, prolog << "Erased future from futures list. There are currently " <<
117  futures.size() << " futures in process." << endl);
118  }
119  done = joined || futures.empty();
120  }
121  return joined;
122 }
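// A minimal usage sketch for get_next_future() (illustrative only; 'worker' and
// 'args' stand in for the thread functions and argument blocks defined below,
// and in the real code thread_counter is incremented by the start_*_thread()
// helpers that pair with this):
//
//     list<std::future<void *>> futures;
//     futures.push_back(std::async(std::launch::async, worker, (void *) args));
//     while (!futures.empty())
//         get_next_future(futures, WAIT_FOR_FUTURE_MS);
//
// Each call polls the pending futures, waiting up to WAIT_FOR_FUTURE_MS on each,
// calls future::get() on the first one that becomes ready, and erases it from
// the list.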
123 
131 bool start_super_chunk_thread(list<std::future<void *>> &futures, one_super_chunk_args *args) {
132  bool retval = false;
133  std::unique_lock<std::mutex> lck (thread_pool_mtx);
134  if (thread_counter < DmrppRequestHandler::d_max_parallel_transfers) {
135  thread_counter++;
136  futures.push_back(std::async(std::launch::async, one_super_chunk_thread, (void *) args));
137  retval = true;
138  BESDEBUG(dmrpp_3, prolog << "Got std::future '"<< futures.size() <<
139  "' from std::async for " << args->super_chunk->to_string(false) << endl);
140  }
141  return retval;
142 }
143 
144 
145 void *one_super_chunk_thread(void *arg_list)
146 {
147  auto *args = reinterpret_cast<one_super_chunk_args *>(arg_list);
148 
149  try {
150  process_super_chunk(args->super_chunk, args->array);
151 
152  // SuperChunk::read_and_copy() (currently disabled)
153  // does exactly the same thing as process_super_chunk()
154  // in a class method.
155  // args->super_chunk->read_and_copy(args->array);
156  }
157  catch (BESError &error) {
158  // Deliberately swallow the error; 'args' is freed exactly once, below.
159  }
160  delete args;
161  return nullptr;
162 }
163 
171 bool start_super_chunk_unconstrained_thread(list<std::future<void *>> &futures, one_super_chunk_args *args) {
172  bool retval = false;
173  std::unique_lock<std::mutex> lck (thread_pool_mtx);
174  if(thread_counter < DmrppRequestHandler::d_max_parallel_transfers) {
175  thread_counter++;
176  futures.push_back(std::async(std::launch::async, one_super_chunk_unconstrained_thread, (void *)args));
177  retval = true;
178  BESDEBUG(dmrpp_3, prolog << "Got std::future '"<< futures.size() <<
179  "' from std::async for " << args->super_chunk->to_string(false) << endl);
180  }
181  return retval;
182 }
183 
184 void *one_super_chunk_unconstrained_thread(void *arg_list)
185 {
186  auto args = reinterpret_cast<one_super_chunk_args *>(arg_list);
187 
188  try {
189  process_super_chunk_unconstrained(args->super_chunk, args->array);
190 
191  // SuperChunk::read_and_copy_unconstrained() (currently disabled)
192  // does exactly the same thing as process_super_chunk_unconstrained()
193  // in a class method.
194  // args->super_chunk->read_and_copy_unconstrained(args->array);
195  }
196  catch (BESError &error) {
197  // Deliberately swallow the error; 'args' is freed exactly once, below.
198  }
199  delete args;
200  return nullptr;
201 }
202 
203 //#####################################################################################################################
204 //#####################################################################################################################
205 // DmrppArray begins here.
206 //
207 
208 
209 void DmrppArray::_duplicate(const DmrppArray &)
210 {
211 }
212 
213 DmrppArray::DmrppArray(const string &n, BaseType *v) :
214  Array(n, v, true /*is dap4*/), DmrppCommon()
215 {
216 }
217 
218 DmrppArray::DmrppArray(const string &n, const string &d, BaseType *v) :
219  Array(n, d, v, true), DmrppCommon()
220 {
221 }
222 
223 BaseType *
224 DmrppArray::ptr_duplicate()
225 {
226  return new DmrppArray(*this);
227 }
228 
229 DmrppArray::DmrppArray(const DmrppArray &rhs) :
230  Array(rhs), DmrppCommon(rhs)
231 {
232  _duplicate(rhs);
233 }
234 
235 DmrppArray &
236 DmrppArray::operator=(const DmrppArray &rhs)
237 {
238  if (this == &rhs) return *this;
239 
240  dynamic_cast<Array &>(*this) = rhs; // run Constructor=
241 
242  _duplicate(rhs);
243  DmrppCommon::m_duplicate_common(rhs);
244 
245  return *this;
246 }
247 
252 bool DmrppArray::is_projected()
253 {
254  for (Dim_iter p = dim_begin(), e = dim_end(); p != e; ++p)
255  if (dimension_size(p, true) != dimension_size(p, false)) return true;
256 
257  return false;
258 }
259 
278 static unsigned long long
279 get_index(const vector<unsigned int> &address_in_target, const vector<unsigned int> &target_shape)
280 {
281  assert(address_in_target.size() == target_shape.size()); // ranks must be equal
282 
283  auto shape_index = target_shape.rbegin();
284  auto index = address_in_target.rbegin(), index_end = address_in_target.rend();
285 
286  unsigned long long multiplier = *shape_index++;
287  unsigned long long offset = *index++;
288 
289  while (index != index_end) {
290  assert(*index < *shape_index); // index < shape for each dim
291 
292  offset += multiplier * *index++;
293  multiplier *= *shape_index++;
294  }
295 
296  return offset;
297 }
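// Worked example: for target_shape {3, 4, 5} (row-major order) and
// address_in_target {2, 3, 4}, the linear offset is
//     2*(4*5) + 3*5 + 4 == 40 + 15 + 4 == 59,
// which is the usual row-major linearization ((a0*4 + a1)*5 + a2).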
298 
305 unsigned long long DmrppArray::get_size(bool constrained)
306 {
307  // number of array elements in the constrained array
308  unsigned long long size = 1;
309  for (Dim_iter dim = dim_begin(), end = dim_end(); dim != end; dim++) {
310  size *= dimension_size(dim, constrained);
311  }
312  return size;
313 }
314 
321 vector<unsigned int> DmrppArray::get_shape(bool constrained)
322 {
323  Dim_iter dim = dim_begin(), edim = dim_end();
324  vector<unsigned int> shape;
325 
326  // For a 3d array, this method took 14ms without reserve(), 5ms with
327  // (when called many times).
328  shape.reserve(edim - dim);
329 
330  for (; dim != edim; dim++) {
331  shape.push_back(dimension_size(dim, constrained));
332  }
333 
334  return shape;
335 }
336 
342 DmrppArray::dimension DmrppArray::get_dimension(unsigned int i)
343 {
344  assert(i < (dim_end() - dim_begin()));
345  return *(dim_begin() + i);
346 }
347 
350 
361 void DmrppArray::insert_constrained_contiguous(Dim_iter dim_iter, unsigned long *target_index,
362  vector<unsigned int> &subset_addr,
363  const vector<unsigned int> &array_shape, char /*Chunk*/*src_buf)
364 {
365  BESDEBUG("dmrpp", "DmrppArray::" << __func__ << "() - subsetAddress.size(): " << subset_addr.size() << endl);
366 
367  unsigned int bytes_per_elem = prototype()->width();
368 
369  char *dest_buf = get_buf();
370 
371  unsigned int start = this->dimension_start(dim_iter, true);
372  unsigned int stop = this->dimension_stop(dim_iter, true);
373  unsigned int stride = this->dimension_stride(dim_iter, true);
374 
375  dim_iter++;
376 
377  // The end case for the recursion is dim_iter == dim_end(); stride == 1 is an optimization.
378  // See the else clause for the general case.
379  if (dim_iter == dim_end() && stride == 1) {
380  // For the start and stop indexes of the subset, get the matching indexes in the whole array.
381  subset_addr.push_back(start);
382  unsigned long start_index = get_index(subset_addr, array_shape);
383  subset_addr.pop_back();
384 
385  subset_addr.push_back(stop);
386  unsigned long stop_index = get_index(subset_addr, array_shape);
387  subset_addr.pop_back();
388 
389  // Copy data block from start_index to stop_index
390  // TODO Replace this loop with a call to std::memcpy()
391  for (unsigned long source_index = start_index; source_index <= stop_index; source_index++) {
392  unsigned long target_byte = *target_index * bytes_per_elem;
393  unsigned long source_byte = source_index * bytes_per_elem;
394  // Copy a single value.
395  for (unsigned long i = 0; i < bytes_per_elem; i++) {
396  dest_buf[target_byte++] = src_buf[source_byte++];
397  }
398  (*target_index)++;
399  }
400  }
401  else {
402  for (unsigned int myDimIndex = start; myDimIndex <= stop; myDimIndex += stride) {
403 
404  // Is it the last dimension?
405  if (dim_iter != dim_end()) {
406  // Nope! Then we recurse to the last dimension to read stuff
407  subset_addr.push_back(myDimIndex);
408  insert_constrained_contiguous(dim_iter, target_index, subset_addr, array_shape, src_buf);
409  subset_addr.pop_back();
410  }
411  else {
412  // We are at the last (innermost) dimension, so it's time to copy values.
413  subset_addr.push_back(myDimIndex);
414  unsigned int sourceIndex = get_index(subset_addr, array_shape);
415  subset_addr.pop_back();
416 
417  // Copy a single value.
418  unsigned long target_byte = *target_index * bytes_per_elem;
419  unsigned long source_byte = sourceIndex * bytes_per_elem;
420 
421  for (unsigned int i = 0; i < bytes_per_elem; i++) {
422  dest_buf[target_byte++] = src_buf[source_byte++];
423  }
424  (*target_index)++;
425  }
426  }
427  }
428 }
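// Illustration with assumed values: for array_shape {4, 4} and the constraint
// [1:1:2][1:1:2], the recursion above fixes dim 0 at rows 1 and 2; for each row
// the innermost dimension has stride == 1, so the fast path computes
// start_index == 4*row + 1 and stop_index == 4*row + 2 and copies elements
// 4*row + 1 through 4*row + 2 as a block.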
429 
440 void *one_child_chunk_thread(void *arg_list)
441 {
442  one_child_chunk_args *args = reinterpret_cast<one_child_chunk_args *>(arg_list);
443 
444  try {
445  args->child_chunk->read_chunk();
446 
447  assert(args->master_chunk->get_rbuf());
448  assert(args->child_chunk->get_rbuf());
449  assert(args->child_chunk->get_bytes_read() == args->child_chunk->get_size());
450 
451  // master offset \/
452  // master chunk: mmmmmmmmmmmmmmmm
453  // child chunks: 1111222233334444 (there are four child chunks)
454  // child offsets: ^ ^ ^ ^
455  // For this example, child_1_offset - master_offset == 0 (that's always true)
456  // child_2_offset - master_offset == 4, child_3_offset - master_offset == 8,
457  // and child_4_offset - master_offset == 12.
458  // Those are the starting locations within the data buffer of the master chunk
459  // where each child chunk should be written.
460  // Note: all of the offset values are relative to the beginning of the file.
461 
462  unsigned int offset_within_master_chunk = args->child_chunk->get_offset() - args->master_chunk->get_offset();
463 
464  memcpy(args->master_chunk->get_rbuf() + offset_within_master_chunk, args->child_chunk->get_rbuf(),
465  args->child_chunk->get_bytes_read());
466  }
467  catch (BESError &error) {
468  write(args->fds[1], &args->tid, sizeof(args->tid));
469  delete args;
470  pthread_exit(new string(error.get_verbose_message()));
471  }
472 
473  // tid is a char and thus is written atomically. Writing this tells the parent
474  // thread the child is complete and it should call pthread_join(tid, ...)
475  write(args->fds[1], &args->tid, sizeof(args->tid));
476  delete args;
477  pthread_exit(NULL);
478 }
479 
498 void DmrppArray::read_contiguous()
499 {
500  BESStopWatch sw;
501  if (BESDebug::IsSet(TIMING_LOG_KEY)) sw.start(prolog + "Timer name: "+name(), "");
502 
503  // These first four lines reproduce DmrppCommon::read_atomic(). The call
504  // to Chunk::inflate_chunk() handles 'contiguous' data that are compressed.
505  // And since we need the chunk, I copied the read_atomic code here.
506 
507  auto chunk_refs = get_chunks();
508 
509  if (chunk_refs.size() != 1)
510  throw BESInternalError(string("Expected only a single chunk for variable ") + name(), __FILE__, __LINE__);
511 
512  // This is the original chunk for this 'contiguous' variable.
513  auto master_chunk = chunk_refs[0];
514 
515  unsigned long long master_chunk_size = master_chunk->get_size();
516 
517  // Read the chunk in parallel only above some size threshold. jhrg 9/21/19
518  // Only use a parallel read if the chunk is over 2MB; otherwise it is easier to just read it as is. kln 9/23/19
519  if (!DmrppRequestHandler::d_use_parallel_transfers || master_chunk_size <= DmrppRequestHandler::d_min_size) {
520  // Else read the master_chunk as is. This is the non-parallel I/O case
521  master_chunk->read_chunk();
522  }
523  else {
524  // Allocate memory for the 'master chunk' so the threads can transfer data
525  // from the child chunks to it.
526  master_chunk->set_rbuf_to_size();
527 
528  // The number of child chunks is determined by the size of the data.
529  // If the size of the master chunk is 3MB then 3 chunks will be made. We will round down
530  // when necessary and handle the remainder later on (3.3MB = 3 chunks, 4.2MB = 4 chunks, etc.) kln 9/23/19
531  unsigned int num_chunks = floor(master_chunk_size / MB);
532  if (num_chunks >= DmrppRequestHandler::d_max_parallel_transfers)
533  num_chunks = DmrppRequestHandler::d_max_parallel_transfers;
534 
535  // This pipe is used by the child threads to indicate completion
536  int fds[2];
537  int status = pipe(fds);
538  if (status < 0)
539  throw BESInternalError(string("Could not open a pipe for thread communication: ").append(strerror(errno)),
540  __FILE__, __LINE__);
541 
542  // Use the original chunk's size and offset to evenly split it into smaller chunks
543  unsigned long long chunk_size = master_chunk_size / num_chunks;
544  unsigned long long chunk_offset = master_chunk->get_offset();
545  std::string chunk_byteorder = master_chunk->get_byte_order();
546 
547  // If the size of the master chunk is not evenly divisible by num_chunks, capture
548  // the remainder here and increase the size of the last chunk by this number of bytes.
549  unsigned int chunk_remainder = master_chunk->get_size() % num_chunks;
550 
551  string chunk_url = master_chunk->get_data_url();
552 
553  // Set up a queue to break up the original master_chunk and keep track of the pieces
554  queue<shared_ptr<Chunk>> chunks_to_read;
555 
556  for (unsigned int i = 0; i < num_chunks - 1; i++) {
557  chunks_to_read.push(shared_ptr<Chunk>(new Chunk(chunk_url, chunk_byteorder, chunk_size, (chunk_size * i) + chunk_offset)));
558  }
559  // See above for details about chunk_remainder. jhrg 9/21/19
560  chunks_to_read.push(shared_ptr<Chunk>(new Chunk(chunk_url, chunk_byteorder, chunk_size + chunk_remainder,
561  (chunk_size * (num_chunks - 1)) + chunk_offset)));
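// Worked example of the split (illustrative numbers): a master chunk of
// 3,500,000 bytes gives num_chunks == 3 (3,500,000 / 1MB, rounded down),
// chunk_size == 1,166,666 and chunk_remainder == 2. Relative to chunk_offset,
// the queued chunks then cover bytes [0, 1166666), [1166666, 2333332) and
// [2333332, 3500000); the last one is 2 bytes larger.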
562 
563  // Start the max number of processing pipelines
564  pthread_t threads[DmrppRequestHandler::d_max_parallel_transfers];
565  memset(&threads[0], 0, sizeof(pthread_t) * DmrppRequestHandler::d_max_parallel_transfers);
566 
567  try {
568  unsigned int num_threads = 0;
569 
570  // start initial set of threads
571  for (unsigned int i = 0;
572  i < (unsigned int) DmrppRequestHandler::d_max_parallel_transfers && !chunks_to_read.empty(); ++i) {
573  shared_ptr<Chunk> current_chunk = chunks_to_read.front();
574  chunks_to_read.pop();
575 
576  // thread number is 'i'
577  one_child_chunk_args *args = new one_child_chunk_args(fds, i, current_chunk, master_chunk);
578  status = pthread_create(&threads[i], NULL, dmrpp::one_child_chunk_thread, (void *) args);
579 
580  if (status == 0) {
581  ++num_threads;
582  BESDEBUG(dmrpp_3, "started thread: " << i << endl);
583  }
584  else {
585  ostringstream oss("Could not start process_one_chunk_unconstrained thread for master_chunk ",
586  std::ios::ate);
587  oss << i << ": " << strerror(status);
588  BESDEBUG(dmrpp_3, oss.str());
589  throw BESInternalError(oss.str(), __FILE__, __LINE__);
590  }
591  }
592 
593  // Now join the child threads, creating replacement threads if needed
594  while (num_threads > 0) {
595  unsigned char tid; // bytes can be written atomically
596  // Block here until a child thread writes to the pipe, then read the byte
597  int bytes = ::read(fds[0], &tid, sizeof(tid));
598  if (bytes != sizeof(tid))
599  throw BESInternalError(string("Could not read the thread id: ").append(strerror(errno)), __FILE__,
600  __LINE__);
601 
602  if (tid >= DmrppRequestHandler::d_max_parallel_transfers) {
603  ostringstream oss("Invalid thread id read after thread exit: ", std::ios::ate);
604  oss << tid;
605  throw BESInternalError(oss.str(), __FILE__, __LINE__);
606  }
607 
608  string *error;
609  status = pthread_join(threads[tid], (void **) &error);
610  --num_threads;
611  BESDEBUG(dmrpp_3, "joined thread: " << (unsigned int) tid << ", there are: " << num_threads << endl);
612 
613  if (status != 0) {
614  ostringstream oss("Could not join process_one_chunk_unconstrained thread for master_chunk ",
615  std::ios::ate);
616  oss << tid << ": " << strerror(status);
617  throw BESInternalError(oss.str(), __FILE__, __LINE__);
618  }
619  else if (error != 0) {
620  BESInternalError e(*error, __FILE__, __LINE__);
621  delete error;
622  throw e;
623  }
624  else if (chunks_to_read.size() > 0) {
625  auto current_chunk = chunks_to_read.front();
626  chunks_to_read.pop();
627 
628  // thread number is 'tid,' the number of the thread that just completed
629  one_child_chunk_args *args = new one_child_chunk_args(fds, tid, current_chunk, master_chunk);
630  int status = pthread_create(&threads[tid], NULL, dmrpp::one_child_chunk_thread, (void *) args);
631 
632  if (status != 0) {
633  ostringstream oss;
634  oss << "Could not start process_one_chunk_unconstrained thread for master_chunk " << tid << ": "
635  << strerror(status);
636  throw BESInternalError(oss.str(), __FILE__, __LINE__);
637  }
638  ++num_threads;
639  BESDEBUG(dmrpp_3, "started thread: " << (unsigned int) tid << ", there are: " << num_threads << endl);
640  }
641  }
642 
643  // Once done with the threads, close the communication pipe.
644  close(fds[0]);
645  close(fds[1]);
646  }
647  catch (...) {
648  // cancel all the threads, otherwise we'll have threads out there using up resources
649  // defined in DmrppCommon.cc
650  join_threads(threads, DmrppRequestHandler::d_max_parallel_transfers);
651  // close the pipe used to communicate with the child threads
652  close(fds[0]);
653  close(fds[1]);
654  // re-throw the exception
655  throw;
656  }
657  }
658 
659  // Now decompress the master chunk
660  master_chunk->inflate_chunk(is_deflate_compression(), is_shuffle_compression(), get_chunk_size_in_elements(),
661  var()->width());
662 
663  // 'master_chunk' now holds the data. Transfer it to the Array.
664  if (!is_projected()) { // if there is no projection constraint
665  val2buf(master_chunk->get_rbuf()); // yes, it's not type-safe
666  }
667  else { // apply the constraint
668  vector<unsigned int> array_shape = get_shape(false);
669 
670  // Reserve space in this array for the constrained size of the data request
671  reserve_value_capacity(get_size(true));
672  unsigned long target_index = 0;
673  vector<unsigned int> subset;
674 
675  insert_constrained_contiguous(dim_begin(), &target_index, subset, array_shape, master_chunk->get_rbuf());
676  }
677 
678  set_read_p(true);
679 }
680 
683 
697 static unsigned long multiplier(const vector<unsigned int> &shape, unsigned int k)
698 {
699  assert(shape.size() > 1);
700  assert(shape.size() > k + 1);
701 
702  vector<unsigned int>::const_iterator i = shape.begin(), e = shape.end();
703  advance(i, k + 1);
704  unsigned long multiplier = *i++;
705  while (i != e) {
706  multiplier *= *i++;
707  }
708 
709  return multiplier;
710 }
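// Worked example: for shape {3, 4, 5}, multiplier(shape, 0) == 4*5 == 20 and
// multiplier(shape, 1) == 5; that is, the number of elements that one step in
// dimension k spans in the linearized array.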
711 
731 void DmrppArray::insert_chunk_unconstrained(shared_ptr<Chunk> chunk, unsigned int dim, unsigned long long array_offset,
732  const vector<unsigned int> &array_shape,
733  unsigned long long chunk_offset, const vector<unsigned int> &chunk_shape,
734  const vector<unsigned int> &chunk_origin)
735 {
736  // Now we figure out the correct last element. It's possible that a
737  // chunk 'extends beyond' the Array bounds. Here 'end_element' is the
738  // last element of the destination array
739  dimension thisDim = this->get_dimension(dim);
740  unsigned long long end_element = chunk_origin[dim] + chunk_shape[dim] - 1;
741  if ((unsigned) thisDim.stop < end_element) {
742  end_element = thisDim.stop;
743  }
744 
745  unsigned long long chunk_end = end_element - chunk_origin[dim];
746 
747  unsigned int last_dim = chunk_shape.size() - 1;
748  if (dim == last_dim) {
749  unsigned int elem_width = prototype()->width();
750 
751  array_offset += chunk_origin[dim];
752 
753  // Compute how much we are going to copy
754  unsigned long long chunk_bytes = (end_element - chunk_origin[dim] + 1) * elem_width;
755  char *source_buffer = chunk->get_rbuf();
756  char *target_buffer = get_buf();
757  memcpy(target_buffer + (array_offset * elem_width), source_buffer + (chunk_offset * elem_width), chunk_bytes);
758  }
759  else {
760  unsigned long mc = multiplier(chunk_shape, dim);
761  unsigned long ma = multiplier(array_shape, dim);
762 
763  // Not the last dimension, so we proceed down the recursion branch.
764  for (unsigned int chunk_index = 0 /*chunk_start*/; chunk_index <= chunk_end; ++chunk_index) {
765  unsigned long long next_chunk_offset = chunk_offset + (mc * chunk_index);
766  unsigned long long next_array_offset = array_offset + (ma * (chunk_index + chunk_origin[dim]));
767 
768  // Re-entry here:
769  insert_chunk_unconstrained(chunk, dim + 1, next_array_offset, array_shape, next_chunk_offset, chunk_shape,
770  chunk_origin);
771  }
772  }
773 }
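// Illustration with assumed values: for a 10x10 array of 4-byte ints stored as
// 5x5 chunks, the chunk with chunk_origin {5, 5} is inserted by the loop above
// with mc == 5 and ma == 10; pass chunk_index == i then copies one chunk row
// via the dim == last_dim branch: 5 elements (20 bytes) from chunk element
// offset 5*i to array element offset 10*(i + 5) + 5.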
774 
780 void *one_chunk_unconstrained_thread(void *arg_list)
781 {
782  one_chunk_unconstrained_args *args = reinterpret_cast<one_chunk_unconstrained_args *>(arg_list);
783 
784  try {
785  process_one_chunk_unconstrained(args->chunk, args->array, args->array_shape, args->chunk_shape);
786  }
787  catch (BESError &error) {
788  stringstream msg;
789  msg << prolog << "ERROR. tid: " << +(args->tid) << " message: " << error.get_verbose_message() << endl;
790  ERROR_LOG(msg.str());
791  write(args->fds[1], &args->tid, sizeof(args->tid));
792  delete args;
793  pthread_exit(new string(msg.str()));
794  }
795  catch (std::exception &e){
796  stringstream msg;
797  msg << prolog << "ERROR. tid: " << +(args->tid) << " process_one_chunk_unconstrained() "
798  "failed. Message: " << e.what() << endl;
799  ERROR_LOG(msg.str());
800  write(args->fds[1], &args->tid, sizeof(args->tid));
801  delete args;
802  pthread_exit(new string(msg.str()));
803 
804  }
805  catch (...){
806  stringstream msg;
807  msg << prolog << "ERROR. tid: " << +(args->tid) << " process_one_chunk_unconstrained() "
808  "failed for an unknown reason." << endl;
809  ERROR_LOG(msg.str());
810  write(args->fds[1], &args->tid, sizeof(args->tid));
811  delete args;
812  pthread_exit(new string(msg.str()));
813  }
814 
815  // tid is a char and thus is written atomically. Writing this tells the parent
816  // thread the child is complete and it should call pthread_join(tid, ...)
817  write(args->fds[1], &args->tid, sizeof(args->tid));
818 
819  delete args;
820  pthread_exit(NULL);
821 }
822 
831 void process_super_chunk_unconstrained(const shared_ptr<SuperChunk>& super_chunk, DmrppArray *array)
832 {
833  BESDEBUG(dmrpp_3, prolog << "BEGIN" << endl );
834  super_chunk->read();
835 
836  // The size, in elements, of each of the array's dimensions
837  const vector<unsigned int> array_shape = array->get_shape(true);
838  // The size, in elements, of each of the chunk's dimensions
839  const vector<unsigned int> chunk_shape = array->get_chunk_dimension_sizes();
840 
841 
842  for(auto &chunk :super_chunk->get_chunks()){
843  if (array->is_deflate_compression() || array->is_shuffle_compression())
844  chunk->inflate_chunk(array->is_deflate_compression(), array->is_shuffle_compression(),
845  array->get_chunk_size_in_elements(), array->var()->width());
846 
847  vector<unsigned int> target_element_address = chunk->get_position_in_array();
848  vector<unsigned int> chunk_source_address(array->dimensions(), 0);
849 
850  array->insert_chunk_unconstrained(chunk, 0, 0, array_shape, 0, chunk_shape, chunk->get_position_in_array());
851  }
852 }
853 
854 void process_one_chunk_unconstrained(shared_ptr<Chunk> chunk, DmrppArray *array, const vector<unsigned int> &array_shape,
855  const vector<unsigned int> &chunk_shape)
856 {
857  BESDEBUG(dmrpp_3, prolog << "BEGIN" << endl );
858  chunk->read_chunk();
859 
860  if (array->is_deflate_compression() || array->is_shuffle_compression())
861  chunk->inflate_chunk(array->is_deflate_compression(), array->is_shuffle_compression(),
862  array->get_chunk_size_in_elements(), array->var()->width());
864 
865  array->insert_chunk_unconstrained(chunk, 0, 0, array_shape, 0, chunk_shape, chunk->get_position_in_array());
866  BESDEBUG(dmrpp_3, prolog << "END" << endl );
867 }
868 
869 
876 void read_chunks_unconstrained_concurrent(DmrppArray *array, queue<shared_ptr<SuperChunk>> &super_chunks)
877 {
878  BESStopWatch sw;
879  if (BESDebug::IsSet(TIMING_LOG_KEY)) sw.start(prolog + "Timer name: "+prolog, "");
880 
881  // Parallel version based on read_chunks_unconstrained(). There is
882  // substantial duplication of the code in read_chunks_unconstrained();
883  // removing that duplication is deferred until the move to C++11 threads is complete.
884 
885  // We maintain a list of futures to track our parallel activities.
886  list<future<void *>> futures;
887  try {
888  bool done = false;
889  bool joined = true;
890  while (!done) {
891  // get_next_future() returns true when it "gets" a future (joins a thread).
892  // We do this until all of the futures have been "got".
893  if(!futures.empty())
894  joined = get_next_future(futures, WAIT_FOR_FUTURE_MS);
895 
896  // If joined is true, the thread_counter has been decremented (because future::get()
897  // has been called).
898 
899  // Next we check to see if there are still SuperChunks in the queue and we create new futures until
900  // all the SuperChunks have been processed.
901  if (joined){
902  bool thread_started = true;
903  while(thread_started && !super_chunks.empty()) {
904  auto super_chunk = super_chunks.front();
905  BESDEBUG(dmrpp_3, prolog << "Starting thread for " << super_chunk->to_string(false) << endl);
906 
907  auto *args = new one_super_chunk_args(super_chunk, array);
908  thread_started = start_super_chunk_unconstrained_thread(futures, args);
909 
910  if (thread_started) {
911  super_chunks.pop();
912  BESDEBUG(dmrpp_3, prolog << "STARTED thread for " << super_chunk->to_string(false) << endl);
913  } else {
914  // Thread did not start, ownership of the arguments was not passed to the thread.
915  delete args;
916  BESDEBUG(dmrpp_3, prolog << "Thread not started, Returned SuperChunk to queue. " <<
917  "thread_count: " << thread_counter << endl);
918  }
919  }
920  }
921  else if(!super_chunks.empty()){
922  // TODO I can't see how this could happen (that there are SuperChunks left and yet we failed to
923  // join prior to arriving here), so I laid a trap.
924  stringstream msg;
925  msg << prolog << "No threads joined, yet " << super_chunks.size() << " SuperChunks remain unread.";
926  throw BESInternalError(msg.str(), __FILE__, __LINE__);
927  }
928  else {
929  // No more SuperChunks and no joinable threads means we're done here.
930  done = true;
931  }
932  joined = false;
933  }
934  }
935  catch (...) {
936  // Complete all of the futures, otherwise we'll have threads out there using up resources
937  while(!futures.empty()){
938  futures.back().get();
939  futures.pop_back();
940  }
941  // re-throw the exception
942  throw;
943  }
944 }
945 
946 
958 void DmrppArray::read_chunks_unconstrained()
959 {
960  BESStopWatch sw;
961  if (BESDebug::IsSet(TIMING_LOG_KEY)) sw.start(prolog + "Timer name: "+name(), "");
962 
963  auto chunk_refs = get_chunks();
964  if (chunk_refs.size() < 2)
965  throw BESInternalError(string("Expected chunks for variable ") + name(), __FILE__, __LINE__);
966 
967  // Find all the required chunks to read. I used a queue to preserve the chunk order, which
968  // made using a debugger easier. However, order does not matter, AFAIK.
969  queue<shared_ptr<SuperChunk>> super_chunks;
970  auto current_super_chunk = shared_ptr<SuperChunk>(new SuperChunk());
971  super_chunks.push(current_super_chunk);
972 
973  // Make the SuperChunks using all the chunks.
974  for(const auto& chunk: get_chunks()){
975  bool added = current_super_chunk->add_chunk(chunk);
976  if(!added){
977  current_super_chunk = shared_ptr<SuperChunk>(new SuperChunk());
978  super_chunks.push(current_super_chunk);
979  if(!current_super_chunk->add_chunk(chunk)){
980  stringstream msg ;
981  msg << prolog << "Failed to add Chunk to new SuperChunk. chunk: " << chunk->to_string();
982  throw BESInternalError(msg.str(), __FILE__, __LINE__);
983  }
984  }
985  }
986  reserve_value_capacity(get_size());
987  // The size, in elements, of each of the array's dimensions
988  const vector<unsigned int> array_shape = get_shape(true);
989  // The size, in elements, of each of the chunk's dimensions
990  const vector<unsigned int> chunk_shape = get_chunk_dimension_sizes();
991 
992 
993  BESDEBUG(dmrpp_3, __func__ << endl);
994  BESDEBUG(dmrpp_3, "d_use_parallel_transfers: " << DmrppRequestHandler::d_use_parallel_transfers << endl);
995  BESDEBUG(dmrpp_3, "d_max_parallel_transfers: " << DmrppRequestHandler::d_max_parallel_transfers << endl);
996 
997  if (!DmrppRequestHandler::d_use_parallel_transfers) { // Serial transfers
998  while(!super_chunks.empty()) {
999  auto super_chunk = super_chunks.front();
1000  super_chunks.pop();
1001  process_super_chunk_unconstrained(super_chunk, this);
1002 
1003  // SuperChunk::read_and_copy_unconstrained() (currently disabled)
1004  // does exactly the same thing as process_super_chunk_unconstrained()
1005  // in a class method.
1006  // args->super_chunk->read_and_copy_unconstrained(args->array);
1007  }
1008  }
1009  else { // Parallel transfers
1010  read_chunks_unconstrained_concurrent(this,super_chunks);
1011  }
1012  set_read_p(true);
1013 }
1014 
1015 
1018 
1031 unsigned long long DmrppArray::get_chunk_start(const dimension &thisDim, unsigned int chunk_origin)
1032 {
1033  // What's the first element that we are going to access for this dimension of the chunk?
1034  unsigned long long first_element_offset = 0; // start with 0
1035  if ((unsigned) (thisDim.start) < chunk_origin) {
1036  // If the start is behind this chunk, then it's special.
1037  if (thisDim.stride != 1) {
1038  // And if the stride isn't 1, we have to figure out where to begin in this chunk.
1039  first_element_offset = (chunk_origin - thisDim.start) % thisDim.stride;
1040  // If it's zero, great!
1041  if (first_element_offset != 0) {
1042  // Otherwise we adjust to get the correct first element.
1043  first_element_offset = thisDim.stride - first_element_offset;
1044  }
1045  }
1046  }
1047  else {
1048  first_element_offset = thisDim.start - chunk_origin;
1049  }
1050 
1051  return first_element_offset;
1052 }
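// Worked example: for a constraint with start == 2 and stride == 3 (elements
// 2, 5, 8, 11, ... are wanted) and chunk_origin == 10, we get
// (10 - 2) % 3 == 2, so first_element_offset == 3 - 2 == 1: the first wanted
// element in this chunk is local index 1, i.e. absolute index 11.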
1053 
1075 shared_ptr<Chunk>
1076 DmrppArray::find_needed_chunks(unsigned int dim, vector<unsigned int> *target_element_address, shared_ptr<Chunk> chunk)
1077 {
1078  BESDEBUG(dmrpp_3, prolog << " BEGIN, dim: " << dim << endl);
1079 
1080  // The size, in elements, of each of the chunk's dimensions.
1081  const vector<unsigned int> &chunk_shape = get_chunk_dimension_sizes();
1082 
1083  // The chunk's origin point a.k.a. its "position in array".
1084  const vector<unsigned int> &chunk_origin = chunk->get_position_in_array();
1085 
1086  dimension thisDim = this->get_dimension(dim);
1087 
1088  // Do we even want this chunk?
1089  if ((unsigned) thisDim.start > (chunk_origin[dim] + chunk_shape[dim]) ||
1090  (unsigned) thisDim.stop < chunk_origin[dim]) {
1091  return nullptr; // No. No, we do not. Skip this chunk.
1092  }
1093 
1094  // What's the first element that we are going to access for this dimension of the chunk?
1095  unsigned long long chunk_start = get_chunk_start(thisDim, chunk_origin[dim]);
1096 
1097  // Is the next point to be sent in this chunk at all? If no, return.
1098  if (chunk_start > chunk_shape[dim]) {
1099  return nullptr;
1100  }
1101 
1102  // Now we figure out the correct last element, based on the subset expression
1103  unsigned long long end_element = chunk_origin[dim] + chunk_shape[dim] - 1;
1104  if ((unsigned) thisDim.stop < end_element) {
1105  end_element = thisDim.stop;
1106  }
1107 
1108  unsigned long long chunk_end = end_element - chunk_origin[dim];
1109 
1110  unsigned int last_dim = chunk_shape.size() - 1;
1111  if (dim == last_dim) {
1112  BESDEBUG(dmrpp_3, prolog << " END, This is the last_dim. chunk: " << chunk->to_string() << endl);
1113  return chunk;
1114  }
1115  else {
1116  // Not the last dimension, so we proceed down the recursion branch.
1117  for (unsigned int chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
1118  (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;
1119 
1120  // Re-entry here:
1121  auto needed = find_needed_chunks(dim + 1, target_element_address, chunk);
1122  if (needed){
1123  BESDEBUG(dmrpp_3, prolog << " END, Found chunk: " << needed->to_string() << endl);
1124  return needed;
1125  }
1126 
1127  }
1128  }
1129  BESDEBUG(dmrpp_3, prolog << " END, dim: " << dim << endl);
1130 
1131  return nullptr;
1132 }
1133 
1153 void DmrppArray::insert_chunk(
1154  unsigned int dim,
1155  vector<unsigned int> *target_element_address,
1156  vector<unsigned int> *chunk_element_address,
1157  shared_ptr<Chunk> chunk,
1158  const vector<unsigned int> &constrained_array_shape){
1159 
1160  // The size, in elements, of each of the chunk's dimensions.
1161  const vector<unsigned int> &chunk_shape = get_chunk_dimension_sizes();
1162 
1163  // The chunk's origin point a.k.a. its "position in array".
1164  const vector<unsigned int> &chunk_origin = chunk->get_position_in_array();
1165 
1166  dimension thisDim = this->get_dimension(dim);
1167 
1168  // What's the first element that we are going to access for this dimension of the chunk?
1169  unsigned long long chunk_start = get_chunk_start(thisDim, chunk_origin[dim]);
1170 
1171  // Now we figure out the correct last element, based on the subset expression
1172  unsigned long long end_element = chunk_origin[dim] + chunk_shape[dim] - 1;
1173  if ((unsigned) thisDim.stop < end_element) {
1174  end_element = thisDim.stop;
1175  }
1176 
1177  unsigned long long chunk_end = end_element - chunk_origin[dim];
1178 
1179  unsigned int last_dim = chunk_shape.size() - 1;
1180  if (dim == last_dim) {
1181  char *source_buffer = chunk->get_rbuf();
1182  char *target_buffer = get_buf();
1183  unsigned int elem_width = prototype()->width();
1184 
1185  if (thisDim.stride == 1) {
1186  // The start element in this array
1187  unsigned long long start_element = chunk_origin[dim] + chunk_start;
1188  // Compute how much we are going to copy
1189  unsigned long long chunk_constrained_inner_dim_bytes = (end_element - start_element + 1) * elem_width;
1190 
1191  // Compute where we need to put it.
1192  (*target_element_address)[dim] = (start_element - thisDim.start); // / thisDim.stride;
1193  // Compute where we are going to read it from
1194  (*chunk_element_address)[dim] = chunk_start;
1195 
1196  // See below re get_index()
1197  unsigned int target_char_start_index =
1198  get_index(*target_element_address, constrained_array_shape) * elem_width;
1199  unsigned int chunk_char_start_index = get_index(*chunk_element_address, chunk_shape) * elem_width;
1200 
1201  memcpy(target_buffer + target_char_start_index, source_buffer + chunk_char_start_index,
1202  chunk_constrained_inner_dim_bytes);
1203  }
1204  else {
1205  // Stride != 1
1206  for (unsigned int chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
1207  // Compute where we need to put it.
1208  (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;
1209 
1210  // Compute where we are going to read it from
1211  (*chunk_element_address)[dim] = chunk_index;
1212 
1213  // These calls to get_index() can be removed as with the insert...unconstrained() code.
1214  unsigned int target_char_start_index =
1215  get_index(*target_element_address, constrained_array_shape) * elem_width;
1216  unsigned int chunk_char_start_index = get_index(*chunk_element_address, chunk_shape) * elem_width;
1217 
1218  memcpy(target_buffer + target_char_start_index, source_buffer + chunk_char_start_index, elem_width);
1219  }
1220  }
1221  }
1222  else {
1223  // Not the last dimension, so we proceed down the recursion branch.
1224  for (unsigned int chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
1225  (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;
1226  (*chunk_element_address)[dim] = chunk_index;
1227 
1228  // Re-entry here:
1229  insert_chunk(dim + 1, target_element_address, chunk_element_address, chunk, constrained_array_shape);
1230  }
1231  }
1232 }
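// Illustration with assumed values: take a 10x10 array stored as 5x5 chunks
// and the constraint [2:3:9][0:1:9] (start 2, stride 3, stop 9 in dim 0). For
// the chunk at chunk_origin {5, 0}, get_chunk_start() returns 0, so the dim 0
// loop visits chunk_index 0 and 3 (absolute rows 5 and 8), which land in rows
// (0 + 5 - 2)/3 == 1 and (3 + 5 - 2)/3 == 2 of the constrained result.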
1233 
1234 void *one_chunk_thread(void *arg_list)
1235 {
1236  one_chunk_args *args = reinterpret_cast<one_chunk_args *>(arg_list);
1237 
1238  try {
1239  process_one_chunk(args->chunk, args->array, args->array_shape);
1240  }
1241  catch (BESError &error) {
1242  write(args->fds[1], &args->tid, sizeof(args->tid));
1243  delete args;
1244  pthread_exit(new string(error.get_verbose_message()));
1245  }
1246 
1247  // tid is a char and thus is written atomically. Writing this tells the parent
1248  // thread the child is complete and it should call pthread_join(tid, ...)
1249  write(args->fds[1], &args->tid, sizeof(args->tid));
1250  delete args;
1251  pthread_exit(NULL);
1252 }
1253 
1254 
1271 void process_one_chunk(shared_ptr<Chunk> chunk, DmrppArray *array, const vector<unsigned int> &constrained_array_shape)
1272 {
1273  BESDEBUG(dmrpp_3, prolog << "BEGIN" << endl );
1274 
1275  chunk->read_chunk();
1276 
1277  if (array->is_deflate_compression() || array->is_shuffle_compression())
1278  chunk->inflate_chunk(array->is_deflate_compression(), array->is_shuffle_compression(),
1279  array->get_chunk_size_in_elements(), array->var()->width());
1280 
1281  vector<unsigned int> target_element_address = chunk->get_position_in_array();
1282  vector<unsigned int> chunk_source_address(array->dimensions(), 0);
1283 
1284  array->insert_chunk(0 /* dimension */, &target_element_address, &chunk_source_address, chunk, constrained_array_shape);
1285  BESDEBUG(dmrpp_3, prolog << "END" << endl );
1286 }
1287 
1288 
1296 void process_super_chunk(const shared_ptr<SuperChunk>& super_chunk, DmrppArray *array)
1297 {
1298  BESDEBUG(dmrpp_3, prolog << "BEGIN" << endl );
1299  super_chunk->read();
1300 
1301  vector<unsigned int> constrained_array_shape = array->get_shape(true);
1302 
1303  for(auto &chunk :super_chunk->get_chunks()){
1304  if (array->is_deflate_compression() || array->is_shuffle_compression())
1305  chunk->inflate_chunk(array->is_deflate_compression(), array->is_shuffle_compression(),
1306  array->get_chunk_size_in_elements(), array->var()->width());
1307 
1308  vector<unsigned int> target_element_address = chunk->get_position_in_array();
1309  vector<unsigned int> chunk_source_address(array->dimensions(), 0);
1310 
1311  array->insert_chunk(0 /* dimension */, &target_element_address, &chunk_source_address, chunk, constrained_array_shape);
1312  }
1313 
1314  BESDEBUG(dmrpp_3, prolog << "END" << endl );
1315 }
1316 
1317 
1318 
1325 void read_chunks_concurrent(DmrppArray *array, queue<shared_ptr<SuperChunk>> &super_chunks)
1326 {
1327  BESStopWatch sw;
1328  if (BESDebug::IsSet(TIMING_LOG_KEY)) sw.start(prolog + "Timer name: "+prolog, "");
1329 
1330  // Parallel version based on read_chunks_unconstrained(). There is
1331  // substantial duplication of the code in read_chunks_unconstrained();
1332  // removing that duplication is deferred until the move to C++11 threads is complete.
1333 
1334  // We maintain a list of futures to track our parallel activities.
1335  list<future<void *>> futures;
1336  try {
1337  bool done = false;
1338  bool joined = true;
1339  while (!done) {
1340  // get_next_future() returns true when it "gets" a future (joins a thread).
1341  // We do this until all of the futures have been "got".
1342  if(!futures.empty())
1343  joined = get_next_future(futures, WAIT_FOR_FUTURE_MS);
1344 
1345  // If joined is true, the thread_counter has been decremented (because future::get()
1346  // has been called).
1347 
1348  // Next we check to see if there are still SuperChunks in the queue and we create new futures until
1349  // all the SuperChunks have been processed.
1350  if (joined){
1351  bool thread_started = true;
1352  while(thread_started && !super_chunks.empty()) {
1353  auto super_chunk = super_chunks.front();
1354  BESDEBUG(dmrpp_3, prolog << "Starting thread for " << super_chunk->to_string(false) << endl);
1355 
1356  auto *args = new one_super_chunk_args(super_chunk, array);
1357  thread_started = start_super_chunk_thread(futures, args);
1358 
1359  if (thread_started) {
1360  super_chunks.pop();
1361  BESDEBUG(dmrpp_3, prolog << "STARTED thread for " << super_chunk->to_string(false) << endl);
1362  } else {
1363  // Thread did not start, ownership of the arguments was not passed to the thread.
1364  delete args;
1365  BESDEBUG(dmrpp_3, prolog << "Thread not started, Returned SuperChunk to queue. " <<
1366  "thread_count: " << thread_counter << endl);
1367  }
1368  }
1369  }
1370  else if(!super_chunks.empty()){
1371  // TODO I can't see how this could happen (that there are SuperChunks left and yet we failed to
1372  // join prior to arriving here), so I laid a trap.
1373  stringstream msg;
1374  msg << prolog << "No threads joined, yet " << super_chunks.size() << " SuperChunks remain unread.";
1375  throw BESInternalError(msg.str(), __FILE__, __LINE__);
1376  }
1377  else {
1378  // No more SuperChunks and no joinable threads means we're done here.
1379  done = true;
1380  }
1381  joined = false;
1382  }
1383  }
1384  catch (...) {
1385  // Complete all of the futures, otherwise we'll have threads out there using up resources
1386  while(!futures.empty()){
1387  futures.back().get();
1388  futures.pop_back();
1389  }
1390  // re-throw the exception
1391  throw;
1392  }
1393 }
1394 
1401 void DmrppArray::read_chunks()
1402 {
1403  BESStopWatch sw;
1404  if (BESDebug::IsSet(TIMING_LOG_KEY)) sw.start(prolog + "Timer name: "+name(), "");
1405 
1406  auto chunk_refs = get_chunks();
1407  if (chunk_refs.size() < 2)
1408  throw BESInternalError(string("Expected chunks for variable ") + name(), __FILE__, __LINE__);
1409 
1410  // Find all the required chunks to read. I used a queue to preserve the chunk order, which
1411  // made using a debugger easier. However, order does not matter, AFAIK.
1412  queue<shared_ptr<SuperChunk>> super_chunks;
1413  auto current_super_chunk = shared_ptr<SuperChunk>(new SuperChunk());
1414  super_chunks.push(current_super_chunk);
1415 
1416  // TODO We know that non-contiguous chunks may be forward or backward in the file from
1417  // the current offset. When an add_chunk() call fails, prior to making a new SuperChunk
1418  // we might want to try adding the rejected Chunk to the other existing SuperChunks to see
1419  // if it's contiguous there.
1420  // Find the required Chunks and put them into SuperChunks.
1421  for(const auto& chunk: get_chunks()){
1422  vector<unsigned int> target_element_address = chunk->get_position_in_array();
1423  auto needed = find_needed_chunks(0 /* dimension */, &target_element_address, chunk);
1424  if (needed){
1425  bool added = current_super_chunk->add_chunk(chunk);
1426  if(!added){
1427  current_super_chunk = shared_ptr<SuperChunk>(new SuperChunk());
1428  super_chunks.push(current_super_chunk);
1429  if(!current_super_chunk->add_chunk(chunk)){
1430  stringstream msg ;
1431  msg << prolog << "Failed to add Chunk to new SuperChunk. chunk: " << chunk->to_string();
1432  throw BESInternalError(msg.str(), __FILE__, __LINE__);
1433  }
1434  }
1435  }
1436  }
1437 
1438  reserve_value_capacity(get_size(true));
1439 
1440  BESDEBUG(dmrpp_3, prolog << "d_use_parallel_transfers: " << DmrppRequestHandler::d_use_parallel_transfers << endl);
1441  BESDEBUG(dmrpp_3, prolog << "d_max_parallel_transfers: " << DmrppRequestHandler::d_max_parallel_transfers << endl);
1442  BESDEBUG(dmrpp_3, prolog << "SuperChunks.size(): " << super_chunks.size() << endl);
1443 
1444  if (!DmrppRequestHandler::d_use_parallel_transfers) {
1445  // This version is the 'serial' version of the code. It reads a chunk, inserts it,
1446  // reads the next one, and so on.
1447  while (!super_chunks.empty()) {
1448  auto super_chunk = super_chunks.front();
1449  super_chunks.pop();
1450  BESDEBUG(dmrpp_3, prolog << super_chunk->to_string(true) << endl );
1451  process_super_chunk(super_chunk, this);
1452 
1453  // SuperChunk::read_and_copy() (currently disabled)
1454  // does exactly the same thing as process_super_chunk()
1455  // in a class method.
1456  // super_chunk->chunks_to_array_values(this);
1457  }
1458  }
1459  else {
1460  read_chunks_concurrent(this, super_chunks);
1461  }
1462  set_read_p(true);
1463 }
1464 
1465 
1466 #ifdef USE_READ_SERIAL
1488 void DmrppArray::insert_chunk_serial(unsigned int dim, vector<unsigned int> *target_element_address, vector<unsigned int> *chunk_element_address,
1489  Chunk *chunk)
1490 {
1491  BESDEBUG("dmrpp", __func__ << " dim: "<< dim << " BEGIN "<< endl);
1492 
1493  // The size, in elements, of each of the chunk's dimensions.
1494  const vector<unsigned int> &chunk_shape = get_chunk_dimension_sizes();
1495 
1496  // The chunk's origin point a.k.a. its "position in array".
1497  const vector<unsigned int> &chunk_origin = chunk->get_position_in_array();
1498 
1499  dimension thisDim = this->get_dimension(dim);
1500 
1501  // Do we even want this chunk?
1502  if ((unsigned) thisDim.start > (chunk_origin[dim] + chunk_shape[dim]) || (unsigned) thisDim.stop < chunk_origin[dim]) {
1503  return; // No. No, we do not. Skip this.
1504  }
1505 
1506  // What's the first element that we are going to access for this dimension of the chunk?
1507  unsigned int first_element_offset = get_chunk_start(thisDim, chunk_origin[dim]);
1508 
1509  // Is the next point to be sent in this chunk at all? If no, return.
1510  if (first_element_offset > chunk_shape[dim]) {
1511  return;
1512  }
1513 
1514  // Now we figure out the correct last element, based on the subset expression
1515  unsigned long long end_element = chunk_origin[dim] + chunk_shape[dim] - 1;
1516  if ((unsigned) thisDim.stop < end_element) {
1517  end_element = thisDim.stop;
1518  }
1519 
1520  unsigned long long chunk_start = first_element_offset; //start_element - chunk_origin[dim];
1521  unsigned long long chunk_end = end_element - chunk_origin[dim];
1522  vector<unsigned int> constrained_array_shape = get_shape(true);
1523 
1524  unsigned int last_dim = chunk_shape.size() - 1;
1525  if (dim == last_dim) {
1526  // Read and Process chunk
1527  chunk->read_chunk();
1528 
1529  chunk->inflate_chunk(is_deflate_compression(), is_shuffle_compression(), get_chunk_size_in_elements(), var()->width());
1530 
1531  char *source_buffer = chunk->get_rbuf();
1532  char *target_buffer = get_buf();
1533  unsigned int elem_width = prototype()->width();
1534 
1535  if (thisDim.stride == 1) {
1536  // The start element in this array
1537  unsigned long long start_element = chunk_origin[dim] + first_element_offset;
1538  // Compute how much we are going to copy
1539  unsigned long long chunk_constrained_inner_dim_bytes = (end_element - start_element + 1) * elem_width;
1540 
1541  // Compute where we need to put it.
1542  (*target_element_address)[dim] = (start_element - thisDim.start) / thisDim.stride;
1543  // Compute where we are going to read it from
1544  (*chunk_element_address)[dim] = first_element_offset;
1545 
1546  unsigned int target_char_start_index = get_index(*target_element_address, constrained_array_shape) * elem_width;
1547  unsigned int chunk_char_start_index = get_index(*chunk_element_address, chunk_shape) * elem_width;
1548 
1549  memcpy(target_buffer + target_char_start_index, source_buffer + chunk_char_start_index, chunk_constrained_inner_dim_bytes);
1550  }
1551  else {
1552  // Stride != 1
1553  for (unsigned int chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
1554  // Compute where we need to put it.
1555  (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;
1556 
1557  // Compute where we are going to read it from
1558  (*chunk_element_address)[dim] = chunk_index;
1559 
1560  unsigned int target_char_start_index = get_index(*target_element_address, constrained_array_shape) * elem_width;
1561  unsigned int chunk_char_start_index = get_index(*chunk_element_address, chunk_shape) * elem_width;
1562 
1563  memcpy(target_buffer + target_char_start_index, source_buffer + chunk_char_start_index, elem_width);
1564  }
1565  }
1566  }
1567  else {
1568  // Not the last dimension, so we proceed down the recursion branch.
1569  for (unsigned int chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
1570  (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;
1571  (*chunk_element_address)[dim] = chunk_index;
1572 
1573  // Re-entry here:
1574  insert_chunk_serial(dim + 1, target_element_address, chunk_element_address, chunk);
1575  }
1576  }
1577 }
1578 
1579 void DmrppArray::read_chunks_serial()
1580 {
1581  BESDEBUG("dmrpp", __func__ << " for variable '" << name() << "' - BEGIN" << endl);
1582 
1583  vector<Chunk> &chunk_refs = get_chunk_vec();
1584  if (chunk_refs.size() == 0) throw BESInternalError(string("Expected one or more chunks for variable ") + name(), __FILE__, __LINE__);
1585 
1586  // Allocate target memory.
1587  reserve_value_capacity(get_size(true));
1588 
1589  /*
1590  * Find the chunks to be read, make curl_easy handles for them, and
1591  * stuff them into our curl_multi handle. This is a recursive activity
1592  * which utilizes the same code that copies the data from the chunk to
1593  * the variables.
1594  */
1595  for (unsigned long i = 0; i < chunk_refs.size(); i++) {
1596  Chunk &chunk = chunk_refs[i];
1597 
1598  vector<unsigned int> chunk_source_address(dimensions(), 0);
1599  vector<unsigned int> target_element_address = chunk.get_position_in_array();
1600 
1601  // Recursive insertion operation.
1602  insert_chunk_serial(0, &target_element_address, &chunk_source_address, &chunk);
1603  }
1604 
1605  set_read_p(true);
1606 
1607  BESDEBUG("dmrpp", "DmrppArray::"<< __func__ << "() for " << name() << " END"<< endl);
1608 }
1609 #endif
1610 
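// Dispatch summary for read(), below: a variable with a single chunk is
// stored contiguously and handled by read_contiguous(); a chunked variable
// is handled by read_chunks_unconstrained() when the whole array is
// requested, and by read_chunks() when a subset (projection) is requested.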
1622 bool DmrppArray::read()
1623 {
1624  if (read_p()) return true;
1625 
1626  // Single chunk and 'contiguous' are the same for this code.
1627 
1628  if (get_immutable_chunks().size() == 1) { // Removed: || get_chunk_dimension_sizes().empty()) {
1629  BESDEBUG(dmrpp_4, "Calling read_contiguous() for " << name() << endl);
1630  read_contiguous(); // Throws on various errors
1631  }
1632  else { // Handle the more complex case where the data is chunked.
1633  if (!is_projected()) {
1634  BESDEBUG(dmrpp_4, "Calling read_chunks_unconstrained() for " << name() << endl);
1635  read_chunks_unconstrained();
1636  }
1637  else {
1638  BESDEBUG(dmrpp_4, "Calling read_chunks() for " << name() << endl);
1639  read_chunks();
1640  }
1641  }
1642 
1643  if (this->twiddle_bytes()) {
1644  int num = this->length();
1645  Type var_type = this->var()->type();
1646 
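// The cases below reverse the byte order of each element in place; for
// example, bswap_16(0x1234) yields 0x3412 (see byteswap_compat.h, included
// above).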
1647  switch (var_type) {
1648  case dods_int16_c:
1649  case dods_uint16_c: {
1650  dods_uint16 *local = reinterpret_cast<dods_uint16*>(this->get_buf());
1651  while (num--) {
1652  *local = bswap_16(*local);
1653  local++;
1654  }
1655  break;
1656  }
1657  case dods_int32_c:
1658  case dods_uint32_c: {
1659  dods_uint32 *local = reinterpret_cast<dods_uint32*>(this->get_buf());
1660  while (num--) {
1661  *local = bswap_32(*local);
1662  local++;
1663  }
1664  break;
1665  }
1666  case dods_int64_c:
1667  case dods_uint64_c: {
1668  dods_uint64 *local = reinterpret_cast<dods_uint64*>(this->get_buf());
1669  while (num--) {
1670  *local = bswap_64(*local);
1671  local++;
1672  }
1673  break;
1674  }
1675  default: break; // Do nothing for all other types.
1676  }
1677  }
1678 
1679  return true;
1680 }
1681 
1686 class PrintD4ArrayDimXMLWriter : public unary_function<Array::dimension &, void> {
1687  XMLWriter &xml;
1688  // Was this variable constrained using local/direct slicing? i.e., is d_local_constraint set?
1689  // If so, don't use shared dimensions; instead emit Dim elements that are anonymous.
1690  bool d_constrained;
1691 public:
1692 
1693  PrintD4ArrayDimXMLWriter(XMLWriter &xml, bool c) :
1694  xml(xml), d_constrained(c)
1695  {
1696  }
1697 
1698  void operator()(Array::dimension &d)
1699  {
1700  // This duplicates code in D4Dimensions (where D4Dimension::print_dap4() is defined)
1701  // because of the need to print the constrained size of a dimension. I think that
1702  // the constraint information has to be kept here and not in the dimension (since they
1703  // are shared dims). Could hack print_dap4() to take the constrained size, however.
1704  if (xmlTextWriterStartElement(xml.get_writer(), (const xmlChar *) "Dim") < 0)
1705  throw InternalErr(__FILE__, __LINE__, "Could not write Dim element");
1706 
1707  string name = (d.dim) ? d.dim->fully_qualified_name() : d.name;
1708  // If there is a name, there must be a Dimension (named dimension) in scope
1709  // so write its name but not its size.
1710  if (!d_constrained && !name.empty()) {
1711  if (xmlTextWriterWriteAttribute(xml.get_writer(), (const xmlChar *) "name",
1712  (const xmlChar *) name.c_str()) < 0)
1713  throw InternalErr(__FILE__, __LINE__, "Could not write attribute for name");
1714  }
1715  else if (d.use_sdim_for_slice) {
1716  assert(!name.empty());
1717  if (xmlTextWriterWriteAttribute(xml.get_writer(), (const xmlChar *) "name",
1718  (const xmlChar *) name.c_str()) < 0)
1719  throw InternalErr(__FILE__, __LINE__, "Could not write attribute for name");
1720  }
1721  else {
1722  ostringstream size;
1723  size << (d_constrained ? d.c_size : d.size);
1724  if (xmlTextWriterWriteAttribute(xml.get_writer(), (const xmlChar *) "size",
1725  (const xmlChar *) size.str().c_str()) < 0)
1726  throw InternalErr(__FILE__, __LINE__, "Could not write attribute for size");
1727  }
1728 
1729  if (xmlTextWriterEndElement(xml.get_writer()) < 0)
1730  throw InternalErr(__FILE__, __LINE__, "Could not end Dim element");
1731  }
1732 };
1733 
1734 class PrintD4ConstructorVarXMLWriter : public unary_function<BaseType *, void> {
1735  XMLWriter &xml;
1736  bool d_constrained;
1737 public:
1738  PrintD4ConstructorVarXMLWriter(XMLWriter &xml, bool c) :
1739  xml(xml), d_constrained(c)
1740  {
1741  }
1742 
1743  void operator()(BaseType *btp)
1744  {
1745  btp->print_dap4(xml, d_constrained);
1746  }
1747 };
1748 
1749 class PrintD4MapXMLWriter : public unary_function<D4Map *, void> {
1750  XMLWriter &xml;
1751 
1752 public:
1753  PrintD4MapXMLWriter(XMLWriter &xml) :
1754  xml(xml)
1755  {
1756  }
1757 
1758  void operator()(D4Map *m)
1759  {
1760  m->print_dap4(xml);
1761  }
1762 };
1764 
1788 void DmrppArray::print_dap4(XMLWriter &xml, bool constrained /*false*/)
1789 {
1790  if (constrained && !send_p()) return;
1791 
1792  if (xmlTextWriterStartElement(xml.get_writer(), (const xmlChar *) var()->type_name().c_str()) < 0)
1793  throw InternalErr(__FILE__, __LINE__, "Could not write " + type_name() + " element");
1794 
1795  if (!name().empty())
1796  if (xmlTextWriterWriteAttribute(xml.get_writer(), (const xmlChar *) "name", (const xmlChar *) name().c_str()) <
1797  0)
1798  throw InternalErr(__FILE__, __LINE__, "Could not write attribute for name");
1799 
1800  // Hack job... Copied from D4Enum::print_xml_writer. jhrg 11/12/13
1801  if (var()->type() == dods_enum_c) {
1802  D4Enum *e = static_cast<D4Enum *>(var());
1803  string path = e->enumeration()->name();
1804  if (e->enumeration()->parent()) {
1805  // print the FQN for the enum def; D4Group::FQN() includes the trailing '/'
1806  path = static_cast<D4Group *>(e->enumeration()->parent()->parent())->FQN() + path;
1807  }
1808  if (xmlTextWriterWriteAttribute(xml.get_writer(), (const xmlChar *) "enum", (const xmlChar *) path.c_str()) < 0)
1809  throw InternalErr(__FILE__, __LINE__, "Could not write attribute for enum");
1810  }
1811 
1812  if (prototype()->is_constructor_type()) {
1813  Constructor &c = static_cast<Constructor &>(*prototype());
1814  for_each(c.var_begin(), c.var_end(), PrintD4ConstructorVarXMLWriter(xml, constrained));
1815  // bind2nd(mem_fun_ref(&BaseType::print_dap4), xml));
1816  }
1817 
1818  // Drop the local_constraint, which is per-array, and use a per-dimension one instead.
1819  for_each(dim_begin(), dim_end(), PrintD4ArrayDimXMLWriter(xml, constrained));
1820 
1821  attributes()->print_dap4(xml);
1822 
1823  for_each(maps()->map_begin(), maps()->map_end(), PrintD4MapXMLWriter(xml));
1824 
1825  // Only print the chunks info if it is there. This is the code added to libdap::Array::print_dap4().
1826  // jhrg 5/10/18
1827  if (DmrppCommon::d_print_chunks && get_immutable_chunks().size() > 0)
1828  print_chunks_element(xml, DmrppCommon::d_ns_prefix);
1829 
1830  // If this variable uses the COMPACT layout, encode the values for
1831  // the array using base64. Note that strings are a special case; each
1832  // element of the array is a string and is encoded in its own base64
1833  // xml element. So, while an array of 10 int32 will be encoded in a
1834  // single base64 element, an array of 10 strings will use 10 base64
1835  // elements. This is because the size of each string's value is different.
1836  // Not so for an int32.
1837  if (DmrppCommon::d_print_chunks && is_compact_layout() && read_p()) {
1838  switch (var()->type()) {
1839  case dods_byte_c:
1840  case dods_char_c:
1841  case dods_int8_c:
1842  case dods_uint8_c:
1843  case dods_int16_c:
1844  case dods_uint16_c:
1845  case dods_int32_c:
1846  case dods_uint32_c:
1847  case dods_int64_c:
1848  case dods_uint64_c:
1849 
1850  case dods_enum_c:
1851 
1852  case dods_float32_c:
1853  case dods_float64_c: {
1854  u_int8_t *values = 0;
1855  try {
1856  size_t size = buf2val(reinterpret_cast<void **>(&values));
1857  string encoded = base64::Base64::encode(values, size);
1858  delete[] values;
1859  print_compact_element(xml, DmrppCommon::d_ns_prefix, encoded);
1860  }
1861  catch (...) {
1862  delete[] values;
1863  throw;
1864  }
1865  break;
1866  }
1867 
1868  case dods_str_c:
1869  case dods_url_c: {
1870  string *values = 0;
1871  try {
1872  // discard the return value of buf2val()
1873  buf2val(reinterpret_cast<void **>(&values));
1874  string str;
1875  for (int i = 0; i < length(); ++i) {
1876  str = (*(static_cast<string *> (values) + i));
1877  string encoded = base64::Base64::encode(reinterpret_cast<const u_int8_t *>(str.c_str()), str.size());
1878  print_compact_element(xml, DmrppCommon::d_ns_prefix, encoded);
1879  }
1880  delete[] values;
1881  }
1882  catch (...) {
1883  delete[] values;
1884  throw;
1885  }
1886  break;
1887  }
1888 
1889  default:
1890  throw InternalErr(__FILE__, __LINE__, "Vector::val2buf: bad type");
1891  }
1892  }
1893  if (xmlTextWriterEndElement(xml.get_writer()) < 0)
1894  throw InternalErr(__FILE__, __LINE__, "Could not end " + type_name() + " element");
1895 }
1896 
1897 void DmrppArray::dump(ostream &strm) const
1898 {
1899  strm << BESIndent::LMarg << "DmrppArray::" << __func__ << "(" << (void *) this << ")" << endl;
1900  BESIndent::Indent();
1901  DmrppCommon::dump(strm);
1902  Array::dump(strm);
1903  strm << BESIndent::LMarg << "value: " << "----" << /*d_buf <<*/endl;
1904  BESIndent::UnIndent();
1905 }
1906 
1907 } // namespace dmrpp