StandardJobExecutor.h
1 
10 #ifndef WRENCH_MULTINODEMULTICORESTANDARDJOBEXECUTOR_H
11 #define WRENCH_MULTINODEMULTICORESTANDARDJOBEXECUTOR_H
12 
13 
14 #include <queue>
15 #include <set>
16 
17 #include "wrench/services/compute/ComputeService.h"
18 #include "wrench/services/compute/workunit_executor/WorkunitExecutor.h"
19 #include "wrench/services/compute/standard_job_executor/StandardJobExecutorProperty.h"
20 #include "wrench/services/compute/standard_job_executor/StandardJobExecutorMessagePayload.h"
21 #include "wrench/services/compute/workunit_executor/Workunit.h"
22 
23 
24 namespace wrench {
25 
26  class Simulation;
27 
28  class StorageService;
29 
30  class FailureCause;
31 
32  /***********************/
34  /***********************/
35 
44  class StandardJobExecutor : public Service {  // Executes a standard job on a multi-node multi-core platform
45 
46  public:
47 
49 
50  // Public constructor. NOTE(review): listing lines 51-52 are absent from this view; per the full signature, the parameter list opens with `StandardJobExecutor(Simulation *simulation,`
53  std::string callback_mailbox,
54  std::string hostname,
55  StandardJob *job,
56  std::map<std::string, std::tuple<unsigned long, double>> compute_resources,
57  StorageService* scratch_space,
58  bool part_of_pilot_job,
59  PilotJob* parent_pilot_job,
60  std::map<std::string, std::string> property_list,
61  std::map<std::string, std::string> messagepayload_list
62  );
63 
64  void kill();  // Kill the executor; running tasks' states are set to FAILED (failure counts are not incremented)
65 
67  std::map<std::string, std::tuple<unsigned long, double>> getComputeResources();  // Get the executor's compute resources (per host; tuple presumably <num cores, RAM> - confirm)
68 
69  // Get the set of files stored in scratch space during the standard job's execution
70  std::set<WorkflowFile*> getFilesInScratch();
71 
72  private:
73 
74  friend class Simulation;
75  void cleanup() override;
76 
77  std::string callback_mailbox;
78  StandardJob *job;
79  std::map<std::string, std::tuple<unsigned long, double>> compute_resources;
80  int total_num_cores;  // NOTE(review): int here, while per-host core counts are unsigned long; check for narrowing
81  double total_ram;
82  StorageService *scratch_space;
83 
84  bool part_of_pilot_job;
85 
86  // nullptr unless this executor is part of a pilot job
87  PilotJob* parent_pilot_job;
88 
89  // Files stored in scratch
90  std::set<WorkflowFile*> files_stored_in_scratch;
91 
92  // Core availabilities (for each host, how many cores are currently available on it)
93  std::map<std::string, unsigned long> core_availabilities;
94  // RAM availabilities (for each host, how many bytes of RAM are currently available on it)
95  std::map<std::string, double> ram_availabilities;
96 
97  // Sets of workunit executors
98  std::set<std::shared_ptr<WorkunitExecutor>> running_workunit_executors;
99  std::set<std::shared_ptr<WorkunitExecutor>> finished_workunit_executors;
100  std::set<std::shared_ptr<WorkunitExecutor>> failed_workunit_executors;
101 
102  // Work units, grouped by state
103  std::set<std::unique_ptr<Workunit>> non_ready_workunits;
104  std::set<std::unique_ptr<Workunit>> ready_workunits;
105  std::set<std::unique_ptr<Workunit>> running_workunits;
106  std::set<std::unique_ptr<Workunit>> completed_workunits;
107 
108  // Property list
109  std::map<std::string, std::string> property_list;
110 
111  std::map<std::string, std::string> default_property_values = {  // NOTE(review): initializer entries (listing lines 112-116) are not visible in this view
117  };
118 
119  std::map<std::string, std::string> default_messagepayload_values = {  // NOTE(review): initializer entries (listing lines 120-121) are not visible in this view
122  };
123 
124  int main() override;
125 
126 // void setProperty(std::string property, std::string value);
127 // std::string getPropertyValueAsString(std::string property);
128 // double getPropertyValueAsDouble(std::string property);
129 
130  void processWorkunitExecutorCompletion(WorkunitExecutor *workunit_executor,
131  Workunit *workunit);
132 
133  void processWorkunitExecutorFailure(WorkunitExecutor *workunit_executor,
134  Workunit *workunit,
135  std::shared_ptr<FailureCause> cause);
136 
137  bool processNextMessage();
138 
139  unsigned long computeWorkUnitMinNumCores(Workunit *wu);
140  unsigned long computeWorkUnitDesiredNumCores(Workunit *wu);
141  double computeWorkUnitMinMemory(Workunit *wu);
142 
143  void dispatchReadyWorkunits();
144 
145  void createWorkunits();
146 
147  std::vector<Workunit*> sortReadyWorkunits();
148 
149  // Clean up scratch space
150  void cleanUpScratch();
151 
152  // Store the list of files available in scratch
153  void StoreListOfFilesInScratch();  // NOTE(review): PascalCase name, unlike the other (camelCase) methods
154 
155 
156  };
157 
158  /***********************/
160  /***********************/
161 };
162 
163 
164 #endif //WRENCH_MULTINODEMULTICORESTANDARDJOBEXECUTOR_H
static const std::string STANDARD_JOB_DONE_MESSAGE_PAYLOAD
The number of bytes in the control message sent by the executor to state that it has completed a job...
Definition: StandardJobExecutorMessagePayload.h:30
static const std::string HOST_SELECTION_ALGORITHM
The algorithm that decides on which host a task should be placed. Possible values are: ...
Definition: StandardJobExecutorProperty.h:55
~StandardJobExecutor()
Destructor.
Definition: StandardJobExecutor.cpp:36
A service that knows how to execute a standard job on a multi-node multi-core platform. Note that when killed in the middle of computing, this service will set (internal) running tasks' states to FAILED, and likely the calling service will want to make failed tasks READY or NOT_READY again to "unwind" the failed executions and resubmit tasks for execution. Also, this service does not increment task failure counts, as it does not know if the kill() was an actual failure (i.e., some timeout) or a feature (i.e., a WMS changing its mind).
Definition: StandardJobExecutor.h:44
StandardJob * getJob()
Get the executor's job.
Definition: StandardJobExecutor.cpp:879
A service that can be added to the simulation and that can be used by a WMS when executing a workflow...
Definition: Service.h:26
StandardJobExecutor(Simulation *simulation, std::string callback_mailbox, std::string hostname, StandardJob *job, std::map< std::string, std::tuple< unsigned long, double >> compute_resources, StorageService *scratch_space, bool part_of_pilot_job, PilotJob *parent_pilot_job, std::map< std::string, std::string > property_list, std::map< std::string, std::string > messagepayload_list)
Constructor.
Definition: StandardJobExecutor.cpp:59
static const std::string THREAD_STARTUP_OVERHEAD
The number of seconds to start a thread (default = 0)
Definition: StandardJobExecutorProperty.h:30
A class to describe a unit of work that's a sub-component of a StandardJob.
Definition: Workunit.h:34
static const std::string SIMULATE_COMPUTATION_AS_SLEEP
Simulate computation as just a sleep instead of with an actual compute thread. This is for scalabilit...
Definition: StandardJobExecutorProperty.h:61
std::string hostname
The name of the host on which the daemon is running.
Definition: S4U_Daemon.h:47
A pilot (i.e., non-standard) workflow job that can be submitted to a ComputeService by a WMS (via a J...
Definition: PilotJob.h:29
A standard (i.e., non-pilot) workflow job that can be submitted to a ComputeService by a WMS (via a J...
Definition: StandardJob.h:38
A class that provides basic simulation methods.
Definition: Simulation.h:34
static const std::string TASK_SELECTION_ALGORITHM
The algorithm that decides which ready computational task, in case multiple tasks are ready...
Definition: StandardJobExecutorProperty.h:49
void kill()
Kill the executor.
Definition: StandardJobExecutor.cpp:233
static const std::string CORE_ALLOCATION_ALGORITHM
The algorithm that decides how many cores are given to a computational task. Possible values are: ...
Definition: StandardJobExecutorProperty.h:41
std::set< WorkflowFile * > getFilesInScratch()
Get the set of files stored in scratch space during the standard job's execution. ...
Definition: StandardJobExecutor.cpp:870
std::map< std::string, std::string > messagepayload_list
The service's messagepayload list.
Definition: Service.h:107
Simulation * simulation
A pointer to the simulation object.
Definition: S4U_Daemon.h:84
std::map< std::string, std::tuple< unsigned long, double > > getComputeResources()
Get the executor's compute resources.
Definition: StandardJobExecutor.cpp:887
static const std::string STANDARD_JOB_FAILED_MESSAGE_PAYLOAD
The number of bytes in the control message sent by the executor to state that a job has failed...
Definition: StandardJobExecutorMessagePayload.h:32
The storage service base class.
Definition: StorageService.h:35
A service that performs a WorkUnit.
Definition: WorkunitExecutor.h:38
Definition: TerminalOutput.cpp:15