StandardJobExecutor.h
1 
10 #ifndef WRENCH_MULTINODEMULTICORESTANDARDJOBEXECUTOR_H
11 #define WRENCH_MULTINODEMULTICORESTANDARDJOBEXECUTOR_H
12 
13 
14 #include <queue>
15 #include <set>
16 
17 #include "wrench/services/compute/ComputeService.h"
18 #include "wrench/services/compute/workunit_executor/WorkunitExecutor.h"
19 #include "wrench/services/compute/standard_job_executor/StandardJobExecutorProperty.h"
20 #include "wrench/services/compute/standard_job_executor/StandardJobExecutorMessagePayload.h"
21 #include "wrench/services/compute/workunit_executor/Workunit.h"
22 #include "wrench/services/helpers/HostStateChangeDetector.h"
23 
24 
25 namespace wrench {
26 
27  class Simulation;
28 
29  class StorageService;
30 
31  class FailureCause;
32 
33  /***********************/
35  /***********************/
36 
// A service that knows how to execute a standard job on a multi-node
// multi-core platform. When killed in the middle of computing, it sets the
// (internal) running tasks' states to FAILED; the calling service will likely
// want to make failed tasks READY / NOT_READY again to "unwind" the failed
// executions and resubmit them. It does not increment task failure counts,
// because it cannot tell whether kill() was an actual failure or a feature.
//
// NOTE(review): this listing omits some source lines (the embedded line numbers
// skip 49, 52-53, 67, 113-117 and 121-122), so parts of the declaration are
// not visible here.
45  class StandardJobExecutor : public Service {
46 
47  public:
48 
50 
51  // Public constructor
    // NOTE(review): the opening of this declaration (listing lines 52-53) is
    // missing here; the reference entry for the constructor gives the full
    // signature as starting with
    // `StandardJobExecutor(Simulation *simulation, std::string callback_mailbox, ...`.
54  std::string callback_mailbox,
55  std::string hostname,
56  StandardJob *job,
57  std::map<std::string, std::tuple<unsigned long, double>> compute_resources,
58  std::shared_ptr<StorageService> scratch_space,
59  bool part_of_pilot_job,
60  PilotJob* parent_pilot_job,
61  std::map<std::string, std::string> property_list,
62  std::map<std::string, double> messagepayload_list
63  );
64 
    // Kill the executor.
65  void kill(bool job_termination);
66 
    // Get the executor's compute resources (a map from a string key to an
    // (unsigned long, double) tuple; presumably hostname -> (cores, RAM) —
    // TODO confirm against StandardJobExecutor.cpp).
68  std::map<std::string, std::tuple<unsigned long, double>> getComputeResources();
69 
70  // Get the set of files stored in scratch space during the standard job's execution
71  std::set<WorkflowFile*> getFilesInScratch();
72 
73  private:
74 
    // Simulation is granted access to the private members below.
75  friend class Simulation;
    // Overrides a base-class hook; the parameter names indicate it receives
    // whether main() returned and its return value.
76  void cleanup(bool has_returned_from_main, int return_value) override;
77 
    // Values mirroring the constructor parameters of the same names.
78  std::string callback_mailbox;
79  StandardJob *job;
80  std::map<std::string, std::tuple<unsigned long, double>> compute_resources;
    // Presumably aggregates over compute_resources — TODO confirm where these are set.
81  int total_num_cores;
82  double total_ram;
83  std::shared_ptr<StorageService> scratch_space;
84 
85  bool part_of_pilot_job;
86 
87  // If this is not part of a pilot job, then this value will be nullptr
88  PilotJob* parent_pilot_job;
89 
90  // Files stored in scratch
91  std::set<WorkflowFile*> files_stored_in_scratch;
92 
93  // Core availabilities (for each host, how many cores are currently available on it)
94  std::map<std::string, unsigned long> core_availabilities;
95  // RAM availabilities (for each host, how many bytes of RAM are currently available on it)
96  std::map<std::string, double> ram_availabilities;
97 
98  // Sets of workunit executors
99  std::set<std::shared_ptr<WorkunitExecutor>> running_workunit_executors;
100  std::set<std::shared_ptr<WorkunitExecutor>> finished_workunit_executors;
101  std::set<std::shared_ptr<WorkunitExecutor>> failed_workunit_executors;
102 
103  // Work units
104  std::set<std::shared_ptr<Workunit>> non_ready_workunits;
105  std::set<std::shared_ptr<Workunit>> ready_workunits;
106  std::set<std::shared_ptr<Workunit>> running_workunits;
107  std::set<std::shared_ptr<Workunit>> completed_workunits;
108 
109  // Property list
110  std::map<std::string, std::string> property_list;
111 
     // Default property values.
     // NOTE(review): the initializer entries (listing lines 113-117) are not
     // shown here; presumably keyed by the StandardJobExecutorProperty
     // constants — confirm against the real header.
112  std::map<std::string, std::string> default_property_values = {
118  };
119 
     // Default message payload values.
     // NOTE(review): the initializer entries (listing lines 121-122) are not
     // shown here; presumably keyed by the StandardJobExecutorMessagePayload
     // constants — confirm against the real header.
120  std::map<std::string, double> default_messagepayload_values = {
123  };
124 
     // Host state change detector helper (type declared in
     // wrench/services/helpers/HostStateChangeDetector.h).
125  std::shared_ptr<HostStateChangeDetector> host_state_monitor;
126 
     // Overrides the base-class main() entry point.
127  int main() override;
128 
     // Handlers for workunit executor events (completion, failure, crash).
     // Only declared here; their behavior is not visible in this header.
129  void processWorkunitExecutorCompletion(std::shared_ptr<WorkunitExecutor> workunit_executor,
130  std::shared_ptr<Workunit> workunit);
131 
132  void processWorkunitExecutorFailure(std::shared_ptr<WorkunitExecutor> workunit_executor,
133  std::shared_ptr<Workunit> workunit,
134  std::shared_ptr<FailureCause> cause);
135 
136  void processWorkunitExecutorCrash(std::shared_ptr<WorkunitExecutor> workunit_executor);
137 
     // Returns a bool; presumably signals whether to keep processing
     // messages — TODO confirm in StandardJobExecutor.cpp.
138  bool processNextMessage();
139 
     // Per-workunit resource helpers; the names suggest minimum cores, desired
     // cores and minimum memory — TODO confirm in StandardJobExecutor.cpp.
140  unsigned long computeWorkUnitMinNumCores(Workunit *wu);
141  unsigned long computeWorkUnitDesiredNumCores(Workunit *wu);
142  double computeWorkUnitMinMemory(Workunit *wu);
143 
144  void dispatchReadyWorkunits();
145 
146 // void createWorkunits();
147 
     // Returns the ready workunits as a vector; the ordering criterion is not
     // visible in this header.
148  std::vector<std::shared_ptr<Workunit>> sortReadyWorkunits();
149 
150  // Clean up scratch
151  void cleanUpScratch();
152 
153  // Store the list of files available in scratch
154  void StoreListOfFilesInScratch();
155 
156 
157  };
158 
159  /***********************/
161  /***********************/
162 };
163 
164 
165 #endif //WRENCH_MULTINODEMULTICORESTANDARDJOBEXECUTOR_H
static const std::string STANDARD_JOB_DONE_MESSAGE_PAYLOAD
The number of bytes in the control message sent by the executor to state that it has completed a job...
Definition: StandardJobExecutorMessagePayload.h:30
void kill(bool job_termination)
Kill the executor.
Definition: StandardJobExecutor.cpp:267
static const std::string HOST_SELECTION_ALGORITHM
The algorithm that decides on which host a task should be placed. Possible values are: ...
Definition: StandardJobExecutorProperty.h:55
~StandardJobExecutor()
Destructor.
Definition: StandardJobExecutor.cpp:43
StandardJobExecutor(Simulation *simulation, std::string callback_mailbox, std::string hostname, StandardJob *job, std::map< std::string, std::tuple< unsigned long, double >> compute_resources, std::shared_ptr< StorageService > scratch_space, bool part_of_pilot_job, PilotJob *parent_pilot_job, std::map< std::string, std::string > property_list, std::map< std::string, double > messagepayload_list)
Constructor.
Definition: StandardJobExecutor.cpp:67
A service that knows how to execute a standard job on a multi-node multi-core platform. Note that when killed in the middle of computing, this service will set (internal) running tasks' states to FAILED, and likely the calling service will want to make failed tasks READY and NOT_READY again to "unwind" the failed executions and resubmit tasks for execution. Also, this service does not increment task failure counts, as it does not know if the kill() was an actual failure (e.g., some timeout) or a feature (e.g., a WMS changing its mind).
Definition: StandardJobExecutor.h:45
StandardJob * getJob()
Get the executor's job.
Definition: StandardJobExecutor.cpp:1023
A service that can be added to the simulation and that can be used by a WMS when executing a workflow...
Definition: Service.h:26
A class to describe a unit of work that's a sub-component of a StandardJob.
Definition: Workunit.h:35
static const std::string SIMULATE_COMPUTATION_AS_SLEEP
Simulate computation as just a sleep instead of with an actual compute thread. This is for scalabilit...
Definition: StandardJobExecutorProperty.h:61
std::string hostname
The name of the host on which the daemon is running.
Definition: S4U_Daemon.h:51
A pilot (i.e., non-standard) workflow job that can be submitted to a ComputeService by a WMS (via a J...
Definition: PilotJob.h:29
A standard (i.e., non-pilot) workflow job that can be submitted to a ComputeService by a WMS (via a J...
Definition: StandardJob.h:38
A class that provides basic simulation methods. Once the simulation object has been explicitly or imp...
Definition: Simulation.h:45
static const std::string TASK_SELECTION_ALGORITHM
The algorithm that decides which ready computational task, in case multiple tasks are ready...
Definition: StandardJobExecutorProperty.h:49
static const std::string CORE_ALLOCATION_ALGORITHM
The algorithm that decides how many cores are given to a computational task. Possible values are: ...
Definition: StandardJobExecutorProperty.h:41
std::set< WorkflowFile * > getFilesInScratch()
Get the set of files stored in scratch space during the standard job's execution. ...
Definition: StandardJobExecutor.cpp:1015
Simulation * simulation
a pointer to the simulation object
Definition: S4U_Daemon.h:105
static const std::string TASK_STARTUP_OVERHEAD
The number of seconds to start a task (default = 0)
Definition: StandardJobExecutorProperty.h:30
std::map< std::string, std::tuple< unsigned long, double > > getComputeResources()
Get the executor's compute resources.
Definition: StandardJobExecutor.cpp:1031
static const std::string STANDARD_JOB_FAILED_MESSAGE_PAYLOAD
The number of bytes in the control message sent by the executor to state that a job has failed...
Definition: StandardJobExecutorMessagePayload.h:32
Definition: Alarm.cpp:19
std::map< std::string, double > messagepayload_list
The service's messagepayload list.
Definition: Service.h:113