dmlite  0.6
TaskExec.h
Go to the documentation of this file.
1 /*
2  * Copyright 2015 CERN
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16  */
17 
18 
19 
20 /** @file TaskExec.h
21  * @brief A class that spawns commands that perform actions
22  * @author Fabrizio Furano
23  * @date Dec 2015
24  */
25 
26 
27 #include <boost/thread.hpp>
28 #include <signal.h>
29 #include <vector>
30 #include <string>
31 #include <algorithm>
32 #include <sstream>
33 #include <iterator>
34 #include <iostream>
35 
36 namespace dmlite {
37  class dmTaskExec;
38 
39  class dmTask: public boost::mutex {
40 
41  protected:
42  /// Threads waiting for result about this task will wait and synchronize here
43  /// using something like
44  /// boost::lock_guard< boost::mutex > l(workmutex);
45  ///
46  boost::condition_variable condvar;
47  public:
48  dmTask(dmTaskExec *wheretolog);
49  dmTask(const dmTask &o) {
50  key = o.key;
51  cmd = o.cmd;
52  for(unsigned int i = 0; i < 64; i++) parms[i] = NULL;
54  starttime = o.starttime;
55  endtime = o.endtime;
56  finished = o.finished;
57  fd[0] = 0; fd[1] = 0; fd[2] = 0;
58  this->stdout = o.stdout;
59  this->loggerinst = o.loggerinst;
60  }
61 
62  ~dmTask();
63  int key;
64 
65  std::string cmd;
66  const char *parms[64];
67 
69 
70  time_t starttime, endtime;
71  bool finished;
72 
73  int fd[3];
74  pid_t pid;
75  std::string stdout;
76 
77  /// Split che command string into the single parms
78  void splitCmd();
79 
80  /// Wait until the task has finished or the timeout is expired
81  int waitFinished(int tmout=5);
82 
83  void notifyAll() {
84  condvar.notify_all();
85  }
86 
88  };
89 
90 
91  /// Allows to spawn commands, useful for checksum calculations or file pulling
92  /// The spawned commands are pollable, i.e. in a given moment it's possible to
93  /// know the list of commands that are still running.
94  /// Objects belonging to this class in general are created in the disk nodes,
95  /// e.g. for running checksums or file copies and pulls
96  class dmTaskExec: public boost::recursive_mutex {
97 
98  public:
99  dmTaskExec();
100  ~dmTaskExec();
101  std::string instance;
102  /// Executes a command. Returns a positive integer as a key to reference
103  /// the execution status and the result
104  /// The mechanics is that a detached thread is started. This guy invokes popen3
105  /// and blocks waiting for the process to end. Upon end it updates the corresponding
106  /// instance of dmTask with the result and the stdout
107  int submitCmd(std::string cmd);
108 
109 
110  /// Executes a command. Returns a positive integer as a key to reference
111  // the execution status and the result
112  // The mechanics is that a detached thread is started. This guy invokes popen3
113  // and blocks waiting for the process to end. Upon end it updates the corresponding
114  // instance of dmTask with the result and the stdout
115  // -1 is returned in case of error in the submission
116  int submitCmd(std::vector<std::string> &args);
117 
118  /// Actually starts the thread corresponding to a command that was just submitted
119  /// Avoids race conditions
120  void goCmd(int id);
121 
122  /// Split che command string into the single parms
123  void assignCmd(dmTask *task, std::vector<std::string> &args);
124 
125  /// Get the results of a task.
126  /// Wait at max tmout seconds until the task finishes
127  /// Return 0 if the task has finished and there is a result
128  /// Return nonzero if the task is still running
129  int waitResult(int taskID, int tmout=5);
130 
131  //kill a specific task given the id
132  int killTask(int taskID);
133 
134  //get a dmTask given the id ( mainly for testing)
135  dmTask* getTask(int taskID);
136 
137  //get the current stdout of a task which may be running
138  int getTaskStdout(int taskID, std::string &stdout);
139 
140  /// Loops over all the tasks and:
141  /// - send a notification to the head node about all the processes that are running or that have finished
142  /// - garbage collect the task list.
143  /// - Task that are finished since long (e.g. 1 hour)
144  /// - Tasks that are stuck (e.g. 1 day)
145  void tick();
146 
147  /// Event invoked internally to log stuff
148  virtual void onLoggingRequest(Logger::Level lvl, std::string const & msg) = 0;
149  /// Event invoked internally to log stuff
150  virtual void onErrLoggingRequest(std::string const & msg) = 0;
151 
152  protected:
153 
154  /// event for immediate notifications when a task finishes
155  /// Subclasses can specialize this and apply app-dependent behavior to
156  /// perform actions when something has finished running
157  /// NOTE the signature. This passes copies of Task objects, not the originals
158  virtual void onTaskCompleted(dmTask &task);
159 
160  // event that notifies that a task is running
161  // This event can be invoked multiple times during the life of a task
162  /// NOTE the signature. This passes copies of Task objects, not the originals
163  virtual void onTaskRunning(dmTask &task);
164 
165 
166  private:
167 
168  int popen3(int fd[3], pid_t *pid, const char ** argv );
169 
170  /// Used to create keys to be inserted into the map. This has to be treated modulo MAXINT or similar big number
171  int taskcnt;
172  /// This map works like a sparse array :-)
173  std::map<int, dmTask*> tasks;
174 
175 
176  /// Here we invoke popen3
177  /// and block waiting for the process to end. Upon end it updates the corresponding
178  /// instance of dmTask with the result and the stdout
179  virtual void run(dmTask &task);
180 
181  friend void taskfunc(dmTaskExec *, int);
182 
183  //kill a specific task
184  int killTask(dmTask *task);
185  };
186 
187 
188 
189 }
std::map< int, dmTask * > tasks
This map works like a sparse array :-)
Definition: TaskExec.h:173
void splitCmd()
Split the command string into the single parms.
int key
Definition: TaskExec.h:63
Definition: TaskExec.h:39
dmTaskExec * loggerinst
Definition: TaskExec.h:87
Definition: TaskExec.h:96
virtual void onLoggingRequest(Logger::Level lvl, std::string const &msg)=0
Event invoked internally to log stuff.
int popen3(int fd[3], pid_t *pid, const char **argv)
int waitResult(int taskID, int tmout=5)
const char * parms[64]
Definition: TaskExec.h:66
bool finished
Definition: TaskExec.h:71
dmTask(dmTaskExec *wheretolog)
int waitFinished(int tmout=5)
Wait until the task has finished or the timeout is expired.
void goCmd(int id)
void assignCmd(dmTask *task, std::vector< std::string > &args)
Split the command string into the single parms.
int submitCmd(std::string cmd)
time_t starttime
Definition: TaskExec.h:70
int fd[3]
Definition: TaskExec.h:73
int taskcnt
Used to create keys to be inserted into the map. This has to be treated modulo MAXINT or similar big ...
Definition: TaskExec.h:171
void notifyAll()
Definition: TaskExec.h:83
friend void taskfunc(dmTaskExec *, int)
boost::condition_variable condvar
Definition: TaskExec.h:46
std::string stdout
Definition: TaskExec.h:75
std::string instance
Definition: TaskExec.h:101
std::string cmd
Definition: TaskExec.h:65
int getTaskStdout(int taskID, std::string &stdout)
time_t endtime
Definition: TaskExec.h:70
pid_t pid
Definition: TaskExec.h:74
virtual void onTaskCompleted(dmTask &task)
int resultcode
Definition: TaskExec.h:68
virtual void onTaskRunning(dmTask &task)
NOTE the signature. This passes copies of Task objects, not the originals.
dmTask(const dmTask &o)
Definition: TaskExec.h:49
int killTask(int taskID)
Level
Definition: logger.h:50
Namespace for the dmlite C++ API.
Definition: authn.h:15
virtual void run(dmTask &task)
virtual void onErrLoggingRequest(std::string const &msg)=0
Event invoked internally to log stuff.
dmTask * getTask(int taskID)