41 #include <curl/curl.h>
44 #include "D4Dimensions.h"
45 #include "D4StreamMarshaller.h"
47 #include "BESInternalError.h"
49 #include "CurlUtils.h"
50 #include "TheBESKeys.h"
53 #include "BESStopWatch.h"
56 #include "HttpNames.h"
57 #include "EffectiveUrl.h"
58 #include "EffectiveUrlCache.h"
59 #include "RemoteResource.h"
62 #include "CredentialsManager.h"
63 #include "AccessCredentials.h"
64 #include "CredentialsManager.h"
65 #include "CurlHandlePool.h"
66 #include "DmrppCommon.h"
67 #include "DmrppRequestHandler.h"
68 #include "DmrppByte.h"
69 #include "DmrppArray.h"
71 #include "DmrppTypeFactory.h"
72 #include "DmrppD4Group.h"
73 #include "DmrppParserSax2.h"
// Global switch: when true, the BES setup routine calls BESDebug::SetUp()
// with "<bes_debug_log_file>,<bes_debug_keys>"; main() also prints its value.
82 bool bes_debug =
false;
// Log-message prefix of the form "retriever::<enclosing function>() - ".
// Because it uses __func__, the function name is that of each use site.
88 #define prolog std::string("retriever::").append(__func__).append("() - ")
// Hex SHA-256 digest of an empty byte string. aws_sign_request_url() sends it
// as the x-amz-content-sha256 header value.
90 #define NULL_BODY_HASH "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
// Fragment of a helper that turns errno into a message string. Its signature
// and the surrounding lines are not visible in this view.
// NOTE(review): strerror() is not guaranteed to be thread-safe. The
// "Unknown error." fallback is presumably returned when s_err is null —
// confirm against the full source.
98 char *s_err = strerror(errno);
102 return "Unknown error.";
// Parameter list and body fragment of the BES / HTTP setup routine. Its name
// and any leading parameters are not visible in this view.
116 const string &bes_config_file,
117 const string &bes_log_file,
118 const string &bes_debug_log_file,
119 const string &bes_debug_keys,
120 const string &http_netrc_file,
121 const string &http_cache_dir
123 if (debug) cerr << prolog <<
"BEGIN" << endl;
// BES debugging is enabled only when the global bes_debug flag is set.
// The setup string is "<debug log file>,<debug keys>".
130 if (bes_debug)
BESDebug::SetUp(bes_debug_log_file +
"," + bes_debug_keys);
// The .netrc file and the HTTP cache directory are applied only when they
// are non-empty. The branch bodies are not visible here.
133 if (!http_netrc_file.empty()) {
137 if (!http_cache_dir.empty()) {
144 if (debug) cerr << prolog <<
"END" << endl;
/**
 * Append AWS Signature Version 4 headers (Authorization, x-amz-content-sha256,
 * x-amz-date) to request_headers when S3 credentials are available.
 *
 * The line that obtains `credentials` is not visible in this view. It is
 * presumably a CredentialsManager lookup for target_url — confirm.
 *
 * @param target_url The URL of the request being signed.
 * @param request_headers The cURL header list to extend; may be null.
 * @return The extended header list. Presumably the list is returned unchanged
 *         when no S3 credentials apply.
 */
148 curl_slist *aws_sign_request_url(
const string &target_url, curl_slist *request_headers) {
150 if (debug) cerr << prolog <<
"BEGIN" << endl;
// Signing is only attempted with credentials that qualify as S3 credentials.
153 if (credentials && credentials->
is_s3_cred()) {
// NOTE(review): to_json() may include the secret key. Keep this output
// confined to debugging sessions.
155 cerr << prolog <<
"Got AWS S3 AccessCredentials instance: " << endl << credentials->to_json() << endl;
// request_time feeds the x-amz-date header below. It presumably also feeds
// the signature, whose full argument list is not visible.
158 const std::time_t request_time = std::time(0);
160 const std::string auth_header =
161 AWSV4::compute_awsv4_signature(
164 credentials->
get(AccessCredentials::ID_KEY),
165 credentials->
get(AccessCredentials::KEY_KEY),
166 credentials->
get(AccessCredentials::REGION_KEY),
172 request_headers = curl::append_http_header(request_headers,
"Authorization", auth_header);
// The payload hash is always the empty-body digest. This signature is
// therefore presumably only valid for requests that send no body.
175 request_headers = curl::append_http_header(request_headers,
"x-amz-content-sha256", NULL_BODY_HASH);
176 request_headers = curl::append_http_header(request_headers,
"x-amz-date", AWSV4::ISO8601_date(request_time));
178 if (debug) cerr << prolog <<
"END" << endl;
179 return request_headers;
/**
 * Determine the size of a remote resource with a cURL HEAD request
 * (CURLOPT_NOBODY) by reading its Content-Length response header.
 *
 * @param url The resource URL.
 * @param aws_signing Presumably selects whether AWS V4 signing headers are
 *        added. The condition around aws_sign_request_url() is not visible.
 * @return The Content-Length value in bytes.
 * @throws BESInternalError when the size cannot be determined, presumably
 *         when no Content-Length header is found.
 */
187 size_t get_remote_size(
string url,
bool aws_signing) {
188 if (debug) cerr << prolog <<
"BEGIN" << endl;
190 char error_buffer[CURL_ERROR_SIZE];
191 std::vector<std::string> resp_hdrs;
192 curl_slist *request_headers = NULL;
// Add any configured authentication headers first, then (presumably only
// when aws_signing is true) the AWS signing headers.
194 request_headers = curl::add_auth_headers(request_headers);
197 request_headers = aws_sign_request_url(url, request_headers);
// resp_hdrs is handed to curl::init, presumably to collect response headers.
199 CURL *ceh = curl::init(url, request_headers, &resp_hdrs);
200 curl::set_error_buffer(ceh, error_buffer);
// HEAD request: tell cURL not to download the body.
203 CURLcode curl_status = curl_easy_setopt(ceh, CURLOPT_NOBODY, 1L);
204 curl::eval_curl_easy_setopt_result(curl_status, prolog,
"CURLOPT_NOBODY", error_buffer, __FILE__, __LINE__);
206 if (Debug) cerr << prolog <<
"cURL HEAD request is configured" << endl;
// NOTE(review): ceh and request_headers are released only on the normal path.
// If super_easy_perform() can throw, both leak; an RAII guard would fix that.
208 curl::super_easy_perform(ceh);
210 curl::unset_error_buffer(ceh);
212 curl_slist_free_all(request_headers);
214 curl_easy_cleanup(ceh);
// Scan the response headers for Content-Length. lc_header is presumably a
// lower-cased copy of resp_hdrs[i], which makes the match case-insensitive.
// NOTE(review): the key requires exactly one space after the colon, so a
// header such as "Content-Length:123" would not match.
217 size_t how_big_it_is = 0;
218 string content_length_hdr_key(
"content-length: ");
219 for (
size_t i = 0; !done && i < resp_hdrs.size(); i++) {
220 if (Debug) cerr << prolog <<
"HEADER[" << i <<
"]: " << resp_hdrs[i] << endl;
222 size_t index = lc_header.find(content_length_hdr_key);
// NOTE(review): stol() throws std::invalid_argument / std::out_of_range on a
// malformed value, and its signed long result is converted to size_t.
224 string value = lc_header.substr(content_length_hdr_key.size());
225 how_big_it_is = stol(value);
230 throw BESInternalError(prolog +
"Failed to determine size of target resource: " + url, __FILE__, __LINE__);
232 if (debug) cerr << prolog <<
"END" << endl;
234 return how_big_it_is;
/**
 * Choose how many bytes to retrieve.
 *
 * @param max_target_size Requested size; 0 means "the whole remote resource".
 * @param effectiveUrl URL whose size is queried (via get_remote_size() with
 *        aws_signing = true) when max_target_size is 0.
 * @return max_target_size when non-zero, otherwise the remote size. The
 *         return statement is not visible in this view.
 *
 * NOTE(review): "retrival" is misspelled, but callers depend on this name.
 */
236 size_t get_max_retrival_size(
const size_t &max_target_size,
const string &effectiveUrl) {
237 size_t target_size = max_target_size;
238 if (max_target_size == 0) {
239 target_size = get_remote_size(effectiveUrl,
true);
// NOTE(review): this logs max_target_size, which is always 0 on this path.
// It should presumably log the freshly computed target_size.
240 if (debug) cerr << prolog <<
"Remote resource size is " << max_target_size <<
" bytes. " << endl;
/**
 * Download target_url with a single HTTP GET into
 * "<output_file_base>_simple_get.out", then print the response headers to cerr.
 * A stopwatch (sw, presumably a BESStopWatch) is started with the URL as its
 * label before the transfer.
 *
 * NOTE(review): both parameters are taken by const value and copied;
 * const string& would avoid the copies.
 */
250 void simple_get(
const string target_url,
const string output_file_base) {
252 string output_file = output_file_base +
"_simple_get.out";
253 vector<string> resp_hdrs;
// Output file permissions: read/write for the owner, read for group and others.
254 mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
// Create or truncate the output file. The handling of a failed open() is
// not visible here.
256 if ((fd = open(output_file.c_str(), O_WRONLY | O_CREAT | O_TRUNC, mode)) < 0) {
261 sw.
start(prolog +
"url: " + target_url);
// NOTE(review): confirm fd is closed on every path, including when this call
// throws. The close() is not visible in this view.
262 curl::http_get_and_write_resource(target_url, fd,
268 for (
size_t i = 0; i < resp_hdrs.size(); i++) {
269 cerr << prolog <<
"ResponseHeader[" << i <<
"]: " << resp_hdrs[i] << endl;
/**
 * Split bytes [0, target_size) of target_url into chunk_count dmrpp::Chunk
 * objects of target_size / chunk_count bytes each, plus one final chunk for
 * any remainder. Every chunk uses byte order "LE".
 *
 * The chunks are allocated with new and appended to `chunks`. This function
 * never deletes them, so the caller is responsible for freeing them.
 *
 * @param target_url URL the chunks read from.
 * @param target_size Total number of bytes to cover.
 * @param chunk_count Number of equal-sized chunks; must be non-zero.
 * @param chunks Output vector that receives the new chunks.
 *
 * NOTE(review): chunk_count == 0 divides by zero in the chunk_size
 * computation. When target_size < chunk_count, chunk_size is 0 and
 * `target_size % chunk_size` also divides by zero. add_chunks() rejects a
 * zero chunk size; no such check is visible here.
 */
282 void make_chunks(
const string &target_url,
const size_t &target_size,
const size_t &chunk_count,
283 vector<dmrpp::Chunk *> &chunks) {
284 if (debug) cerr << prolog <<
"BEGIN" << endl;
285 size_t chunk_size = target_size / chunk_count;
286 size_t chunk_start = 0;
// chunk_index is declared outside this loop (declaration not visible) so the
// remainder chunk below can reuse its final value.
288 for (chunk_index = 0; chunk_index < chunk_count; chunk_index++) {
// The chunk's position in the array is its index here, not its byte offset.
289 vector<unsigned int> position_in_array;
290 position_in_array.push_back(chunk_index);
292 cerr << prolog <<
"chunks[" << chunk_index <<
"] chunk_start: " << chunk_start <<
" chunk_size: "
293 << chunk_size << endl;
294 auto chunk =
new dmrpp::Chunk(target_url,
"LE", chunk_size, chunk_start, position_in_array);
295 chunk_start += chunk_size;
296 chunks.push_back(chunk);
// A non-zero remainder means the equal-sized chunks stop short of
// target_size. The tail becomes one extra, smaller chunk.
298 if (target_size % chunk_size) {
300 size_t last_chunk_size = target_size - chunk_start;
302 cerr << prolog <<
"Remainder chunk. chunk[" << chunks.size() <<
"] last_chunk_size: " << last_chunk_size
305 cerr << prolog <<
"Remainder chunk! target_size: " << target_size <<
" index: " << chunk_index
306 <<
" last_chunk_start: " << chunk_start <<
" last_chunk_size: " << last_chunk_size << endl;
307 if (last_chunk_size > 0) {
308 vector<unsigned int> position_in_array;
309 position_in_array.push_back(chunk_index);
311 cerr << prolog <<
"chunks[" << chunk_index <<
"] chunk_start: " << chunk_start <<
" chunk_size: "
312 << last_chunk_size << endl;
313 auto last_chunk =
new dmrpp::Chunk(target_url,
"LE", last_chunk_size, chunk_start, position_in_array);
314 chunks.push_back(last_chunk);
317 if (debug) cerr << prolog <<
"END chunks: " << chunks.size() << endl;
/**
 * Retrieve a resource serially, one chunk at a time, writing each chunk's
 * bytes in order to "<output_file_base>_serial_chunky_get.out".
 *
 * @param target_url URL the chunks are read from.
 * @param target_size Number of bytes to retrieve; 0 means the whole remote
 *        resource (see get_max_retrival_size()).
 * @param chunk_count Number of equal-sized chunks to split the retrieval into.
 * @param output_file_base Prefix for the output file name.
 * @throws BESInternalError if the output file cannot be opened.
 *
 * NOTE(review): the size is computed from effectiveUrl, which is resolved on
 * a line not visible here. The chunks, however, are built from target_url
 * itself. Confirm that bypassing the resolved URL is intended.
 */
327 void serial_chunky_get(
const string &target_url,
const size_t target_size,
const unsigned long chunk_count,
328 const string &output_file_base) {
331 if (debug) cerr << prolog <<
"curl::retrieve_effective_url() returned: " << effectiveUrl << endl;
332 size_t retrieval_size = get_max_retrival_size(target_size, effectiveUrl);
334 string output_file = output_file_base +
"_serial_chunky_get.out";
335 vector<dmrpp::Chunk *> chunks;
336 make_chunks(target_url, retrieval_size, chunk_count, chunks);
// NOTE(review): std::fstream::in is unnecessary for a write-only file;
// out | trunc | binary is sufficient.
339 ofs.open(output_file, std::fstream::in | std::fstream::out | std::ofstream::trunc | std::ofstream::binary);
341 throw BESInternalError(prolog +
"Failed to open file: " + output_file, __FILE__, __LINE__);
343 for (
size_t i = 0; i < chunks.size(); i++) {
// ss (declared on a line not visible here) builds this chunk's log label.
345 ss << prolog <<
"chunk={index: " << i <<
", offset: " << chunks[i]->get_offset() <<
", size: "
346 << chunks[i]->get_size() <<
"}";
// Fetch this chunk's bytes into its read buffer, then append them to the file.
351 chunks[i]->read_chunk();
354 if (debug) cerr << ss.str() <<
" retrieval from: " << target_url <<
" completed, timing finished." << endl;
355 ofs.write(chunks[i]->get_rbuf(), chunks[i]->get_rbuf_size());
356 if (debug) cerr << ss.str() <<
" has been written to: " << output_file << endl;
// Walk the chunk vector; this presumably deletes the chunks allocated by
// make_chunks(). The loop body is not visible.
358 auto itr = chunks.begin();
359 while (itr != chunks.end()) {
/**
 * Load a DMR++ document and print the resulting DMR++ to cerr.
 *
 * @param dmrpp_filename_url Location of the DMR++ document. It may be:
 *        - an http:// or https:// URL, fetched via a RemoteResource and read
 *          from its cache file;
 *        - a file:// URL, whose scheme prefix is stripped;
 *        - otherwise a local path, which is given a file:// href.
 * @throws BESInternalError if dmrpp_filename_url is empty or the file cannot
 *         be opened.
 */
368 void parse_dmrpp(
const string &dmrpp_filename_url){
369 if(debug) cerr << prolog <<
"BEGIN" << endl;
372 string target_file_url = dmrpp_filename_url;
375 const string http_protocol(
"http://");
376 const string https_protocol(
"https://");
377 const string file_protocol(
"file://");
379 if(debug) cerr << prolog <<
"dmrpp_filename_url: " << dmrpp_filename_url << endl;
381 if(target_file_url.empty())
382 throw BESInternalError(prolog +
"The dmr++ filename was empty.", __FILE__, __LINE__);
// rfind(prefix, 0) == 0 is a "starts with prefix" test.
385 if(target_file_url.rfind(http_protocol,0)==0 || target_file_url.rfind(https_protocol,0)==0 ){
388 target_resource.retrieveResource();
389 target_file = target_resource.getCacheFileName();
// file:// URL: strip the scheme to get the local path.
391 else if(target_file_url.rfind(file_protocol,0)==0){
392 target_file = target_file_url.substr(file_protocol.length());
// Plain path: give it a file:// href. The matching target_file assignment is
// not visible in this view.
395 target_file_url = file_protocol + target_file_url;
398 if(debug) cerr << prolog <<
" target_file: " << target_file << endl;
// NOTE(review): the message below reads "Failed open to"; it was presumably
// meant to be "Failed to open". Left unchanged because it is runtime text.
400 ifstream ifs(target_file);
402 throw BESInternalError(prolog +
"Failed open to dmr++ file: " + dmrpp_filename_url, __FILE__, __LINE__);
// The call that parses ifs into dmr is not visible here. msg presumably
// labels a timer around that parse.
406 dmr.set_href(target_file_url);
408 msg << prolog << dmrpp_filename_url;
416 cerr << prolog <<
"Built dataset: " << endl;
418 libdap::XMLWriter xmlWriter;
419 dmr.print_dmrpp(xmlWriter, dmr.get_href());
420 cerr << xmlWriter.get_doc() << endl;
422 if(debug) cerr << prolog <<
"END" << endl;
/**
 * Register chunks covering bytes [0, target_size) of target_url with
 * target_array: chunk_count equal-sized chunks, plus one final chunk for any
 * remainder. Every chunk uses byte order "LE".
 *
 * The target_array parameter is declared on lines not visible in this view.
 *
 * @throws BESInternalError ("Chunk size was zero.") when the computed chunk
 *         size is zero.
 *
 * NOTE(review): chunk_count == 0 divides by zero before that check can run.
 */
435 void add_chunks(
const string &target_url,
const size_t &target_size,
const size_t &chunk_count,
438 if (debug) cerr << prolog <<
"BEGIN" << endl;
440 size_t chunk_size = target_size / chunk_count;
442 throw BESInternalError(prolog +
"Chunk size was zero.", __FILE__, __LINE__);
// The chunk dimension size as text. It is presumably passed to
// parse_chunk_dimension_sizes() on a line not visible here.
443 stringstream chunk_dim_size;
444 chunk_dim_size << chunk_size;
447 size_t chunk_start = 0;
// chunk_index is declared outside the loop (not visible) so the remainder
// chunk below can reuse it.
449 for (chunk_index = 0; chunk_index < chunk_count; chunk_index++) {
// Unlike make_chunks(), the position in the array is the chunk's byte offset.
// This equals the element position only for a 1-D array of one-byte elements,
// which is presumably what array_get() builds.
// NOTE(review): vector<unsigned int> narrows offsets at or above 4 GiB.
450 vector<unsigned int> position_in_array;
451 position_in_array.push_back(chunk_start);
453 cerr << prolog <<
"chunks[" << chunk_index <<
"] chunk_start: " << chunk_start <<
" chunk_size: "
454 << chunk_size <<
" chunk_poa: " << position_in_array[0] << endl;
455 target_array->
add_chunk(target_url,
"LE", chunk_size, chunk_start, position_in_array);
456 chunk_start += chunk_size;
// Cover the tail left over when target_size is not a multiple of chunk_size.
458 if (target_size % chunk_size) {
460 size_t last_chunk_size = target_size - chunk_start;
462 cerr << prolog <<
"Remainder chunk! target_size: " << target_size <<
" index: " << chunk_index
463 <<
" last_chunk_start: " << chunk_start <<
" last_chunk_size: " << last_chunk_size << endl;
464 if (last_chunk_size > 0) {
465 vector<unsigned int> position_in_array;
466 position_in_array.push_back(chunk_start);
468 cerr << prolog <<
"chunks[" << chunk_index <<
"] chunk_start: " << chunk_start <<
" chunk_size: "
469 << last_chunk_size <<
" chunk_poa: " << position_in_array[0] << endl;
470 target_array->
add_chunk(target_url,
"LE", last_chunk_size, chunk_start, position_in_array);
473 if (debug) cerr << prolog <<
"END" << endl;
/**
 * Build a DMR holding one array variable of target_size elements backed by
 * chunk_count chunks of target_url, and print its DMR++ to cerr. Then
 * serialize the root group into "<output_file_base>_array_get.out".
 *
 * A stopwatch is started with a label describing the transfer: size, chunk
 * count, and parallel-transfer settings. The data-reading call it times is on
 * lines not visible in this view.
 *
 * @return Number of bytes the serialization wrote to the output file.
 * @throws BESInternalError if the output file cannot be opened.
 */
485 size_t array_get(
const string &target_url,
const size_t &target_size,
const size_t &chunk_count,
486 const string &output_file_base) {
488 if (debug) cerr << prolog <<
"BEGIN" << endl;
489 string output_file = output_file_base +
"_array_get.out";
// NOTE(review): std::fstream::in is unnecessary for a write-only file;
// out | trunc | binary is sufficient.
491 ofs.open(output_file, std::fstream::in | std::fstream::out | std::ofstream::trunc | std::ofstream::binary);
493 throw BESInternalError(prolog +
"Failed to open file: " + output_file, __FILE__, __LINE__);
// target_array is created on lines not visible here. add_var_nocopy() below
// presumably hands its ownership to root.
499 target_array->append_dim(target_size);
500 add_chunks(target_url, target_size, chunk_count, target_array);
501 target_array->set_send_p(
true);
505 dmr.set_href(target_url);
507 root->add_var_nocopy(target_array);
508 root->set_in_selection(
true);
511 cerr << prolog <<
"Built dataset: " << endl;
513 libdap::XMLWriter xmlWriter;
514 dmr.print_dmrpp(xmlWriter, dmr.get_href());
515 cerr << xmlWriter.get_doc() << endl;
// Build the stopwatch label, including the current parallel-transfer settings.
519 stringstream timer_msg;
520 timer_msg << prolog <<
"DmrppD4Group.intern_data() for " << target_size <<
" bytes in " << chunk_count <<
521 " chunks, parallel transfers ";
522 if (dmrpp::DmrppRequestHandler::d_use_parallel_transfers) {
523 timer_msg <<
"enabled. (max: " << dmrpp::DmrppRequestHandler::d_max_parallel_transfers <<
")";
525 timer_msg <<
"disabled.";
528 sw.
start(timer_msg.str());
// The byte count is the change in the stream's output position.
// NOTE(review): tellp() returns -1 on failure, which becomes a huge size_t
// value; check ofs before trusting the count.
532 size_t started = ofs.tellp();
533 libdap::D4StreamMarshaller streamMarshaller(ofs);
534 root->serialize(streamMarshaller, dmr);
536 size_t stopped = ofs.tellp();
537 size_t numberOfBytesWritten = stopped - started;
538 if (debug) cerr << prolog <<
"target_size: " << target_size <<
" numberOfBytesWritten: " << numberOfBytesWritten << endl;
541 if (debug) cerr << prolog <<
"END" << endl;
542 return numberOfBytesWritten;
/**
 * Benchmark plan 01. For each chunk-count pass (power 1..power_of_two_chunk_count,
 * starting at 2 chunks):
 *   - run array_get() `reps` times with parallel transfers disabled;
 *   - then, for each max-thread pass (power 1..power_of_two_threads_max,
 *     starting at 2 threads), run array_get() `reps` times with parallel
 *     transfers enabled.
 *
 * The lines that advance chunk_count and thread_count are not visible here.
 * Presumably each doubles per pass, giving 2^k.
 *
 * @param output_prefix Not referenced in the visible lines.
 * @param retrieval_size Bytes to retrieve; 0 means the whole remote resource.
 * @return A status code; the return statements are not visible here.
 *
 * BESError and unknown exceptions are caught and reported on cerr.
 * NOTE(review): the global DmrppRequestHandler settings changed here are not
 * restored in the visible lines.
 */
577 int test_plan_01(
const string &target_url,
578 const string &output_prefix,
579 const unsigned int reps,
580 const size_t retrieval_size,
581 const unsigned int power_of_two_chunk_count,
582 const unsigned int power_of_two_threads_max,
583 const string &output_file_base
587 cerr << prolog <<
"BEGIN" << endl;
592 cerr << prolog <<
"curl::retrieve_effective_url() returned: " << effectiveUrl << endl;
593 size_t target_size = get_max_retrival_size(retrieval_size, effectiveUrl);
596 size_t chunk_count = 2;
597 for (
size_t chunk_pwr = 1; chunk_pwr <= power_of_two_chunk_count; chunk_pwr++) {
// Serial baseline for this chunk count.
600 dmrpp::DmrppRequestHandler::d_use_parallel_transfers =
false;
601 for (
unsigned int rep = 0; rep < reps; rep++) {
602 array_get(effectiveUrl, target_size, chunk_count, output_file_base );
// Parallel runs for this chunk count, one pass per max-thread setting.
606 dmrpp::DmrppRequestHandler::d_use_parallel_transfers =
true;
607 unsigned int thread_count = 2;
608 for (
unsigned int tpwr = 1; tpwr <= power_of_two_threads_max; tpwr++) {
609 dmrpp::DmrppRequestHandler::d_max_parallel_transfers = thread_count;
610 for (
unsigned int rep = 0; rep < reps; rep++) {
611 array_get(effectiveUrl, target_size, chunk_count, output_file_base);
621 cerr << prolog <<
"Caught BESError. Message: " << e.
get_message() <<
" " << e.
get_file()<<
":" << e. get_line() << endl;
625 cerr << prolog <<
"Caught Unknown Exception." <<
629 cerr << prolog <<
"END" << endl;
/**
 * Command-line driver for the retriever test tool. It:
 *   - parses options (GetOpt spec "h:r:n:C:c:o:u:l:S:dbDp:A");
 *   - prints the effective configuration to cerr;
 *   - configures DmrppRequestHandler's parallel-transfer settings;
 *   - presumably sets up the BES (only the tail of that call is visible);
 *   - resolves the target URL and runs the selected retrieval test(s).
 * Which test runs is decided on lines not visible in this view. Unknown
 * exceptions are caught and reported on cerr.
 */
640 int main(
int argc,
char *argv[]) {
644 string bes_debug_log_file =
"cerr";
645 string bes_debug_keys =
"bes,http,curl,dmrpp,dmrpp:3,dmrpp:4,rr";
646 string target_url =
"https://www.opendap.org/pub/binary/hyrax-1.16/centos-7.x/bes-debuginfo-3.20.7-1.static.el7.x86_64.rpm";
647 string output_file_base(
"retriever");
648 string http_cache_dir;
650 size_t pwr2_number_o_chunks = 18;
651 size_t max_target_size = 0;
652 string http_netrc_file;
653 unsigned int reps=10;
654 unsigned pwr2_parallel_reads = 0;
// NOTE(review): this local flag has the same name as the function
// aws_sign_request_url() and shadows it inside main().
655 bool aws_sign_request_url =
false;
657 char *prefixCstr = getenv(
"prefix");
// Options h, r, n, C, c, o, u, l, S and p take an argument; d, b, D and A
// are flags. The case labels are not visible below.
666 GetOpt getopt(argc, argv,
"h:r:n:C:c:o:u:l:S:dbDp:A");
668 while ((option_char = getopt()) != -1) {
669 switch (option_char) {
681 aws_sign_request_url =
true;
684 bes_config_file = getopt.optarg;
687 target_url = getopt.optarg;
690 bes_log_file = getopt.optarg;
693 http_netrc_file = getopt.optarg;
696 output_file_base = getopt.optarg;
// NOTE(review): atol() returns 0 for non-numeric input without reporting an
// error. Its signed long result is also narrowed into the unsigned targets
// below, so negative values wrap.
699 pwr2_number_o_chunks = atol(getopt.optarg);
702 max_target_size = atol(getopt.optarg);
705 pwr2_parallel_reads = atol(getopt.optarg);
708 reps = atol(getopt.optarg);
711 http_cache_dir = getopt.optarg;
// Default the BES log file name from the output file prefix.
719 if (bes_log_file.empty()) {
720 bes_log_file = output_file_base +
"_bes.log";
// Echo the effective configuration.
723 cerr << prolog <<
"-- - -- - -- - -- - -- - -- - -- - -- - -- - -- - -- - -- - -- - -- - -- - " << endl;
724 cerr << prolog <<
"debug: " << (debug ?
"true" :
"false") << endl;
725 cerr << prolog <<
"Debug: " << (Debug ?
"true" :
"false") << endl;
726 cerr << prolog <<
"bes_debug: " << (bes_debug ?
"true" :
"false") << endl;
727 cerr << prolog <<
"output_file_base: '" << output_file_base <<
"'" << endl;
728 cerr << prolog <<
"bes_config_file: '" << bes_config_file <<
"'" << endl;
729 cerr << prolog <<
"bes_log_file: '" << bes_log_file <<
"'" << endl;
730 cerr << prolog <<
"bes_debug_log_file: '" << bes_debug_log_file <<
"'" << endl;
731 cerr << prolog <<
"bes_debug_keys: '" << bes_debug_keys <<
"'" << endl;
732 cerr << prolog <<
"http_netrc_file: '" << http_netrc_file <<
"'" << endl;
733 cerr << prolog <<
"target_url: '" << target_url <<
"'" << endl;
734 cerr << prolog <<
"max_target_size: '" << max_target_size <<
"'" << endl;
735 cerr << prolog <<
"number_o_chunks: 2^" << pwr2_number_o_chunks << endl;
736 cerr << prolog <<
"reps: " << reps << endl;
737 if (pwr2_parallel_reads)
738 cerr << prolog <<
"parallel_reads: ENABLED (max: 2^" << pwr2_parallel_reads <<
")" << endl;
740 cerr << prolog <<
"parallel_reads: DISABLED" << endl;
741 cerr << prolog <<
"-- - -- - -- - -- - -- - -- - -- - -- - -- - -- - -- - -- - -- - -- - -- - " << endl;
// A non-zero pwr2_parallel_reads enables parallel transfers with at most
// 2^pwr2_parallel_reads threads; otherwise transfers are serial.
// NOTE(review): 1ULL << n is undefined for n >= 64, and n comes straight
// from atol() with no range check.
745 if(pwr2_parallel_reads){
746 unsigned long long int max_threads = 1ULL << pwr2_parallel_reads;
747 dmrpp::DmrppRequestHandler::d_use_parallel_transfers =
true;
748 dmrpp::DmrppRequestHandler::d_max_parallel_transfers = max_threads;
751 dmrpp::DmrppRequestHandler::d_use_parallel_transfers =
false;
752 dmrpp::DmrppRequestHandler::d_max_parallel_transfers = 1;
756 bes_debug_keys, http_netrc_file,http_cache_dir);
759 if (debug) cerr << prolog <<
"curl::retrieve_effective_url() returned: " << effectiveUrl << endl;
760 size_t target_size = get_max_retrival_size(max_target_size, effectiveUrl);
// NOTE(review): same shift hazard as above for pwr2_number_o_chunks >= 64.
762 unsigned long long int chunks = 1ULL << pwr2_number_o_chunks;
763 if (debug) cerr << prolog <<
"Dividing target into " << chunks <<
" chunks." << endl;
767 array_get(effectiveUrl, target_size, chunks, output_file_base);
771 result = test_plan_01(
776 pwr2_number_o_chunks,
780 simple_get(effectiveUrl, output_file_base);
// NOTE(review): chunk_count receives pwr2_number_o_chunks (an exponent,
// default 18) rather than the chunk count 2^n used by array_get() above;
// this looks unintended.
781 serial_chunky_get( effectiveUrl, max_target_size, pwr2_number_o_chunks, output_file_base);
783 parse_dmrpp(target_url);
// NOTE(review): retrieval_size is not declared in the visible lines, and
// target_size computed here is not used below. array_get() instead receives
// max_target_size, which may be 0 (add_chunks() then reports a zero chunk
// size), and pwr2_number_o_chunks instead of 2^n chunks.
788 cerr << prolog <<
"curl::retrieve_effective_url() returned: " << effectiveUrl << endl;
789 target_size = get_max_retrival_size(retrieval_size, effectiveUrl);
790 array_get(effectiveUrl, max_target_size, pwr2_number_o_chunks, output_file_base);
793 curl_global_cleanup();
802 cerr << prolog <<
"Caught Unknown Exception." << endl;
virtual std::string get(const std::string &key)
virtual bool is_s3_cred()
Do the URL, ID, Key and Region items make up an S3 Credential?
static void SetUp(const std::string &values)
Sets up debugging for the BES.
Abstract exception class for the BES with basic string message.
virtual int get_line()
get the line number where the exception was thrown
virtual std::string get_file()
get the file name where the exception was thrown
virtual std::string get_message()
get the error message for this exception
exception thrown if internal error encountered
virtual bool start(std::string name)
static std::string lowercase(const std::string &s)
static std::string assemblePath(const std::string &firstPart, const std::string &secondPart, bool leadingSlash=false, bool trailingSlash=false)
Assemble path fragments making sure that they are separated by a single '/' character.
AccessCredentials * get(const std::string &url)
static TheBESKeys * TheKeys()
void set_key(const std::string &key, const std::string &val, bool addto=false)
allows the user to set key/value pairs from within the application.
static std::string ConfigFile
Provide a way to print the DMR++ response.
Extend libdap::Array so that a handler can read data using a DMR++ file.
static bool d_print_chunks
if true, print_dap4() prints chunk elements
virtual unsigned long add_chunk(const std::string &data_url, const std::string &byte_order, unsigned long long size, unsigned long long offset, const std::string &position_in_array="")
Add a new chunk as defined by an h4:byteStream element.
virtual void parse_chunk_dimension_sizes(const std::string &chunk_dim_sizes_string)
Set the dimension sizes for a chunk.
void intern(std::istream &f, libdap::DMR *dest_dmr)
static EffectiveUrlCache * TheCache()
Get the singleton EffectiveUrlCache instance.
std::string get_effective_url(const std::string &source_url)