WorkflowTask.h
1 
10 #ifndef WRENCH_WORKFLOWTASK_H
11 #define WRENCH_WORKFLOWTASK_H
12 
13 #include <map>
14 #include <stack>
15 #include <set>
16 
17 #include "wrench/workflow/job/WorkflowJob.h"
18 #include "wrench/workflow/WorkflowFile.h"
19 #include "wrench/workflow/parallel_model/ParallelModel.h"
20 #include "wrench/workflow/parallel_model/AmdahlParallelModel.h"
21 #include "wrench/workflow/parallel_model/ConstantEfficiencyParallelModel.h"
22 #include "wrench/workflow/parallel_model/CustomParallelModel.h"
23 
24 #include <boost/graph/adjacency_list.hpp>
25 
26 namespace wrench {
27 
// A computational task in a Workflow.
//
// This listing only declares the class; the member functions are defined in
// WorkflowTask.cpp. The constructor is private and Workflow is a friend, so
// tasks are created by Workflow rather than directly by users.
//
// NOTE(review): std::string, std::vector and std::shared_ptr are used below, but
// <string>, <vector> and <memory> are not among this file's direct includes —
// presumably they arrive transitively; confirm, or include them explicitly.
31  class WorkflowTask {
32 
33  public:
    // Get the id of the task.
34  std::string getID() const;
35 
    // Get the number of flops of the task.
36  double getFlops() const;
37 
    // Get the minimum number of cores required for running the task.
38  unsigned long getMinNumCores() const;
39 
    // Get the maximum number of cores that the task can use.
40  unsigned long getMaxNumCores() const;
41 
    // Get the task's parallel model.
42  std::shared_ptr<ParallelModel> getParallelModel();
43 
    // Set the task's parallel model.
44  void setParallelModel(std::shared_ptr<ParallelModel> model);
45 
    // Get the memory requirement of the task.
46  double getMemoryRequirement() const;
47 
    // Get the number of children of a task.
48  unsigned long getNumberOfChildren() const;
49 
    // Get the children of a task.
50  std::vector<WorkflowTask *> getChildren() const;
51 
    // Get the number of parents of a task.
52  unsigned long getNumberOfParents() const;
53 
    // Get the parents of a task.
54  std::vector<WorkflowTask *> getParents() const;
55 
    // Add an input file to the task.
56  void addInputFile(WorkflowFile *file);
57 
    // Add an output file to the task.
58  void addOutputFile(WorkflowFile *file);
59 
    // Get the number of times a task has failed.
60  unsigned int getFailureCount();
61 
62 
    // NOTE(review): the listing's own numbering skips a line between the two
    // banners below, so a comment line was dropped by extraction — presumably a
    // Doxygen section marker; confirm against the upstream header.
63  /***********************/
65  /***********************/
66 
    // Task states that are exposed at the developer level (this is the type of
    // the private fields visible_state and upcoming_visible_state).
68  enum State {
70  NOT_READY,
72  READY,
74  PENDING,
76  COMPLETED,
78  UNKNOWN
79  };
80 
    // Returns a string for a State value (overloaded below for InternalState).
81  static std::string stateToString(WorkflowTask::State state);
82 
    // Accessors for the containing job and the containing workflow; both are
    // returned as raw pointers.
83  WorkflowJob *getJob() const;
84 
85  Workflow *getWorkflow() const;
86 
    // Optional task metadata. The definitions are not in this header; presumably
    // each pair reads/writes the like-named private field (cluster_id, priority,
    // average_cpu, bytes_read, bytes_written) — confirm in WorkflowTask.cpp.
87  std::string getClusterID() const;
88 
89  void setClusterID(std::string);
90 
    // NOTE(review): the setter takes a signed long while the getter and the
    // stored field are unsigned long, so a negative priority would be converted
    // to a large unsigned value — confirm this is intended.
91  void setPriority(long);
92 
93  unsigned long getPriority() const;
94 
95  void setAverageCPU(double);
96 
97  double getAverageCPU() const;
98 
99  void setBytesRead(unsigned long);
100 
101  unsigned long getBytesRead() const;
102 
103  void setBytesWritten(unsigned long);
104 
105  unsigned long getBytesWritten() const;
106 
    // Input/output files of the task, returned by value as vectors of raw
    // pointers (the files are stored internally in maps keyed by a string).
107  std::vector<WorkflowFile *> getInputFiles();
108 
109  std::vector<WorkflowFile *> getOutputFiles();
110 
111  unsigned long getTopLevel();
112 
    // Timing accessors. Their names correspond to the timestamp fields of
    // WorkflowTaskExecution (defined further down); how they map onto
    // execution_history is decided in WorkflowTask.cpp, not here.
    // NOTE(review): none of these getters is const-qualified.
113  double getStartDate();
114 
115  double getEndDate();
116 
117  double getFailureDate();
118 
119  double getTerminationDate();
120 
121  double getReadInputStartDate();
122 
123  double getReadInputEndDate();
124 
125  double getComputationStartDate();
126 
127  double getComputationEndDate();
128 
129  double getWriteOutputStartDate();
130 
131  double getWriteOutputEndDate();
132 
133  unsigned long getNumCoresAllocated();
134 
    // Forward declaration so the type can be named in the signature below; the
    // full definition appears later in this class.
135  struct WorkflowTaskExecution;
136 
    // Returns the execution history by value (a copy of the stack).
137  std::stack<WorkflowTaskExecution> getExecutionHistory();
138 
139  std::string getExecutionHost();
140 
141  WorkflowTask::State getState() const;
142 
    // Color accessors; the private field is documented as an RGB color
    // formatted as "#rrggbb".
143  std::string getColor();
144 
145  void setColor(std::string);
146 
    // NOTE(review): as above, the listing skips a numbered line inside each of
    // the two banner pairs below — presumably dropped Doxygen section markers.
147  /***********************/
149  /***********************/
150 
151 
152  /***********************/
154  /***********************/
155 
    // Internal task states (type of the private field internal_state, which is
    // marked as not exposed at the developer level).
157  enum InternalState {
158  TASK_NOT_READY,
159  TASK_READY,
160  TASK_RUNNING,
161  TASK_COMPLETED,
162  TASK_FAILED
163  };
164 
    // Returns a string for an InternalState value.
165  static std::string stateToString(WorkflowTask::InternalState state);
166 
167  unsigned long updateTopLevel();
168 
    // State mutators/accessors for the internal, visible and upcoming-visible
    // states (see the private fields of the same names).
169  void setInternalState(WorkflowTask::InternalState);
170 
171  void setState(WorkflowTask::State);
172 
173  void setUpcomingState(WorkflowTask::State);
174 
175  WorkflowTask::State getUpcomingState() const;
176 
177  WorkflowTask::InternalState getInternalState() const;
178 
179  void setJob(WorkflowJob *job);
180 
    // Timestamp setters, one per timing getter declared above. Their effect on
    // execution_history is defined in WorkflowTask.cpp.
181  void setStartDate(double date);
182 
183  void setEndDate(double date);
184 
185  void setReadInputStartDate(double date);
186 
187  void setReadInputEndDate(double date);
188 
189  void setComputationStartDate(double date);
190 
191  void setComputationEndDate(double date);
192 
193  void setWriteOutputStartDate(double date);
194 
195  void setWriteOutputEndDate(double date);
196 
197  void setFailureDate(double date);
198 
199  void setTerminationDate(double date);
200 
201  void incrementFailureCount();
202 
203  void setExecutionHost(std::string hostname);
204 
205  void setNumCoresAllocated(unsigned long num_cores);
206 
    // Record of one execution of the task: a set of timestamps plus the host
    // and the number of cores used. Every timestamp defaults to -1.0
    // (presumably meaning "not set yet" — confirm), the host defaults to the
    // empty string and the core count to 0.
210  struct WorkflowTaskExecution {
212  double task_start = -1.0;
214  double read_input_start = -1.0;
216  double read_input_end = -1.0;
218  double computation_start = -1.0;
220  double computation_end = -1.0;
222  double write_output_start = -1.0;
224  double write_output_end = -1.0;
226  double task_end = -1.0;
228  double task_failed = -1.0;
230  double task_terminated = -1.0;
231 
233  std::string execution_host = "";
235  unsigned long num_cores_allocated = 0;
236 
    // Creates a record with only task_start set; all other fields keep their
    // in-class defaults.
    // NOTE(review): the constructor is not explicit, so a plain double converts
    // implicitly to a WorkflowTaskExecution — confirm callers rely on that
    // before tightening it.
242  WorkflowTaskExecution(double task_start) : task_start(task_start) {}
243 
244 
245  };
246 
247 
    // NOTE(review): another banner pair with a skipped listing line between
    // them — presumably the end of the section opened above.
248  /***********************/
250  /***********************/
251 
252  private:
    // Workflow needs access to the private constructor below.
253  friend class Workflow;
254 
    // NOTE(review): flops, min_num_cores, max_num_cores, memory_requirement,
    // toplevel, the three state fields, workflow and job have no in-class
    // initializer; presumably the constructor in WorkflowTask.cpp sets them.
255  std::string id; // Task ID
256  std::string cluster_id; // ID for clustered task
257  std::string color; // A RGB color formatted as "#rrggbb"
258  double flops; // Number of flops
259  double average_cpu = -1; // Average CPU utilization
    // NOTE(review): -1 assigned to an unsigned long wraps to the largest
    // unsigned long value; presumably used as a "not set" sentinel — confirm
    // that readers compare against that value and not against -1 as a signed
    // number. The "bytes ... in KB" wording of the unit is also worth checking.
260  unsigned long bytes_read = -1; // Total bytes read in KB
261  unsigned long bytes_written = -1; // Total bytes written in KB
262  unsigned long min_num_cores;
263  unsigned long max_num_cores;
264  std::shared_ptr<ParallelModel> parallel_model;
265  double memory_requirement;
266  unsigned long priority = 0; // Task priority
267  unsigned long toplevel; // 0 if entry task
268  unsigned int failure_count = 0; // Number of times the task has failed
269  std::string execution_host; // Host on which the task executed ("" if not executed successfully - yet)
270  State visible_state; // To be exposed to developer level
271  State upcoming_visible_state; // A visible state that will become active once a WMS has processed a previously sent workflow execution event
272  InternalState internal_state; // Not to be exposed to developer level
273 
274  Workflow *workflow; // Containing workflow
275 
276  std::map<std::string, WorkflowFile *> output_files; // List of output files
277  std::map<std::string, WorkflowFile *> input_files; // List of input files
278 
279  // Private constructor (called by Workflow)
    // NOTE(review): the second parameter is named only "t"; given the flops
    // field above it is presumably the task's flop count — confirm in
    // WorkflowTask.cpp.
280  WorkflowTask(std::string id,
281  double t,
282  unsigned long min_num_cores,
283  unsigned long max_num_cores,
284  double memory_requirement);
285 
286  // Containing job
287  WorkflowJob *job;
288 
    // One WorkflowTaskExecution entry per recorded execution, kept as a stack.
289  std::stack<WorkflowTaskExecution> execution_history;
290 
291  friend class DagOfTasks;
292  };
293 };
294 
295 #endif //WRENCH_WORKFLOWTASK_H
std::shared_ptr< ParallelModel > getParallelModel()
Get the task's parallel model.
Definition: WorkflowTask.cpp:819
std::string getID() const
Get the id of the task.
Definition: WorkflowTask.cpp:117
std::vector< WorkflowTask * > getChildren() const
Get the children of a task.
Definition: WorkflowTask.cpp:171
void addInputFile(WorkflowFile *file)
Add an input file to the task.
Definition: WorkflowTask.cpp:57
unsigned long getMaxNumCores() const
Get the maximum number of cores that the task can use.
Definition: WorkflowTask.cpp:144
unsigned long getNumberOfChildren() const
Get the number of children of a task.
Definition: WorkflowTask.cpp:162
std::shared_ptr< WorkflowJob > getJob()
Get the executor's job.
Definition: StandardJobExecutor.cpp:1015
Definition: Alarm.cpp:20
wrench::Workflow
A workflow (to be executed by a WMS)
Definition: Workflow.h:34
unsigned long getMinNumCores() const
Get the minimum number of cores required for running the task.
Definition: WorkflowTask.cpp:135
void setParallelModel(std::shared_ptr< ParallelModel > model)
Set the task's parallel model.
Definition: WorkflowTask.cpp:811
wrench::WorkflowTask
A computational task in a Workflow.
Definition: WorkflowTask.h:31
unsigned int getFailureCount()
Get the number of times a task has failed.
Definition: WorkflowTask.cpp:594
double getMemoryRequirement() const
Get the memory requirement of the task.
Definition: WorkflowTask.cpp:153
std::vector< WorkflowTask * > getParents() const
Get the parents of a task.
Definition: WorkflowTask.cpp:189
void addOutputFile(WorkflowFile *file)
Add an output file to the task.
Definition: WorkflowTask.cpp:87
unsigned long getNumberOfParents() const
Get the number of parents of a task.
Definition: WorkflowTask.cpp:180
wrench::WorkflowFile
A data file used/produced by a WorkflowTask in a Workflow.
Definition: WorkflowFile.h:26
double getFlops() const
Get the number of flops of the task.
Definition: WorkflowTask.cpp:126