10 #ifndef WRENCH_BAREMETALCOMPUTESERVICE_H
11 #define WRENCH_BAREMETALCOMPUTESERVICE_H
16 #include "wrench/services/compute/ComputeService.h"
17 #include "wrench/services/compute/standard_job_executor/StandardJobExecutor.h"
18 #include "BareMetalComputeServiceProperty.h"
19 #include "BareMetalComputeServiceMessagePayload.h"
20 #include "wrench/services/compute/workunit_executor/Workunit.h"
21 #include "wrench/services/helpers/HostStateChangeDetector.h"
55 std::map<std::string, std::string> default_property_values = {
62 std::map<std::string, double> default_messagepayload_values = {
90 const std::map<std::string, std::tuple<unsigned long, double>> compute_resources,
91 std::string scratch_space_mount_point,
92 std::map<std::string, std::string> property_list = {},
93 std::map<std::string, double> messagepayload_list = {}
98 const std::vector<std::string> compute_hosts,
99 std::string scratch_space_mount_point,
100 std::map<std::string, std::string> property_list = {},
101 std::map<std::string, double> messagepayload_list = {}
/**
 * @brief Job-submission entry point for standard jobs (overrides a base-class virtual, per `override`).
 * @param job: the standard job to submit
 * @param service_specific_args: string-to-string map of service-specific arguments
 *        (NOTE(review): accepted keys/values are not visible in this header — confirm in the definition)
 */
108 void submitStandardJob(std::shared_ptr<StandardJob> job,
const std::map<std::string, std::string> &service_specific_args)
override;
/**
 * @brief Job-submission entry point for pilot jobs (overrides a base-class virtual, per `override`).
 * @param job: the pilot job to submit
 * @param service_specific_args: string-to-string map of service-specific arguments
 *        (NOTE(review): accepted keys/values are not visible in this header — confirm in the definition)
 */
110 void submitPilotJob(std::shared_ptr<PilotJob> job,
const std::map<std::string, std::string> &service_specific_args)
override;
/**
 * @brief Termination entry point for a standard job (overrides a base-class virtual, per `override`).
 * @param job: the standard job to terminate
 */
112 void terminateStandardJob(std::shared_ptr<StandardJob> job)
override;
/**
 * @brief Termination entry point for a pilot job (overrides a base-class virtual, per `override`).
 * @param job: the pilot job to terminate
 */
114 void terminatePilotJob(std::shared_ptr<PilotJob> job)
override;
/**
 * @brief Validates the service's configured properties.
 *        NOTE(review): which properties are checked, and how failures are reported
 *        (exception vs. other), is defined out of view — confirm in the implementation.
 */
126 void validateProperties();
130 std::map<std::string, std::tuple<unsigned long, double>> compute_resources,
131 std::map<std::string, std::string> property_list,
132 std::map<std::string, double> messagepayload_list,
134 std::shared_ptr<PilotJob> pj, std::string suffix,
135 std::shared_ptr<StorageService> scratch_space);
139 std::map<std::string, std::tuple<unsigned long, double>> compute_resources,
140 std::map<std::string, std::string> property_list,
141 std::map<std::string, double> messagepayload_list,
142 std::shared_ptr<StorageService> scratch_space);
/**
 * @brief Shared initialization helper; takes the same argument groups as the constructors
 *        (hostname, per-host compute resources, property list, message-payload list, pilot job).
 * @param hostname: host name string
 * @param compute_resources: map from string key to (unsigned long, double) tuple
 *        (presumably host -> (num cores, RAM) — TODO confirm)
 * @param property_list: property overrides
 * @param messagepayload_list: message-payload overrides
 * @param pj: pilot job (presumably the containing pilot job, may be nullptr — TODO confirm)
 *
 * NOTE(review): the embedded source numbering jumps from 148 to 150 before `pj`,
 * so a parameter may be missing from this excerpt — verify against the definition.
 */
145 void initiateInstance(
const std::string &hostname,
146 std::map<std::string, std::tuple<unsigned long, double>> compute_resources,
147 std::map<std::string, std::string> property_list,
148 std::map<std::string, double> messagepayload_list,
150 std::shared_ptr<PilotJob> pj);
// Compute resources managed by this service: string key -> (unsigned long, double).
// Presumably hostname -> (num cores, RAM), matching the (num_cores, ram) pairs used by
// the resource-query methods below — TODO confirm.
153 std::map<std::string, std::tuple<unsigned long, double>> compute_resources;
// RAM still available, keyed by string (presumably hostname — TODO confirm).
156 std::map<std::string, double> ram_availabilities;
// Count of running threads, keyed by string (presumably hostname — TODO confirm).
157 std::map<std::string, unsigned long> running_thread_counts;
// Total number of cores.
// NOTE(review): no default member initializer — confirm every constructor/initiateInstance
// path assigns it, otherwise its value is indeterminate.
159 unsigned long total_num_cores;
// Alarm handle; defaults to nullptr. Where (and whether) it is armed is not visible here.
164 std::shared_ptr<Alarm> death_alarm =
nullptr;
// Pilot job associated with this service instance (a PilotJob is passed to one of the
// constructors and to initiateInstance). Presumably nullptr when not running inside a
// pilot job — TODO confirm.
166 std::shared_ptr<PilotJob> containing_pilot_job;
// Per standard job: the set of files recorded as stored in scratch space for that job.
168 std::map<std::shared_ptr<StandardJob> , std::set<WorkflowFile*>> files_in_scratch;
// Standard jobs currently running on this service.
171 std::set<std::shared_ptr<StandardJob> > running_jobs;
// Per standard job: task -> (string, unsigned long) run spec. The tuple has the same type as
// pickAllocation's return value, so presumably (host, num cores) — TODO confirm.
174 std::map<std::shared_ptr<StandardJob> , std::map<WorkflowTask *, std::tuple<std::string, unsigned long>>> job_run_specs;
// Per standard job: every workunit created for it.
177 std::map<std::shared_ptr<StandardJob> , std::set<std::shared_ptr<Workunit>>> all_workunits;
// Workunits that are ready to be dispatched (see dispatchReadyWorkunits()).
179 std::deque<std::shared_ptr<Workunit>> ready_workunits;
// Per standard job: workunits that have completed.
181 std::map<std::shared_ptr<StandardJob> , std::set<std::shared_ptr<Workunit>>> completed_workunits;
// Per standard job: the WorkunitExecutors currently associated with it.
184 std::map<std::shared_ptr<StandardJob> , std::set<std::shared_ptr<WorkunitExecutor>>> workunit_executors;
/**
 * @brief Records a set of files as being stored in scratch space.
 * @param scratch_files: the files to record
 * NOTE(review): the set is taken by value (one copy per call); a const reference would avoid
 * the copy, but that changes the signature and must be mirrored in the out-of-view definition.
 */
187 void storeFilesStoredInScratch(std::set<WorkflowFile*> scratch_files);
/**
 * @brief Scratch-space cleanup helper (implementation not visible here).
 */
190 void cleanUpScratch();
/**
 * @brief Service-level termination helper.
 * @param notify_pilot_job_submitters: flag controlling whether pilot-job submitters are notified
 */
196 void terminate(
bool notify_pilot_job_submitters);
/**
 * @brief Fails the standard jobs currently held by this service (implementation not visible here).
 */
198 void failCurrentStandardJobs();
/**
 * @brief Handler for a WorkunitExecutor that completed `workunit`.
 * @param workunit_executor: the executor that reported completion
 * @param workunit: the workunit it was running
 */
200 void processWorkunitExecutorCompletion(std::shared_ptr<WorkunitExecutor> workunit_executor, std::shared_ptr<Workunit> workunit);
/**
 * @brief Handler for a WorkunitExecutor that failed while running `workunit`.
 * @param workunit_executor: the executor that reported the failure
 * @param workunit: the workunit it was running
 * @param cause: the failure cause
 */
202 void processWorkunitExecutorFailure(std::shared_ptr<WorkunitExecutor> workunit_executor, std::shared_ptr<Workunit> workunit, std::shared_ptr<FailureCause> cause);
/**
 * @brief Handler for a WorkunitExecutor that crashed (no workunit or cause is passed in).
 * @param workunit_executor: the crashed executor
 */
204 void processWorkunitExecutorCrash(std::shared_ptr<WorkunitExecutor> workunit_executor);
/**
 * @brief Drops the service's bookkeeping for a WorkunitExecutor.
 *        NOTE(review): presumably removes it from `workunit_executors` — confirm in the definition.
 * @param workunit_executor: the executor to forget
 */
206 void forgetWorkunitExecutor(std::shared_ptr<WorkunitExecutor> workunit_executor);
/**
 * @brief Handler for a request to terminate a standard job.
 * @param job: the standard job to terminate
 * @param answer_mailbox: mailbox to which the reply is sent
 */
208 void processStandardJobTerminationRequest(std::shared_ptr<StandardJob> job,
const std::string &answer_mailbox);
/**
 * @brief Processes the next incoming message.
 * @return a bool; NOTE(review): its meaning (e.g. "keep running") is defined out of view — confirm.
 */
210 bool processNextMessage();
/**
 * @brief Dispatches workunits from `ready_workunits` for execution (implementation not visible here).
 */
212 void dispatchReadyWorkunits();
219 enum JobTerminationCause {
224 COMPUTE_SERVICE_KILLED
/**
 * @brief Terminates a running standard job.
 * @param job: the running standard job
 * @param termination_cause: why it is being terminated (a JobTerminationCause enumerator,
 *        e.g. COMPUTE_SERVICE_KILLED)
 */
227 void terminateRunningStandardJob(std::shared_ptr<StandardJob> job, JobTerminationCause termination_cause);
/**
 * @brief Fails a running standard job with the given cause.
 * @param job: the running standard job
 * @param cause: the failure cause
 */
229 void failRunningStandardJob(std::shared_ptr<StandardJob> job, std::shared_ptr<FailureCause> cause);
/**
 * @brief Handler for a resource-information request.
 * @param answer_mailbox: mailbox to which the reply is sent
 */
231 void processGetResourceInformation(
const std::string &answer_mailbox);
/**
 * @brief Handler for a pilot-job submission request.
 * @param answer_mailbox: mailbox to which the reply is sent
 * @param job: the submitted pilot job
 * @param service_specific_args: service-specific arguments
 * NOTE(review): this map is taken by value, while processSubmitStandardJob takes its map by
 * non-const reference — consider a consistent const& (requires updating the definition too).
 */
233 void processSubmitPilotJob(
const std::string &answer_mailbox, std::shared_ptr<PilotJob> job, std::map<std::string, std::string> service_specific_args);
/**
 * @brief Handler for a standard-job submission request.
 * @param answer_mailbox: mailbox to which the reply is sent
 * @param job: the submitted standard job
 * @param service_specific_arguments: service-specific arguments, passed by non-const reference
 *        (so the handler may modify them — TODO confirm whether it does)
 */
235 void processSubmitStandardJob(
const std::string &answer_mailbox, std::shared_ptr<StandardJob> job,
236 std::map<std::string, std::string> &service_specific_arguments);
/**
 * @brief Handler for a query asking whether some host currently has the requested resources available.
 * @param answer_mailbox: mailbox to which the reply is sent
 * @param num_cores: requested number of cores
 * @param ram: requested RAM
 */
238 void processIsThereAtLeastOneHostWithAvailableResources(
239 const std::string &answer_mailbox,
unsigned long num_cores,
double ram);
/**
 * @brief Chooses an allocation for a task.
 * @param task: the task to allocate
 * @param required_host: required host (presumably empty means "any host" — TODO confirm)
 * @param required_num_cores: number of cores required
 * @param required_ram: RAM required
 * @param hosts_to_avoid: hosts to exclude; passed by non-const reference, so the callee may update it
 * @return a (string, unsigned long) tuple, presumably (host, num cores) — TODO confirm; the
 *         "no allocation found" encoding is defined out of view.
 */
241 std::tuple<std::string, unsigned long> pickAllocation(
WorkflowTask *task,
242 std::string required_host,
unsigned long required_num_cores,
double required_ram,
243 std::set<std::string> &hosts_to_avoid);
/**
 * @brief Predicate: whether a standard job can run on this service given its service-specific arguments.
 * @param job: the standard job
 * @param service_specific_arguments: service-specific arguments (non-const reference — may be modified; TODO confirm)
 * @return true if the job can run
 */
245 bool jobCanRun(std::shared_ptr<StandardJob> job, std::map<std::string, std::string> &service_specific_arguments);
/**
 * @brief Predicate: whether at least one host has the given resources.
 *        NOTE(review): unlike processIsThereAtLeastOneHostWithAvailableResources, the name does not say
 *        "available" — confirm whether this checks total or currently-free resources.
 * @param num_cores: number of cores
 * @param ram: RAM
 * @return true if such a host exists
 */
247 bool isThereAtLeastOneHostWithResources(
unsigned long num_cores,
double ram);
/**
 * @brief Cleanup hook (overrides a base-class virtual, per `override`).
 * @param has_terminated_cleanly: whether the service terminated cleanly
 * @param return_value: the service's return value
 */
249 void cleanup(
bool has_terminated_cleanly,
int return_value)
override;
/**
 * @brief Predicate: whether all compute resources are down and no WorkunitExecutor (WUE) is running.
 * @return true if both conditions hold
 */
251 bool areAllComputeResourcesDownWithNoWUERunning();
// Detector for host state changes (type from wrench/services/helpers/HostStateChangeDetector.h).
// Where it is created and started is not visible here.
256 std::shared_ptr<HostStateChangeDetector> host_state_change_monitor;
262 #endif //WRENCH_BAREMETALCOMPUTESERVICE_H