10 #ifndef WRENCH_MULTINODEMULTICORESTANDARDJOBEXECUTOR_H
11 #define WRENCH_MULTINODEMULTICORESTANDARDJOBEXECUTOR_H
17 #include "wrench/services/compute/ComputeService.h"
18 #include "wrench/services/compute/workunit_executor/WorkunitExecutor.h"
19 #include "wrench/services/compute/standard_job_executor/StandardJobExecutorProperty.h"
20 #include "wrench/services/compute/standard_job_executor/StandardJobExecutorMessagePayload.h"
21 #include "wrench/services/compute/workunit_executor/Workunit.h"
22 #include "wrench/services/helpers/HostStateChangeDetector.h"
45 class StandardJobExecutor :
public Service {
49 ~StandardJobExecutor();
53 Simulation *simulation,
54 std::string callback_mailbox,
56 std::shared_ptr<StandardJob> job,
57 std::map<std::string, std::tuple<unsigned long, double>> compute_resources,
58 std::shared_ptr<StorageService> scratch_space,
59 bool part_of_pilot_job,
60 PilotJob* parent_pilot_job,
61 std::map<std::string, std::string> property_list,
62 std::map<std::string, double> messagepayload_list
65 void kill(
bool job_termination);
67 std::shared_ptr<StandardJob>
getJob();
68 std::map<std::string, std::tuple<unsigned long, double>> getComputeResources();
71 std::set<WorkflowFile*> getFilesInScratch();
75 friend class Simulation;
76 void cleanup(
bool has_returned_from_main,
int return_value)
override;
78 std::string callback_mailbox;
79 std::shared_ptr<StandardJob> job;
80 std::map<std::string, std::tuple<unsigned long, double>> compute_resources;
83 std::shared_ptr<StorageService> scratch_space;
85 bool part_of_pilot_job;
88 PilotJob* parent_pilot_job;
91 std::set<WorkflowFile*> files_stored_in_scratch;
94 std::map<std::string, unsigned long> core_availabilities;
96 std::map<std::string, double> ram_availabilities;
99 std::set<std::shared_ptr<WorkunitExecutor>> running_workunit_executors;
100 std::set<std::shared_ptr<WorkunitExecutor>> finished_workunit_executors;
101 std::set<std::shared_ptr<WorkunitExecutor>> failed_workunit_executors;
104 std::set<std::shared_ptr<Workunit>> non_ready_workunits;
105 std::set<std::shared_ptr<Workunit>> ready_workunits;
106 std::set<std::shared_ptr<Workunit>> running_workunits;
107 std::set<std::shared_ptr<Workunit>> completed_workunits;
110 std::map<std::string, std::string> property_list;
112 std::map<std::string, std::string> default_property_values = {
113 {StandardJobExecutorProperty::TASK_STARTUP_OVERHEAD,
"0"},
114 {StandardJobExecutorProperty::CORE_ALLOCATION_ALGORITHM,
"maximum"},
115 {StandardJobExecutorProperty::TASK_SELECTION_ALGORITHM,
"maximum_flops"},
116 {StandardJobExecutorProperty::HOST_SELECTION_ALGORITHM,
"best_fit"},
117 {StandardJobExecutorProperty::SIMULATE_COMPUTATION_AS_SLEEP,
"false"},
120 std::map<std::string, double> default_messagepayload_values = {
121 {StandardJobExecutorMessagePayload::STANDARD_JOB_DONE_MESSAGE_PAYLOAD, 1024},
122 {StandardJobExecutorMessagePayload::STANDARD_JOB_FAILED_MESSAGE_PAYLOAD, 1024},
125 std::shared_ptr<HostStateChangeDetector> host_state_monitor;
129 void processWorkunitExecutorCompletion(std::shared_ptr<WorkunitExecutor> workunit_executor,
130 std::shared_ptr<Workunit> workunit);
132 void processWorkunitExecutorFailure(std::shared_ptr<WorkunitExecutor> workunit_executor,
133 std::shared_ptr<Workunit> workunit,
134 std::shared_ptr<FailureCause> cause);
136 void processWorkunitExecutorCrash(std::shared_ptr<WorkunitExecutor> workunit_executor);
138 bool processNextMessage();
140 unsigned long computeWorkUnitMinNumCores(Workunit *wu);
141 unsigned long computeWorkUnitDesiredNumCores(Workunit *wu);
142 double computeWorkUnitMinMemory(Workunit *wu);
144 void dispatchReadyWorkunits();
148 std::vector<std::shared_ptr<Workunit>> sortReadyWorkunits();
151 void cleanUpScratch();
154 void StoreListOfFilesInScratch();
165 #endif //WRENCH_MULTINODEMULTICORESTANDARDJOBEXECUTOR_H