StandardJobExecutor.h
1 
10 #ifndef WRENCH_MULTINODEMULTICORESTANDARDJOBEXECUTOR_H
11 #define WRENCH_MULTINODEMULTICORESTANDARDJOBEXECUTOR_H
12 
13 
14 #include <queue>
15 #include <set>
16 
17 #include "wrench/services/compute/ComputeService.h"
18 #include "wrench/services/compute/workunit_executor/WorkunitExecutor.h"
19 #include "wrench/services/compute/standard_job_executor/StandardJobExecutorProperty.h"
20 #include "wrench/services/compute/standard_job_executor/StandardJobExecutorMessagePayload.h"
21 #include "wrench/services/compute/workunit_executor/Workunit.h"
22 #include "wrench/services/helpers/HostStateChangeDetector.h"
23 
24 
25 namespace wrench {
26 
// Forward declarations: these types are only named (by pointer, shared_ptr or
// friend declaration) in this header, so their full definitions are not needed here.
27  class Simulation;
28 
29  class StorageService;
30 
31  class FailureCause;
32 
33  /***********************/
35  /***********************/
36 
// A service that knows how to execute a standard job on a multi-node
// multi-core platform. It derives from Service and overrides main() and cleanup().
//
// NOTE(review): this listing appears to have lost some lines of the original
// header (the embedded line numbering has gaps): the constructor's opening line
// (taking a Simulation *simulation first argument), the destructor, a
// StandardJob *getJob() accessor, and the entries of the two default-value maps
// are not visible here. Confirm against the real header before editing code.
45  class StandardJobExecutor : public Service {
46 
47  public:
48 
50 
51  // Public Constructor
// Parameters visible in this listing:
//   callback_mailbox    - presumably the mailbox to which job completion/failure
//                         messages are sent - TODO confirm
//   hostname            - the name of the host on which the executor runs
//   job                 - the standard job to execute
//   compute_resources   - per-host resources, keyed by host name; the tuple is
//                         presumably (number of cores, RAM in bytes) - TODO confirm
//   scratch_space       - storage service used as scratch space
//   part_of_pilot_job   - whether this job runs as part of a pilot job
//   parent_pilot_job    - the enclosing pilot job (nullptr if not part of one)
//   property_list       - property name/value overrides
//   messagepayload_list - message payload name/value overrides
54  std::string callback_mailbox,
55  std::string hostname,
56  StandardJob *job,
57  std::map<std::string, std::tuple<unsigned long, double>> compute_resources,
58  std::shared_ptr<StorageService> scratch_space,
59  bool part_of_pilot_job,
60  PilotJob* parent_pilot_job,
61  std::map<std::string, std::string> property_list,
62  std::map<std::string, double> messagepayload_list
63  );
64 
// Kill the executor. The exact effect of job_termination is not visible in this
// header - presumably it indicates that the kill is due to the job being
// terminated; verify against the implementation.
65  void kill(bool job_termination);
66 
// Get the executor's compute resources (returned by value, i.e., as a copy of the map).
68  std::map<std::string, std::tuple<unsigned long, double>> getComputeResources();
69 
70  // Get the set of files stored in scratch space during the standard job's execution
71  std::set<WorkflowFile*> getFilesInScratch();
72 
73  private:
74 
// Simulation is granted access to the private members below.
75  friend class Simulation;
// Override of the base class cleanup hook.
76  void cleanup(bool has_returned_from_main, int return_value) override;
77 
// State mirroring the constructor arguments of the same names.
78  std::string callback_mailbox;
79  StandardJob *job;
80  std::map<std::string, std::tuple<unsigned long, double>> compute_resources;
// Aggregate totals - presumably summed over all hosts in compute_resources; TODO confirm.
81  int total_num_cores;
82  double total_ram;
83  std::shared_ptr<StorageService> scratch_space;
84 
85  bool part_of_pilot_job;
86 
87  // if this is not a part of pilot job, then this value will be nullptr
88  PilotJob* parent_pilot_job;
89 
90  // Files stored in scratch
91  std::set<WorkflowFile*> files_stored_in_scratch;
92 
93  // Core availabilities (for each host, how many cores are currently available on it)
94  std::map<std::string, unsigned long> core_availabilities;
95  // RAM availabilities (for each host, how many bytes of RAM are currently available on it)
96  std::map<std::string, double> ram_availabilities;
97 
98  // Sets of workunit executors
99  std::set<std::shared_ptr<WorkunitExecutor>> running_workunit_executors;
100  std::set<std::shared_ptr<WorkunitExecutor>> finished_workunit_executors;
101  std::set<std::shared_ptr<WorkunitExecutor>> failed_workunit_executors;
102 
103  // Work units
104  std::set<std::shared_ptr<Workunit>> non_ready_workunits;
105  std::set<std::shared_ptr<Workunit>> ready_workunits;
106  std::set<std::shared_ptr<Workunit>> running_workunits;
107  std::set<std::shared_ptr<Workunit>> completed_workunits;
108 
109  // Property list
110  std::map<std::string, std::string> property_list;
111 
// Default property values. NOTE(review): the map's entries are not visible in
// this listing (the embedded numbering skips them).
112  std::map<std::string, std::string> default_property_values = {
118  };
119 
// Default message payload values. NOTE(review): entries likewise not visible here.
120  std::map<std::string, double> default_messagepayload_values = {
123  };
124 
125  std::shared_ptr<HostStateChangeDetector> host_state_monitor;
126 
// Override of the base class main routine.
127  int main() override;
128 
// Handlers for workunit executor events: completion, failure (with a cause), and crash.
129  void processWorkunitExecutorCompletion(std::shared_ptr<WorkunitExecutor> workunit_executor,
130  std::shared_ptr<Workunit> workunit);
131 
132  void processWorkunitExecutorFailure(std::shared_ptr<WorkunitExecutor> workunit_executor,
133  std::shared_ptr<Workunit> workunit,
134  std::shared_ptr<FailureCause> cause);
135 
136  void processWorkunitExecutorCrash(std::shared_ptr<WorkunitExecutor> workunit_executor);
137 
// Presumably returns whether the main loop should keep processing messages -
// TODO confirm against the implementation.
138  bool processNextMessage();
139 
// Per-workunit resource requirements (minimum cores, desired cores, minimum memory).
140  unsigned long computeWorkUnitMinNumCores(Workunit *wu);
141  unsigned long computeWorkUnitDesiredNumCores(Workunit *wu);
142  double computeWorkUnitMinMemory(Workunit *wu);
143 
144  void dispatchReadyWorkunits();
145 
146 // void createWorkunits();
147 
// Returns the ready workunits as an ordered vector; the ordering criterion is
// not visible in this header.
148  std::vector<std::shared_ptr<Workunit>> sortReadyWorkunits();
149 
150  //Clean up scratch
151  void cleanUpScratch();
152 
153  //Store the list of files available in scratch
154  void StoreListOfFilesInScratch();
155 
156 
157  };
158 
159  /***********************/
161  /***********************/
162 };
163 
164 
165 #endif //WRENCH_MULTINODEMULTICORESTANDARDJOBEXECUTOR_H
static const std::string STANDARD_JOB_DONE_MESSAGE_PAYLOAD
The number of bytes in the control message sent by the executor to state that it has completed a job.
Definition: StandardJobExecutorMessagePayload.h:30
static const std::string STANDARD_JOB_FAILED_MESSAGE_PAYLOAD
The number of bytes in the control message sent by the executor to state that a job has failed.
Definition: StandardJobExecutorMessagePayload.h:32
void kill(bool job_termination)
Kill the executor.
Definition: StandardJobExecutor.cpp:268
~StandardJobExecutor()
Destructor.
Definition: StandardJobExecutor.cpp:44
A class to describe a unit of work that's a sub-component of a StandardJob.
Definition: Workunit.h:35
StandardJob * getJob()
Get the executor's job.
Definition: StandardJobExecutor.cpp:1015
A standard (i.e., non-pilot) workflow job that can be submitted to a ComputeService by a WMS (via a J...
Definition: StandardJob.h:37
static const std::string CORE_ALLOCATION_ALGORITHM
The algorithm that decides how many cores are given to a computational task. Possible values are:
Definition: StandardJobExecutorProperty.h:41
Definition: Alarm.cpp:20
static const std::string TASK_SELECTION_ALGORITHM
The algorithm that decides which ready computational task, in case multiple tasks are ready,...
Definition: StandardJobExecutorProperty.h:49
static const std::string HOST_SELECTION_ALGORITHM
The algorithm that decides on which host a task should be placed. Possible values are:
Definition: StandardJobExecutorProperty.h:55
A service that knows how to execute a standard job on a multi-node multi-core platform....
Definition: StandardJobExecutor.h:45
static const std::string TASK_STARTUP_OVERHEAD
The number of seconds to start a task (default = 0)
Definition: StandardJobExecutorProperty.h:30
std::map< std::string, std::tuple< unsigned long, double > > getComputeResources()
Get the executor's compute resources.
Definition: StandardJobExecutor.cpp:1023
std::map< std::string, double > messagepayload_list
The service's messagepayload list.
Definition: Service.h:112
std::string hostname
The name of the host on which the daemon is running.
Definition: S4U_Daemon.h:51
A class that provides basic simulation methods. Once the simulation object has been explicitly or imp...
Definition: Simulation.h:45
static const std::string SIMULATE_COMPUTATION_AS_SLEEP
Simulate computation as just a sleep instead of with an actual compute thread. This is for scalabilit...
Definition: StandardJobExecutorProperty.h:61
std::set< WorkflowFile * > getFilesInScratch()
Get the set of files stored in scratch space during the standard job's execution.
Definition: StandardJobExecutor.cpp:1007
A pilot (i.e., non-standard) workflow job that can be submitted to a ComputeService by a WMS (via a J...
Definition: PilotJob.h:29
StandardJobExecutor(Simulation *simulation, std::string callback_mailbox, std::string hostname, StandardJob *job, std::map< std::string, std::tuple< unsigned long, double >> compute_resources, std::shared_ptr< StorageService > scratch_space, bool part_of_pilot_job, PilotJob *parent_pilot_job, std::map< std::string, std::string > property_list, std::map< std::string, double > messagepayload_list)
Constructor.
Definition: StandardJobExecutor.cpp:68
A service that can be added to the simulation and that can be used by a WMS when executing a workflow...
Definition: Service.h:26
Simulation * simulation
a pointer to the simulation object
Definition: S4U_Daemon.h:105