WorkflowTask.h
1 
10 #ifndef WRENCH_WORKFLOWTASK_H
11 #define WRENCH_WORKFLOWTASK_H
12 
13 #include <map>
14 #include <stack>
15 #include <set>
16 
17 #include "wrench/workflow/job/WorkflowJob.h"
18 #include "wrench/workflow/WorkflowFile.h"
19 
20 #include <boost/graph/adjacency_list.hpp>
21 
22 namespace wrench {
23 
// A computational task in a Workflow.
//
// Apart from the inline WorkflowTaskExecution constructor, every member
// function below is only declared here. The constructor is private and
// Workflow is a friend, so tasks are created by Workflow.
27  class WorkflowTask {
28 
29  public:
    // Get the id of the task.
30  std::string getID() const;
31 
    // Get the number of flops of the task.
32  double getFlops() const;
33 
    // Get the minimum number of cores required for running the task.
34  unsigned long getMinNumCores() const;
35 
    // Get the maximum number of cores that the task can use.
36  unsigned long getMaxNumCores() const;
37 
    // Get the parallel efficiency of the task.
38  double getParallelEfficiency() const;
39 
    // Get the memory requirement of the task.
40  double getMemoryRequirement() const;
41 
    // Get the number of children of a task.
42  unsigned long getNumberOfChildren() const;
43 
    // Get the children of a task.
44  std::vector<WorkflowTask *> getChildren() const;
45 
    // Get the number of parents of a task.
46  unsigned long getNumberOfParents() const;
47 
    // Get the parents of a task.
48  std::vector<WorkflowTask *> getParents() const;
49 
    // Add an input file to the task.
50  void addInputFile(WorkflowFile *file);
51 
    // Add an output file to the task.
52  void addOutputFile(WorkflowFile *file);
53 
    // Get the number of times a task has failed.
54  unsigned int getFailureCount();
55 
56 
57  /***********************/
59  /***********************/
60 
    // Task states exposed at the developer level (type of the
    // visible_state and upcoming_visible_state members).
62  enum State {
64  NOT_READY,
66  READY,
68  PENDING,
70  COMPLETED,
72  UNKNOWN
73  };
74 
    // Return a std::string for the given developer-visible state.
75  static std::string stateToString(WorkflowTask::State state);
76 
    // Get the job of the task (the `job` member is documented as the
    // "containing job").
77  WorkflowJob *getJob() const;
78 
    // Get the workflow of the task (the `workflow` member is documented as
    // the "containing workflow").
79  Workflow *getWorkflow() const;
80 
    // Accessors for the cluster id ("ID for clustered task").
81  std::string getClusterID() const;
82 
83  void setClusterID(std::string);
84 
    // Accessors for the task priority.
    // NOTE(review): the setter takes a signed long while the `priority`
    // member and getPriority() are unsigned long -- confirm how negative
    // values are meant to be handled.
85  void setPriority(long);
86 
87  unsigned long getPriority() const;
88 
    // Accessors for the average CPU utilization.
89  void setAverageCPU(double);
90 
91  double getAverageCPU() const;
92 
    // Accessors for the total bytes read (member comment says "in KB").
93  void setBytesRead(unsigned long);
94 
95  unsigned long getBytesRead() const;
96 
    // Accessors for the total bytes written (member comment says "in KB").
97  void setBytesWritten(unsigned long);
98 
99  unsigned long getBytesWritten() const;
100 
    // Get the task's input files as a vector (stored internally in a
    // string-keyed map).
101  std::vector<WorkflowFile *> getInputFiles();
102 
    // Get the task's output files as a vector (stored internally in a
    // string-keyed map).
103  std::vector<WorkflowFile *> getOutputFiles();
104 
    // Get the task's top level (the member comment says 0 for an entry task).
105  unsigned long getTopLevel();
106 
    // Date getters. NOTE(review): presumably each reports the matching field
    // of the most recent WorkflowTaskExecution record in execution_history --
    // confirm in the implementation file.
107  double getStartDate();
108 
109  double getEndDate();
110 
111  double getFailureDate();
112 
113  double getTerminationDate();
114 
115  double getReadInputStartDate();
116 
117  double getReadInputEndDate();
118 
119  double getComputationStartDate();
120 
121  double getComputationEndDate();
122 
123  double getWriteOutputStartDate();
124 
125  double getWriteOutputEndDate();
126 
    // Get the number of cores allocated to the task.
127  unsigned long getNumCoresAllocated();
128 
    // Forward declaration; the struct is defined further down in this class.
129  struct WorkflowTaskExecution;
130 
    // Get the task's execution history; the stack is returned by value.
131  std::stack<WorkflowTaskExecution> getExecutionHistory();
132 
    // Get the host on which the task executed (the member comment says ""
    // if not executed successfully yet).
133  std::string getExecutionHost();
134 
    // Get the developer-visible state of the task.
135  WorkflowTask::State getState() const;
136 
    // Accessors for the task color (an RGB color formatted as "#rrggbb").
137  std::string getColor();
138 
139  void setColor(std::string);
140 
141  /***********************/
143  /***********************/
144 
145 
146  /***********************/
148  /***********************/
149 
    // Internal task states (type of the internal_state member, which is not
    // to be exposed at the developer level).
151  enum InternalState {
152  TASK_NOT_READY,
153  TASK_READY,
154  TASK_RUNNING,
155  TASK_COMPLETED,
156  TASK_FAILED
157  };
158 
    // Return a std::string for the given internal state.
159  static std::string stateToString(WorkflowTask::InternalState state);
160 
    // NOTE(review): presumably recomputes the `toplevel` member and returns
    // it -- confirm in the implementation file.
161  unsigned long updateTopLevel();
162 
    // Setters/getters for the internal state, the visible state, and the
    // upcoming visible state (a visible state that will become active once a
    // WMS has processed a previously sent workflow execution event).
163  void setInternalState(WorkflowTask::InternalState);
164 
165  void setState(WorkflowTask::State);
166 
167  void setUpcomingState(WorkflowTask::State);
168 
169  WorkflowTask::State getUpcomingState() const;
170 
171  WorkflowTask::InternalState getInternalState() const;
172 
    // Set the job of the task.
173  void setJob(WorkflowJob *job);
174 
    // Date setters. NOTE(review): presumably each records `date` in the
    // matching field of a WorkflowTaskExecution record -- confirm in the
    // implementation file.
175  void setStartDate(double date);
176 
177  void setEndDate(double date);
178 
179  void setReadInputStartDate(double date);
180 
181  void setReadInputEndDate(double date);
182 
183  void setComputationStartDate(double date);
184 
185  void setComputationEndDate(double date);
186 
187  void setWriteOutputStartDate(double date);
188 
189  void setWriteOutputEndDate(double date);
190 
191  void setFailureDate(double date);
192 
193  void setTerminationDate(double date);
194 
    // Increment the failure count of the task.
195  void incrementFailureCount();
196 
    // Set the host on which the task executed.
197  void setExecutionHost(std::string hostname);
198 
    // Set the number of cores allocated to the task.
199  void setNumCoresAllocated(unsigned long num_cores);
200 
    // Record of one execution of the task. Every date field defaults to -1.0
    // (presumably meaning "not recorded yet" -- confirm); only task_start is
    // set by the constructor.
204  struct WorkflowTaskExecution {
206  double task_start = -1.0;
208  double read_input_start = -1.0;
210  double read_input_end = -1.0;
212  double computation_start = -1.0;
214  double computation_end = -1.0;
216  double write_output_start = -1.0;
218  double write_output_end = -1.0;
220  double task_end = -1.0;
222  double task_failed = -1.0;
224  double task_terminated = -1.0;
225 
227  std::string execution_host = "";
229  unsigned long num_cores_allocated = 0;
230 
    // Construct a record with the given task start date; all other fields
    // keep their in-class defaults. Not declared explicit, so a double
    // converts implicitly to a WorkflowTaskExecution.
236  WorkflowTaskExecution(double task_start) : task_start(task_start) {}
237 
238  };
239 
240  /***********************/
242  /***********************/
243 
244  private:
    // Workflow needs access to the private constructor and members.
245  friend class Workflow;
246 
247  std::string id; // Task ID
248  std::string cluster_id; // ID for clustered task
249  std::string color; // An RGB color formatted as "#rrggbb"
250  double flops; // Number of flops
251  double average_cpu = -1; // Average CPU utilization
    // NOTE(review): the two members below are unsigned, so the -1 initializer
    // converts to the largest unsigned long value; presumably an "unset"
    // sentinel -- confirm.
252  unsigned long bytes_read = -1; // Total bytes read in KB
253  unsigned long bytes_written = -1; // Total bytes written in KB
254  unsigned long min_num_cores;
255  unsigned long max_num_cores;
256  double parallel_efficiency;
257  double memory_requirement;
258  unsigned long priority = 0; // Task priority
259  unsigned long toplevel; // 0 if entry task
260  unsigned int failure_count = 0; // Number of times the task has failed
261  std::string execution_host; // Host on which the task executed ("" if not executed successfully - yet)
262  State visible_state; // To be exposed to developer level
263  State upcoming_visible_state; // A visible state that will become active once a WMS has processed a previously sent workflow execution event
264  InternalState internal_state; // Not to be exposed to developer level
265 
266  Workflow *workflow; // Containing workflow
267 
268  std::map<std::string, WorkflowFile *> output_files; // List of output files
269  std::map<std::string, WorkflowFile *> input_files; // List of input files
270 
271  // Private constructor (called by Workflow)
    // NOTE(review): `t` is presumably the flop count stored in `flops` --
    // confirm in the implementation file.
272  WorkflowTask(std::string id,
273  double t,
274  unsigned long min_num_cores,
275  unsigned long max_num_cores,
276  double parallel_efficiency,
277  double memory_requirement);
278 
279  // Containing job
280  WorkflowJob *job;
281 
    // Execution records of the task, kept in a stack.
282  std::stack<WorkflowTaskExecution> execution_history;
283 
    // DagOfTasks is also granted access to private members.
284  friend class DagOfTasks;
285  };
286 };
287 
288 #endif //WRENCH_WORKFLOWTASK_H
std::string getID() const
Get the id of the task.
Definition: WorkflowTask.cpp:115
WorkflowJob * getJob()
Get the executor's job.
Definition: StandardJobExecutor.cpp:1015
std::vector< WorkflowTask * > getChildren() const
Get the children of a task.
Definition: WorkflowTask.cpp:178
void addInputFile(WorkflowFile *file)
Add an input file to the task.
Definition: WorkflowTask.cpp:55
unsigned long getMaxNumCores() const
Get the maximum number of cores that the task can use.
Definition: WorkflowTask.cpp:142
unsigned long getNumberOfChildren() const
Get the number of children of a task.
Definition: WorkflowTask.cpp:169
double getParallelEfficiency() const
Get the parallel efficiency of the task.
Definition: WorkflowTask.cpp:151
wrench
Definition: Alarm.cpp:20
wrench::Workflow
A workflow (to be executed by a WMS)
Definition: Workflow.h:33
unsigned long getMinNumCores() const
Get the minimum number of cores required for running the task.
Definition: WorkflowTask.cpp:133
wrench::WorkflowTask
A computational task in a Workflow.
Definition: WorkflowTask.h:27
unsigned int getFailureCount()
Get the number of times a task has failed.
Definition: WorkflowTask.cpp:603
double getMemoryRequirement() const
Get the memory requirement of the task.
Definition: WorkflowTask.cpp:160
std::vector< WorkflowTask * > getParents() const
Get the parents of a task.
Definition: WorkflowTask.cpp:196
void addOutputFile(WorkflowFile *file)
Add an output file to the task.
Definition: WorkflowTask.cpp:85
unsigned long getNumberOfParents() const
Get the number of parents of a task.
Definition: WorkflowTask.cpp:187
wrench::WorkflowFile
A data file used/produced by a WorkflowTask in a Workflow.
Definition: WorkflowFile.h:26
double getFlops() const
Get the number of flops of the task.
Definition: WorkflowTask.cpp:124