WRENCH  1.11
Cyberinfrastructure Simulation Workbench
Overview Installation Getting Started WRENCH 101 WRENCH 102
WorkflowTask.h
1 
10 #ifndef WRENCH_WORKFLOWTASK_H
11 #define WRENCH_WORKFLOWTASK_H
12 
13 #include <map>
14 #include <stack>
15 #include <set>
16 #include <memory>
17 
18 
19 #include "wrench/job/Job.h"
20 #include "wrench/data_file/DataFile.h"
21 #include "wrench/workflow/parallel_model/ParallelModel.h"
22 #include "wrench/workflow/parallel_model/AmdahlParallelModel.h"
23 #include "wrench/workflow/parallel_model/ConstantEfficiencyParallelModel.h"
24 #include "wrench/workflow/parallel_model/CustomParallelModel.h"
25 
26 #include <boost/graph/adjacency_list.hpp>
27 
28 namespace wrench {
29 
33  class WorkflowTask : public std::enable_shared_from_this<WorkflowTask> {
34 
35  public:
36  const std::string& getID() const;
37 
38  double getFlops() const;
39 
40  unsigned long getMinNumCores() const;
41 
42  unsigned long getMaxNumCores() const;
43 
44  std::shared_ptr<ParallelModel> getParallelModel() const;
45 
46  void setParallelModel(std::shared_ptr<ParallelModel> model);
47 
48  double getMemoryRequirement() const;
49 
50  unsigned long getNumberOfChildren();
51 
52  std::vector<std::shared_ptr<WorkflowTask>> getChildren();
53 
54  unsigned long getNumberOfParents();
55 
56  std::vector<std::shared_ptr<WorkflowTask>> getParents();
57 
58  void addInputFile(std::shared_ptr<DataFile>file);
59 
60  void addOutputFile(std::shared_ptr<DataFile>file);
61 
62  unsigned int getFailureCount();
63 
64  std::shared_ptr<WorkflowTask> getSharedPtr() { return this->shared_from_this(); }
65 
66 
67  /***********************/
69  /***********************/
70 
72  enum State {
83  };
84 
85  static std::string stateToString(WorkflowTask::State state);
86 
87  Job *getJob() const;
88 
89  std::shared_ptr<Workflow> getWorkflow() const;
90 
91  std::string getClusterID() const;
92 
93  void setClusterID(std::string);
94 
95  void setPriority(long);
96 
97  unsigned long getPriority() const;
98 
99  void setAverageCPU(double);
100 
101  double getAverageCPU() const;
102 
103  void setBytesRead(unsigned long);
104 
105  unsigned long getBytesRead() const;
106 
107  void setBytesWritten(unsigned long);
108 
109  unsigned long getBytesWritten() const;
110 
111  std::vector<std::shared_ptr<DataFile>> getInputFiles() const;
112 
113  std::vector<std::shared_ptr<DataFile>> getOutputFiles() const;
114 
115  unsigned long getTopLevel() const;
116 
117  double getStartDate() const;
118 
119  double getEndDate() const;
120 
121  double getFailureDate() const;
122 
123  double getTerminationDate() const;
124 
125  double getReadInputStartDate() const;
126 
127  double getReadInputEndDate() const;
128 
129  double getComputationStartDate() const;
130 
131  double getComputationEndDate() const;
132 
133  double getWriteOutputStartDate() const;
134 
135  double getWriteOutputEndDate() const;
136 
137  unsigned long getNumCoresAllocated() const;
138 
139  struct WorkflowTaskExecution;
140 
141  std::stack<WorkflowTaskExecution> getExecutionHistory() const;
142 
143  std::string getExecutionHost() const;
144 
145  std::string getPhysicalExecutionHost()const ;
146 
148 
149  std::string getStateAsString() const;
150 
151  std::string getColor() const;
152 
153  void setColor(std::string);
154 
155  /***********************/
157  /***********************/
158 
159 
160  /***********************/
162  /***********************/
163 
165  enum InternalState {
166  TASK_NOT_READY,
167  TASK_READY,
168  TASK_RUNNING,
169  TASK_COMPLETED,
170  TASK_FAILED
171  };
172 
173  void updateReadiness();
174 
175  static std::string stateToString(WorkflowTask::InternalState state);
176 
177  unsigned long updateTopLevel();
178 
179  void setInternalState(WorkflowTask::InternalState);
180 
181  void setState(WorkflowTask::State);
182 
183 // void setUpcomingState(WorkflowTask::State);
184 //
185 // WorkflowTask::State getUpcomingState() const;
186 
187  WorkflowTask::InternalState getInternalState() const;
188 
189  void setJob(Job *job);
190 
191  void setStartDate(double date);
192 
193  void updateStartDate(double date);
194 
195  void setEndDate(double date);
196 
197  void setReadInputStartDate(double date);
198 
199  void setReadInputEndDate(double date);
200 
201  void setComputationStartDate(double date);
202 
203  void setComputationEndDate(double date);
204 
205  void setWriteOutputStartDate(double date);
206 
207  void setWriteOutputEndDate(double date);
208 
209  void setFailureDate(double date);
210 
211  void setTerminationDate(double date);
212 
213  void incrementFailureCount();
214 
215  void setExecutionHost(std::string hostname);
216 
217  void setNumCoresAllocated(unsigned long num_cores);
218 
222  struct WorkflowTaskExecution {
224  double task_start = -1.0;
226  double read_input_start = -1.0;
228  double read_input_end = -1.0;
230  double computation_start = -1.0;
232  double computation_end = -1.0;
234  double write_output_start = -1.0;
236  double write_output_end = -1.0;
238  double task_end = -1.0;
240  double task_failed = -1.0;
242  double task_terminated = -1.0;
243 
245  std::string execution_host = "";
247  std::string physical_execution_host = "";
249  unsigned long num_cores_allocated = 0;
250 
256  WorkflowTaskExecution(double task_start) : task_start(task_start) {}
257 
258 
259  };
260 
261 
262  /***********************/
264  /***********************/
265 
266  private:
267  friend class Workflow;
268 
269  std::string id; // Task ID
270  std::string cluster_id; // ID for clustered task
271  std::string color; // A RGB color formatted as "#rrggbb"
272  double flops; // Number of flops
273  double average_cpu = -1; // Average CPU utilization
274  unsigned long bytes_read = -1; // Total bytes read in KB
275  unsigned long bytes_written = -1; // Total bytes written in KB
276  unsigned long min_num_cores;
277  unsigned long max_num_cores;
278  std::shared_ptr<ParallelModel> parallel_model;
279  double memory_requirement;
280  unsigned long priority = 0; // Task priority
281  unsigned long toplevel; // 0 if entry task
282  unsigned int failure_count = 0; // Number of times the tasks has failed
283  std::string execution_host; // Host on which the task executed ("" if not executed successfully - yet)
284  State visible_state; // To be exposed to developer level
285  State upcoming_visible_state; // A visible state that will become active once a WMS has process a previously sent workflow execution event
286  InternalState internal_state; // Not to be exposed to developer level
287 
288  std::shared_ptr<Workflow> workflow; // Containing workflow
289 
290  std::map<std::string, std::shared_ptr<DataFile>> output_files; // List of output files
291  std::map<std::string, std::shared_ptr<DataFile>> input_files; // List of input files
292 
293  // Private constructor (called by Workflow)
294  WorkflowTask(std::string id,
295  double t,
296  unsigned long min_num_cores,
297  unsigned long max_num_cores,
298  double memory_requirement);
299 
300  // Containing job
301  Job *job;
302 
303  std::stack<WorkflowTaskExecution> execution_history;
304 
305  friend class DagOfTasks;
306  };
307 };
308 
309 #endif //WRENCH_WORKFLOWTASK_H
wrench::WorkflowTask::PENDING
@ PENDING
Pending (has been submitted to a compute service)
Definition: WorkflowTask.h:78
wrench::WorkflowTask::getInputFiles
std::vector< std::shared_ptr< DataFile > > getInputFiles() const
Get the list of input DataFile objects for the task.
Definition: WorkflowTask.cpp:611
wrench::WorkflowTask::READY
@ READY
Ready (parents have completed)
Definition: WorkflowTask.h:76
wrench::WorkflowTask::addInputFile
void addInputFile(std::shared_ptr< DataFile >file)
Add an input file to the task.
Definition: WorkflowTask.cpp:57
wrench::WorkflowTask::getExecutionHistory
std::stack< WorkflowTaskExecution > getExecutionHistory() const
Get the execution history of this task.
Definition: WorkflowTask.cpp:587
wrench::WorkflowTask::getOutputFiles
std::vector< std::shared_ptr< DataFile > > getOutputFiles() const
Get the list of output DataFile objects for the task.
Definition: WorkflowTask.cpp:624
wrench::WorkflowTask::UNKNOWN
@ UNKNOWN
Some Unknown state (should not happen)
Definition: WorkflowTask.h:82
wrench::WorkflowTask::getState
WorkflowTask::State getState() const
Get the state of the task.
Definition: WorkflowTask.cpp:198
wrench::WorkflowTask::State
State
Task states.
Definition: WorkflowTask.h:72
wrench::WorkflowTask::getWorkflow
std::shared_ptr< Workflow > getWorkflow() const
Get the workflow that contains the task.
Definition: WorkflowTask.cpp:291
wrench::Job
Abstraction of a job used for executing tasks in a Workflow.
Definition: Job.h:34
wrench::WorkflowTask::getStateAsString
std::string getStateAsString() const
Get the state of the task as a string.
Definition: WorkflowTask.cpp:207
wrench::WorkflowTask::getAverageCPU
double getAverageCPU() const
Get the task average CPU usage.
Definition: WorkflowTask.cpp:383
wrench::WorkflowTask::setClusterID
void setClusterID(std::string)
Set the cluster id for the task.
Definition: WorkflowTask.cpp:359
wrench::WorkflowTask::getTerminationDate
double getTerminationDate() const
Get the tasks's most recent termination date (when it was explicitly requested to be terminated by th...
Definition: WorkflowTask.cpp:709
wrench::WorkflowTask::getID
const std::string & getID() const
Get the id of the task.
Definition: WorkflowTask.cpp:117
wrench::WorkflowTask::getComputationStartDate
double getComputationStartDate() const
Get the tasks's most recent computation start date.
Definition: WorkflowTask.cpp:653
wrench::WorkflowTask::getMaxNumCores
unsigned long getMaxNumCores() const
Get the maximum number of cores that the task can use.
Definition: WorkflowTask.cpp:144
wrench::WorkflowTask::setBytesWritten
void setBytesWritten(unsigned long)
Set the number of bytes written by the task.
Definition: WorkflowTask.cpp:423
wrench::WorkflowTask::getJob
Job * getJob() const
Get the task's containing job.
Definition: WorkflowTask.cpp:342
wrench::WorkflowTask::getTopLevel
unsigned long getTopLevel() const
Returns the task's top level (max number of hops on a reverse path up to an entry task....
Definition: WorkflowTask.cpp:740
wrench
Definition: Action.cpp:28
wrench::WorkflowTask::COMPLETED
@ COMPLETED
Completed (successfully completed)
Definition: WorkflowTask.h:80
wrench::WorkflowTask::stateToString
static std::string stateToString(WorkflowTask::State state)
Convert task state to a string (useful for output, debugging, logging, etc.)
Definition: WorkflowTask.cpp:246
wrench::WorkflowTask::setPriority
void setPriority(long)
Set the task priority.
Definition: WorkflowTask.cpp:375
wrench::WorkflowTask::getMinNumCores
unsigned long getMinNumCores() const
Get the minimum number of cores required for running the task.
Definition: WorkflowTask.cpp:135
wrench::WorkflowTask::getBytesWritten
unsigned long getBytesWritten() const
Get the number of bytes written by the task.
Definition: WorkflowTask.cpp:415
wrench::WorkflowTask::getFailureDate
double getFailureDate() const
Get the task's most recent failure date.
Definition: WorkflowTask.cpp:701
wrench::WorkflowTask::getParallelModel
std::shared_ptr< ParallelModel > getParallelModel() const
Get the task's parallel model.
Definition: WorkflowTask.cpp:834
wrench::WorkflowTask::setParallelModel
void setParallelModel(std::shared_ptr< ParallelModel > model)
Set the task's parallel model.
Definition: WorkflowTask.cpp:826
wrench::WorkflowTask
A computational task in a Workflow.
Definition: WorkflowTask.h:33
wrench::WorkflowTask::getPhysicalExecutionHost
std::string getPhysicalExecutionHost() const
Returns the name of the PHYSICAL host on which the task has most recently been executed,...
Definition: WorkflowTask.cpp:758
wrench::WorkflowTask::getNumberOfChildren
unsigned long getNumberOfChildren()
Get the number of children of a task.
Definition: WorkflowTask.cpp:162
wrench::WorkflowTask::getFailureCount
unsigned int getFailureCount()
Get the number of times a task has failed.
Definition: WorkflowTask.cpp:596
wrench::WorkflowTask::getMemoryRequirement
double getMemoryRequirement() const
Get the memory_manager_service requirement of the task.
Definition: WorkflowTask.cpp:153
wrench::WorkflowTask::getNumberOfParents
unsigned long getNumberOfParents()
Get the number of parents of a task.
Definition: WorkflowTask.cpp:180
wrench::WorkflowTask::setBytesRead
void setBytesRead(unsigned long)
Set the number of bytes read by the task.
Definition: WorkflowTask.cpp:407
wrench::WorkflowTask::getStartDate
double getStartDate() const
Get the task's most recent start date.
Definition: WorkflowTask.cpp:637
wrench::WorkflowTask::getWriteOutputStartDate
double getWriteOutputStartDate() const
Get the task's most recent write output start date.
Definition: WorkflowTask.cpp:685
wrench::WorkflowTask::getExecutionHost
std::string getExecutionHost() const
Returns the name of the host on which the task has most recently been executed, or "" if the task has...
Definition: WorkflowTask.cpp:749
wrench::WorkflowTask::getChildren
std::vector< std::shared_ptr< WorkflowTask > > getChildren()
Get the children of a task.
Definition: WorkflowTask.cpp:171
wrench::WorkflowTask::getReadInputStartDate
double getReadInputStartDate() const
Get the task's most recent read input start date.
Definition: WorkflowTask.cpp:669
wrench::WorkflowTask::getPriority
unsigned long getPriority() const
Get the task priority. By default, priority is 0.
Definition: WorkflowTask.cpp:367
wrench::WorkflowTask::getClusterID
std::string getClusterID() const
Get the cluster Id for the task.
Definition: WorkflowTask.cpp:350
wrench::WorkflowTask::getBytesRead
unsigned long getBytesRead() const
Get the number of bytes read by the task.
Definition: WorkflowTask.cpp:399
wrench::WorkflowTask::getParents
std::vector< std::shared_ptr< WorkflowTask > > getParents()
Get the parents of a task.
Definition: WorkflowTask.cpp:189
wrench::WorkflowTask::addOutputFile
void addOutputFile(std::shared_ptr< DataFile >file)
Add an output file to the task.
Definition: WorkflowTask.cpp:87
wrench::WorkflowTask::NOT_READY
@ NOT_READY
Not ready (parents have not completed)
Definition: WorkflowTask.h:74
wrench::WorkflowTask::getReadInputEndDate
double getReadInputEndDate() const
Get the task's most recent read input end date.
Definition: WorkflowTask.cpp:677
wrench::WorkflowTask::getWriteOutputEndDate
double getWriteOutputEndDate() const
Get the task's most recent write output end date.
Definition: WorkflowTask.cpp:693
wrench::WorkflowTask::setAverageCPU
void setAverageCPU(double)
Set the task average CPU usage.
Definition: WorkflowTask.cpp:391
wrench::WorkflowTask::getEndDate
double getEndDate() const
Get the task's most recent end date.
Definition: WorkflowTask.cpp:645
wrench::WorkflowTask::getNumCoresAllocated
unsigned long getNumCoresAllocated() const
Returns the number of cores allocated for this task's most recent execution or 0 if an execution atte...
Definition: WorkflowTask.cpp:766
wrench::WorkflowTask::getFlops
double getFlops() const
Get the number of flops of the task.
Definition: WorkflowTask.cpp:126
wrench::WorkflowTask::getColor
std::string getColor() const
Get the task's color ("" if none)
Definition: WorkflowTask.cpp:810
wrench::WorkflowTask::setColor
void setColor(std::string)
Set the task's color.
Definition: WorkflowTask.cpp:818
wrench::WorkflowTask::getComputationEndDate
double getComputationEndDate() const
Get the task's most recent computation end date.
Definition: WorkflowTask.cpp:661