WorkflowTask.h
1 
10 #ifndef WRENCH_WORKFLOWTASK_H
11 #define WRENCH_WORKFLOWTASK_H
12 
13 #include <map>
14 #include <stack>
15 #include <lemon/list_graph.h>
16 #include <set>
17 
18 #include "wrench/workflow/job/WorkflowJob.h"
19 #include "wrench/workflow/WorkflowFile.h"
20 
21 namespace wrench {
22 
26  class WorkflowTask { // A computational task in a Workflow
27 
28  public:
29  std::string getID() const; // Get the id of the task
30 
31  double getFlops() const; // Get the number of flops of the task
32 
33  unsigned long getMinNumCores() const; // Get the minimum number of cores required for running the task
34 
35  unsigned long getMaxNumCores() const; // Get the maximum number of cores that the task can use
36 
37  double getParallelEfficiency() const; // Get the parallel efficiency of the task
38 
39  double getMemoryRequirement() const; // Get the memory requirement of the task
40 
41  int getNumberOfChildren() const; // Get the number of children of the task
42 
43  int getNumberOfParents() const; // Get the number of parents of the task
44 
45  void addInputFile(WorkflowFile *file); // Add an input file to the task
46 
47  void addOutputFile(WorkflowFile *file); // Add an output file to the task
48 
49  unsigned int getFailureCount(); // Get the number of times the task has failed
50 
51  /***********************/
53  /***********************/
54 
56  enum TaskType { // Task types
57  COMPUTE,
58  AUXILIARY,
59  TRANSFER
60  };
61 
63  enum State { // Task states. NOTE(review): the enumerators (listing lines 64-73) are absent from this listing; the page tooltips describe them as not ready, ready, pending, completed and unknown
74  };
75 
76  static std::string stateToString(WorkflowTask::State state); // Convert a task state to a string (useful for output, debugging, logging, etc.)
77 
78  WorkflowJob *getJob() const; // Get the task's containing job
79 
80  Workflow *getWorkflow() const; // Get the workflow that contains the task
81 
82  std::string getClusterID() const; // Get the cluster id for the task
83 
84  void setClusterID(std::string); // Set the cluster id for the task
85 
86  void setTaskType(TaskType); // Set the task type
87 
88  TaskType getTaskType() const; // Get the workflow task type
89 
90  void setPriority(long); // Set the task priority
91 
92  long getPriority() const; // Get the task priority (0 by default)
93 
94  void setAverageCPU(double); // Set the task average CPU usage
95 
96  double getAverageCPU() const; // Get the task average CPU usage
97 
98  void setBytesRead(unsigned long); // Set the amount of bytes read by the task
99 
100  unsigned long getBytesRead() const; // Get the amount of bytes read by the task
101 
102  void setBytesWritten(unsigned long); // Set the amount of bytes written by the task
103 
104  unsigned long getBytesWritten() const; // Get the amount of bytes written by the task
105 
106  std::set<WorkflowFile *> getInputFiles(); // Get the set of input WorkflowFile objects for the task
107 
108  std::set<WorkflowFile *> getOutputFiles(); // Get the set of output WorkflowFile objects for the task
109 
110  unsigned long getTopLevel(); // Get the task's top level (max number of hops on a reverse path up to an entry task)
111 
112  double getStartDate(); // Get the task's most recent start date
113 
114  double getEndDate(); // Get the task's most recent end date
115 
116  double getFailureDate(); // Get the task's most recent failure date
117 
118  double getTerminationDate(); // Get the task's most recent termination date (when it was explicitly requested to be terminated)
119 
120  double getReadInputStartDate(); // Get the task's most recent read input start date
121 
122  double getReadInputEndDate(); // Get the task's most recent read input end date
123 
124  double getComputationStartDate(); // Get the task's most recent computation start date
125 
126  double getComputationEndDate(); // Get the task's most recent computation end date
127 
128  double getWriteOutputStartDate(); // Get the task's most recent write output start date
129 
130  double getWriteOutputEndDate(); // Get the task's most recent write output end date
131 
132  unsigned long getNumCoresAllocated(); // Get the number of cores allocated for the task's most recent execution
133 
134  struct WorkflowTaskExecution; // Forward declaration; the struct is defined further down in this class
135 
136  std::stack<WorkflowTaskExecution> getExecutionHistory(); // Get the execution history of this task
137 
138  std::string getExecutionHost(); // Get the name of the host on which the task has most recently been executed
139 
141 
142  void addSrcDest(WorkflowFile *, const std::string &, const std::string &); // Set a pair of src and dest hosts for a file transfer (presumably only meaningful for TaskType::TRANSFER tasks)
143 
144  std::map<WorkflowFile *, std::pair<std::string, std::string>> getFileTransfers() const; // Get a map of src and dst hosts for file transfers (presumably only available for TaskType::TRANSFER tasks)
145 
146  /***********************/
148  /***********************/
149 
150 
151  /***********************/
153  /***********************/
154 
156  enum InternalState { // Task states that are not to be exposed to developer level (see the internal_state member)
157  TASK_NOT_READY,
158  TASK_READY,
159  TASK_RUNNING,
160  TASK_COMPLETED,
161  TASK_FAILED
162  };
163 
164  static std::string stateToString(WorkflowTask::InternalState state); // Overload of stateToString() for internal states
165 
166  unsigned long updateTopLevel(); // NOTE(review): presumably recomputes and returns the toplevel member - confirm in WorkflowTask.cpp
167 
168  void setInternalState(WorkflowTask::InternalState);
169 
170  void setState(WorkflowTask::State);
171 
172  void setUpcomingState(WorkflowTask::State);
173 
174  WorkflowTask::State getUpcomingState() const;
175 
176  WorkflowTask::InternalState getInternalState() const;
177 
178  void setJob(WorkflowJob *job);
179 
180  void setStartDate(double date);
181 
182  void setEndDate(double date);
183 
184  void setReadInputStartDate(double date);
185 
186  void setReadInputEndDate(double date);
187 
188  void setComputationStartDate(double date);
189 
190  void setComputationEndDate(double date);
191 
192  void setWriteOutputStartDate(double date);
193 
194  void setWriteOutputEndDate(double date);
195 
196  void setFailureDate(double date);
197 
198  void setTerminationDate(double date);
199 
200  void incrementFailureCount();
201 
202  void setExecutionHost(std::string hostname);
203 
204  void setNumCoresAllocated(unsigned long num_cores);
205 
206  struct WorkflowTaskExecution { // Dates and placement information recorded for an execution of the task; every date defaults to -1.0
207  double task_start = -1.0;
208  double read_input_start = -1.0;
209  double read_input_end = -1.0;
210  double computation_start = -1.0;
211  double computation_end = -1.0;
212  double write_output_start = -1.0;
213  double write_output_end = -1.0;
214  double task_end = -1.0;
215  double task_failed = -1.0;
216  double task_terminated = -1.0;
217 
218  std::string execution_host = "";
219  unsigned long num_cores_allocated = 0;
220 
221  WorkflowTaskExecution(double task_start) : task_start(task_start) { // Only the start date is set here; all other fields keep their defaults
222 
223  }
224  };
225 
226  /***********************/
228  /***********************/
229 
230  private:
231  friend class Workflow;
232 
233  std::string id; // Task ID
234  std::string cluster_id; // ID for clustered task
235  TaskType task_type; // Task type
236  double flops; // Number of flops
237  double average_cpu = -1; // Average CPU utilization
238  unsigned long bytes_read = -1; // Total bytes read in KB (-1 converts to the largest unsigned long value; presumably an "unset" marker)
239  unsigned long bytes_written = -1; // Total bytes written in KB (-1 converts to the largest unsigned long value; presumably an "unset" marker)
240  unsigned long min_num_cores; // Minimum number of cores required for running the task
241  unsigned long max_num_cores; // Maximum number of cores that the task can use
242  double parallel_efficiency; // Parallel efficiency of the task
243  double memory_requirement; // Memory requirement of the task
244  long priority = 0; // Task priority
245  unsigned long toplevel; // 0 if entry task
246  unsigned int failure_count = 0; // Number of times the task has failed
247  std::string execution_host; // Host on which the task executed ("" if not executed successfully - yet)
248  State visible_state; // To be exposed to developer level
249  State upcoming_visible_state; // A visible state that will become active once a WMS has processed a previously sent workflow execution event
250  InternalState internal_state; // Not to be exposed to developer level
251 
252  Workflow *workflow; // Containing workflow
253  lemon::ListDigraph *DAG; // Underlying DAG (presumably the one owned by the containing workflow - confirm)
254  lemon::ListDigraph::Node DAG_node; // pointer to the underlying DAG node
255  std::map<std::string, WorkflowFile *> output_files; // List of output files
256  std::map<std::string, WorkflowFile *> input_files; // List of input files
257  std::map<WorkflowFile *, std::pair<std::string, std::string>> fileTransfers; // Map of transfer files and hosts
258 
259  // Private constructor (called by Workflow)
260  WorkflowTask(std::string id,
261  double t, // NOTE(review): presumably the task's flops - confirm in WorkflowTask.cpp
262  unsigned long min_num_cores,
263  unsigned long max_num_cores,
264  double parallel_efficiency,
265  double memory_requirement,
266  TaskType type);
267 
268  // Containing job
269  WorkflowJob *job;
270 
271  // Private helper function
272  void addFileToMap(std::map<std::string, WorkflowFile *> &map_to_insert,
273  std::map<std::string, WorkflowFile *> &map_to_check,
274  WorkflowFile *f);
275 
276  std::stack<WorkflowTaskExecution> execution_history; // Execution history of this task
277  };
278 };
279 
280 #endif //WRENCH_WORKFLOWTASK_H
int getNumberOfChildren() const
Get the number of children of a task.
Definition: WorkflowTask.cpp:145
void setBytesRead(unsigned long)
Set the amount of bytes read by the task.
Definition: WorkflowTask.cpp:412
Workflow * getWorkflow() const
Get the workflow that contains the task.
Definition: WorkflowTask.cpp:245
Some Unknown state (should not happen)
Definition: WorkflowTask.h:73
void setTaskType(TaskType)
Set the task type.
Definition: WorkflowTask.cpp:364
double getComputationEndDate()
Get the task's most recent computation end date.
Definition: WorkflowTask.cpp:674
TaskType getTaskType() const
Get the workflow task type.
Definition: WorkflowTask.cpp:356
Pending (has been submitted to a compute service)
Definition: WorkflowTask.h:69
double getMemoryRequirement() const
Get the memory requirement of the task.
Definition: WorkflowTask.cpp:135
std::string getClusterID() const
Get the cluster Id for the task.
Definition: WorkflowTask.cpp:339
unsigned long getMaxNumCores() const
Get the maximum number of cores that the task can use.
Definition: WorkflowTask.cpp:117
Not ready (parents have not completed)
Definition: WorkflowTask.h:65
Completed (successfully completed)
Definition: WorkflowTask.h:71
void addOutputFile(WorkflowFile *file)
Add an output file to the task.
Definition: WorkflowTask.cpp:72
unsigned long getMinNumCores() const
Get the minimum number of cores required for running the task.
Definition: WorkflowTask.cpp:108
A computational task in a Workflow.
Definition: WorkflowTask.h:26
double getReadInputStartDate()
Get the task's most recent read input start date.
Definition: WorkflowTask.cpp:682
unsigned int getFailureCount()
Get the number of times a task has failed.
Definition: WorkflowTask.cpp:609
void setAverageCPU(double)
Set the task average CPU usage.
Definition: WorkflowTask.cpp:396
std::map< WorkflowFile *, std::pair< std::string, std::string > > getFileTransfers() const
Get a map of src and dst hosts for file transfers (only available for WorkflowTask::TaskType::TRANSFE...
Definition: WorkflowTask.cpp:806
Abstraction of a job used for executing tasks in a Workflow.
Definition: WorkflowJob.h:31
double getWriteOutputEndDate()
Get the task's most recent write output end date.
Definition: WorkflowTask.cpp:706
A data file used/produced by a WorkflowTask in a Workflow.
Definition: WorkflowFile.h:26
A workflow (to be executed by a WMS)
Definition: Workflow.h:30
void addSrcDest(WorkflowFile *, const std::string &, const std::string &)
Set a pair of src and dest hosts for transfers (it is only meaningful for WorkflowTask::TaskType::TRA...
Definition: WorkflowTask.cpp:818
long getPriority() const
Get the task priority. By default, priority is 0.
Definition: WorkflowTask.cpp:372
WorkflowTask::State getState() const
Get the state of the task.
Definition: WorkflowTask.cpp:171
double getParallelEfficiency() const
Get the parallel efficiency of the task.
Definition: WorkflowTask.cpp:126
unsigned long getNumCoresAllocated()
Returns the number of cores allocated for this task's most recent execution or 0 if an execution atte...
Definition: WorkflowTask.cpp:770
double getFailureDate()
Get the task's most recent failure date.
Definition: WorkflowTask.cpp:714
WorkflowJob * getJob() const
Get the task's containing job.
Definition: WorkflowTask.cpp:331
unsigned long getTopLevel()
Returns the task's top level (max number of hops on a reverse path up to an entry task...
Definition: WorkflowTask.cpp:753
double getFlops() const
Get the number of flops of the task.
Definition: WorkflowTask.cpp:99
int getNumberOfParents() const
Get the number of parents of a task.
Definition: WorkflowTask.cpp:158
Ready (parents have completed)
Definition: WorkflowTask.h:67
double getEndDate()
Get the task's most recent end date.
Definition: WorkflowTask.cpp:658
std::string getID() const
Get the id of the task.
Definition: WorkflowTask.cpp:90
unsigned long getBytesWritten() const
Get the amount of bytes written by the task.
Definition: WorkflowTask.cpp:420
double getComputationStartDate()
Get the task's most recent computation start date.
Definition: WorkflowTask.cpp:666
State
Task states.
Definition: WorkflowTask.h:63
unsigned long getBytesRead() const
Get the amount of bytes read by the task.
Definition: WorkflowTask.cpp:404
std::set< WorkflowFile * > getInputFiles()
Get the set of input WorkflowFile objects for the task.
Definition: WorkflowTask.cpp:624
void setClusterID(std::string)
Set the cluster id for the task.
Definition: WorkflowTask.cpp:348
void addInputFile(WorkflowFile *file)
Add an input file to the task.
Definition: WorkflowTask.cpp:54
double getReadInputEndDate()
Get the task's most recent read input end date.
Definition: WorkflowTask.cpp:690
TaskType
Task types.
Definition: WorkflowTask.h:56
void setBytesWritten(unsigned long)
Set the amount of bytes written by the task.
Definition: WorkflowTask.cpp:428
double getAverageCPU() const
Get the task average CPU usage.
Definition: WorkflowTask.cpp:388
static std::string stateToString(WorkflowTask::State state)
Convert task state to a string (useful for output, debugging, logging, etc.)
Definition: WorkflowTask.cpp:199
double getWriteOutputStartDate()
Get the task's most recent write output start date.
Definition: WorkflowTask.cpp:698
std::stack< WorkflowTaskExecution > getExecutionHistory()
Get the execution history of this task.
Definition: WorkflowTask.cpp:577
Definition: TerminalOutput.cpp:15
double getTerminationDate()
Get the task's most recent termination date (when it was explicitly requested to be terminated by th...
Definition: WorkflowTask.cpp:722
double getStartDate()
Get the task's most recent start date.
Definition: WorkflowTask.cpp:650
void setPriority(long)
Set the task priority.
Definition: WorkflowTask.cpp:380
std::set< WorkflowFile * > getOutputFiles()
Get the set of output WorkflowFile objects for the task.
Definition: WorkflowTask.cpp:637
std::string getExecutionHost()
Returns the name of the host on which the task has most recently been executed, or "" if the task has...
Definition: WorkflowTask.cpp:762