WRENCH  1.10
Cyberinfrastructure Simulation Workbench
Overview Installation Getting Started WRENCH 101 WRENCH 102
StandardJobExecutor.h
1 
10 #ifndef WRENCH_MULTINODEMULTICORESTANDARDJOBEXECUTOR_H
11 #define WRENCH_MULTINODEMULTICORESTANDARDJOBEXECUTOR_H
12 
13 
14 #include <queue>
15 #include <set>
16 
17 #include "wrench/services/compute/ComputeService.h"
18 #include "wrench/services/compute/workunit_executor/WorkunitExecutor.h"
19 #include "wrench/services/compute/standard_job_executor/StandardJobExecutorProperty.h"
20 #include "wrench/services/compute/standard_job_executor/StandardJobExecutorMessagePayload.h"
21 #include "wrench/services/compute/workunit_executor/Workunit.h"
22 #include "wrench/services/helpers/HostStateChangeDetector.h"
23 
24 
25 namespace wrench {
26 
27  class Simulation;
28 
29  class StorageService;
30 
31  class FailureCause;
32 
33  /***********************/
35  /***********************/
36 
45  class StandardJobExecutor : public Service {
46  // A Service subclass (it overrides main() and cleanup()) that holds one StandardJob together with the compute resources and scratch space passed to its constructor. NOTE(review): presumably it runs that job's work units -- confirm against StandardJobExecutor.cpp.
47  public:
48 
49  ~StandardJobExecutor();
50 
51  // Public constructor. NOTE(review): the per-parameter notes are read off the names and member comments in this header -- confirm against the .cpp.
52  StandardJobExecutor (
53  Simulation *simulation, // the simulation object
54  std::string callback_mailbox, // mailbox name; presumably where job done/failed notifications are sent -- confirm
55  std::string hostname, // presumably the host on which this service runs -- confirm
56  std::shared_ptr<StandardJob> job, // the standard job handled by this executor
57  std::map<std::string, std::tuple<unsigned long, double>> compute_resources, // presumably hostname -> (number of cores, bytes of RAM) -- confirm
58  std::shared_ptr<StorageService> scratch_space, // storage service used as scratch space
59  bool part_of_pilot_job, // whether the job is part of a pilot job
60  PilotJob* parent_pilot_job, // the parent pilot job; nullptr when not part of a pilot job
61  std::map<std::string, std::string> property_list, // property name -> value; presumably overrides default_property_values -- confirm
62  std::map<std::string, double> messagepayload_list // payload name -> value; presumably overrides default_messagepayload_values -- confirm
63  );
64  // Kill the executor. NOTE(review): job_termination presumably says whether the kill is due to the job being terminated -- confirm.
65  void kill(bool job_termination);
66  // Accessors; presumably they return the job and compute resources given to the constructor -- confirm.
67  std::shared_ptr<StandardJob> getJob();
68  std::map<std::string, std::tuple<unsigned long, double>> getComputeResources();
69 
70  // Get the set of files stored in scratch space by the standard job
71  std::set<WorkflowFile*> getFilesInScratch();
72 
73  private:
74  // Simulation is declared a friend, so it can access this class's private members.
75  friend class Simulation;
76  void cleanup(bool has_returned_from_main, int return_value) override; // overrides a virtual function of a base class
77 
78  std::string callback_mailbox;
79  std::shared_ptr<StandardJob> job;
80  std::map<std::string, std::tuple<unsigned long, double>> compute_resources;
81  int total_num_cores; // NOTE(review): presumably the sum of cores over compute_resources -- confirm
82  double total_ram; // NOTE(review): presumably the sum of RAM over compute_resources -- confirm
83  std::shared_ptr<StorageService> scratch_space;
84 
85  bool part_of_pilot_job;
86 
87  // If this is not part of a pilot job, then this pointer is nullptr
88  PilotJob* parent_pilot_job;
89 
90  // Files stored in scratch space
91  std::set<WorkflowFile*> files_stored_in_scratch;
92 
93  // Core availabilities (for each host, how many cores are currently available on it)
94  std::map<std::string, unsigned long> core_availabilities;
95  // RAM availabilities (for each host, how many bytes of RAM are currently available on it)
96  std::map<std::string, double> ram_availabilities;
97 
98  // Sets of workunit executors, one set per state (running / finished / failed)
99  std::set<std::shared_ptr<WorkunitExecutor>> running_workunit_executors;
100  std::set<std::shared_ptr<WorkunitExecutor>> finished_workunit_executors;
101  std::set<std::shared_ptr<WorkunitExecutor>> failed_workunit_executors;
102 
103  // Work units, one set per state (non-ready / ready / running / completed)
104  std::set<std::shared_ptr<Workunit>> non_ready_workunits;
105  std::set<std::shared_ptr<Workunit>> ready_workunits;
106  std::set<std::shared_ptr<Workunit>> running_workunits;
107  std::set<std::shared_ptr<Workunit>> completed_workunits;
108 
109  // Property list
110  std::map<std::string, std::string> property_list;
111  // Default values for the properties. NOTE(review): presumably used for any property not set in property_list -- confirm.
112  std::map<std::string, std::string> default_property_values = {
113  {StandardJobExecutorProperty::TASK_STARTUP_OVERHEAD, "0"},
114  {StandardJobExecutorProperty::CORE_ALLOCATION_ALGORITHM, "maximum"},
115  {StandardJobExecutorProperty::TASK_SELECTION_ALGORITHM, "maximum_flops"},
116  {StandardJobExecutorProperty::HOST_SELECTION_ALGORITHM, "best_fit"},
117  {StandardJobExecutorProperty::SIMULATE_COMPUTATION_AS_SLEEP, "false"},
118  };
119  // Default message payload values (both 1024). NOTE(review): the unit is not shown here, presumably bytes -- confirm.
120  std::map<std::string, double> default_messagepayload_values = {
121  {StandardJobExecutorMessagePayload::STANDARD_JOB_DONE_MESSAGE_PAYLOAD, 1024},
122  {StandardJobExecutorMessagePayload::STANDARD_JOB_FAILED_MESSAGE_PAYLOAD, 1024},
123  };
124  // NOTE(review): presumably monitors state changes of the hosts in compute_resources -- confirm.
125  std::shared_ptr<HostStateChangeDetector> host_state_monitor;
126  // Overrides main() of a base class. NOTE(review): presumably the service's entry point -- confirm.
127  int main() override;
128  // Handlers for a workunit executor's completion, failure (with its cause), and crash
129  void processWorkunitExecutorCompletion(std::shared_ptr<WorkunitExecutor> workunit_executor,
130  std::shared_ptr<Workunit> workunit);
131 
132  void processWorkunitExecutorFailure(std::shared_ptr<WorkunitExecutor> workunit_executor,
133  std::shared_ptr<Workunit> workunit,
134  std::shared_ptr<FailureCause> cause);
135 
136  void processWorkunitExecutorCrash(std::shared_ptr<WorkunitExecutor> workunit_executor);
137  // NOTE(review): the bool presumably tells the caller whether to keep processing messages -- confirm.
138  bool processNextMessage();
139  // Per-work-unit resource requirements: minimum cores, desired cores, and minimum memory
140  unsigned long computeWorkUnitMinNumCores(Workunit *wu);
141  unsigned long computeWorkUnitDesiredNumCores(Workunit *wu);
142  double computeWorkUnitMinMemory(Workunit *wu);
143 
144  void dispatchReadyWorkunits();
145 
146 // void createWorkunits();
147  // Returns the ready work units as a vector. NOTE(review): the order presumably follows TASK_SELECTION_ALGORITHM -- confirm.
148  std::vector<std::shared_ptr<Workunit>> sortReadyWorkunits();
149 
150  // Clean up scratch space
151  void cleanUpScratch();
152 
153  // Store the list of files available in scratch space (NOTE(review): presumably into files_stored_in_scratch -- confirm)
154  void StoreListOfFilesInScratch();
155 
156 
157  };
158 
159  /***********************/
161  /***********************/
162 };
163 
164 
165 #endif //WRENCH_MULTINODEMULTICORESTANDARDJOBEXECUTOR_H
wrench::getJob
std::shared_ptr< WorkflowJob > getJob()
Get the executor's job.
Definition: StandardJobExecutor.cpp:990
wrench
Definition: Alarm.cpp:20