10 #ifndef WRENCH_BAREMETALCOMPUTESERVICE_H
11 #define WRENCH_BAREMETALCOMPUTESERVICE_H
16 #include "wrench/services/compute/ComputeService.h"
17 #include "wrench/services/compute/standard_job_executor/StandardJobExecutor.h"
18 #include "BareMetalComputeServiceProperty.h"
19 #include "BareMetalComputeServiceMessagePayload.h"
20 #include "wrench/services/compute/workunit_executor/Workunit.h"
21 #include "wrench/services/helpers/HostStateChangeDetector.h"
55 std::map<std::string, std::string> default_property_values = {
62 std::map<std::string, double> default_messagepayload_values = {
88 const std::map<std::string, std::tuple<unsigned long, double>> compute_resources,
89 std::string scratch_space_mount_point,
90 std::map<std::string, std::string> property_list = {},
91 std::map<std::string, double> messagepayload_list = {}
96 const std::vector<std::string> compute_hosts,
97 std::string scratch_space_mount_point,
98 std::map<std::string, std::string> property_list = {},
99 std::map<std::string, double> messagepayload_list = {}
/**
 * @brief Standard-job submission entry point (overrides a base-class virtual).
 *
 * @param job: the standard job being submitted
 * @param service_specific_args: service-specific arguments as a string -> string map
 *        (NOTE(review): the accepted keys/values are defined by the implementation,
 *        which is not in this header)
 */
107 void submitStandardJob(std::shared_ptr<StandardJob> job,
const std::map<std::string, std::string> &service_specific_args)
override;
/**
 * @brief Pilot-job submission entry point (overrides a base-class virtual).
 *
 * @param job: the pilot job being submitted
 * @param service_specific_args: service-specific arguments as a string -> string map
 *        (NOTE(review): accepted keys are defined by the implementation, not shown here)
 */
109 void submitPilotJob(std::shared_ptr<PilotJob> job,
const std::map<std::string, std::string> &service_specific_args)
override;
/**
 * @brief Standard-job termination entry point (overrides a base-class virtual).
 *
 * @param job: the standard job to terminate
 */
111 void terminateStandardJob(std::shared_ptr<StandardJob> job)
override;
/**
 * @brief Pilot-job termination entry point (overrides a base-class virtual).
 *
 * @param job: the pilot job to terminate
 */
113 void terminatePilotJob(std::shared_ptr<PilotJob> job)
override;
125 void validateProperties();
129 std::map<std::string, std::tuple<unsigned long, double>> compute_resources,
130 std::map<std::string, std::string> property_list,
131 std::map<std::string, double> messagepayload_list,
133 std::shared_ptr<PilotJob> pj, std::string suffix,
134 std::shared_ptr<StorageService> scratch_space);
138 std::map<std::string, std::tuple<unsigned long, double>> compute_resources,
139 std::map<std::string, std::string> property_list,
140 std::map<std::string, double> messagepayload_list,
141 std::shared_ptr<StorageService> scratch_space);
/**
 * @brief Shared initialization helper.
 *
 * Presumably called from the constructors, whose parameter lists use the same
 * compute_resources / property_list / messagepayload_list types. TODO confirm.
 *
 * @param hostname: name of the host (presumably the host the service runs on)
 * @param compute_resources: per-host resource description, keyed by string
 *        (presumably hostname -> (num cores, RAM); verify against the implementation)
 * @param property_list: string -> string property overrides
 * @param messagepayload_list: string -> double message-payload overrides
 * @param pj: a pilot job (presumably the containing pilot job, or nullptr when
 *        there is none; TODO confirm)
 */
144 void initiateInstance(
const std::string &hostname,
145 std::map<std::string, std::tuple<unsigned long, double>> compute_resources,
146 std::map<std::string, std::string> property_list,
147 std::map<std::string, double> messagepayload_list,
149 std::shared_ptr<PilotJob> pj);
// Resources managed by this service, keyed by string. The tuple type matches the
// (unsigned long, double) pair used by pickAllocation's required_num_cores /
// required_ram, so presumably hostname -> (num cores, RAM). TODO confirm.
152 std::map<std::string, std::tuple<unsigned long, double>> compute_resources;
// Per-key (presumably per-host) RAM availability, as a double.
155 std::map<std::string, double> ram_availabilities;
// Per-key (presumably per-host) count of running threads.
156 std::map<std::string, unsigned long> running_thread_counts;
// Total number of cores (presumably summed over compute_resources; verify).
158 unsigned long total_num_cores;
// Optional alarm; nullptr unless explicitly set.
163 std::shared_ptr<Alarm> death_alarm =
nullptr;
// The pilot job this service belongs to, if any (presumably nullptr otherwise).
165 std::shared_ptr<PilotJob> containing_pilot_job;
// Per standard job: the set of files associated with scratch space.
167 std::map<std::shared_ptr<StandardJob> , std::set<WorkflowFile*>> files_in_scratch;
// Standard jobs currently considered running by this service.
170 std::set<std::shared_ptr<StandardJob> > running_jobs;
// Per standard job: task -> (string, unsigned long). This is the same tuple type
// pickAllocation returns, so presumably (host, num cores) per task. TODO confirm.
173 std::map<std::shared_ptr<StandardJob> , std::map<WorkflowTask *, std::tuple<std::string, unsigned long>>> job_run_specs;
// Per standard job: every work unit created for that job.
176 std::map<std::shared_ptr<StandardJob> , std::set<std::shared_ptr<Workunit>>> all_workunits;
// Work units waiting to be dispatched (FIFO container; consumption order set by the implementation).
178 std::deque<std::shared_ptr<Workunit>> ready_workunits;
// Per standard job: work units that have completed.
180 std::map<std::shared_ptr<StandardJob> , std::set<std::shared_ptr<Workunit>>> completed_workunits;
// Per standard job: the work-unit executors currently associated with it.
183 std::map<std::shared_ptr<StandardJob> , std::set<std::shared_ptr<WorkunitExecutor>>> workunit_executors;
186 void storeFilesStoredInScratch(std::set<WorkflowFile*> scratch_files);
189 void cleanUpScratch();
/**
 * @brief Terminate this service.
 *
 * @param notify_pilot_job_submitters: whether pilot-job submitters should be
 *        notified (semantics defined by the implementation)
 */
195 void terminate(
bool notify_pilot_job_submitters);
197 void failCurrentStandardJobs();
199 void processWorkunitExecutorCompletion(std::shared_ptr<WorkunitExecutor> workunit_executor, std::shared_ptr<Workunit> workunit);
201 void processWorkunitExecutorFailure(std::shared_ptr<WorkunitExecutor> workunit_executor, std::shared_ptr<Workunit> workunit, std::shared_ptr<FailureCause> cause);
203 void processWorkunitExecutorCrash(std::shared_ptr<WorkunitExecutor> workunit_executor);
205 void forgetWorkunitExecutor(std::shared_ptr<WorkunitExecutor> workunit_executor);
/**
 * @brief Handle a request to terminate a standard job.
 *
 * @param job: the standard job to terminate
 * @param answer_mailbox: mailbox to which the reply is sent
 */
207 void processStandardJobTerminationRequest(std::shared_ptr<StandardJob> job,
const std::string &answer_mailbox);
209 bool processNextMessage();
211 void dispatchReadyWorkunits();
218 enum JobTerminationCause {
223 COMPUTE_SERVICE_KILLED
226 void terminateRunningStandardJob(std::shared_ptr<StandardJob> job, JobTerminationCause termination_cause);
228 void failRunningStandardJob(std::shared_ptr<StandardJob> job, std::shared_ptr<FailureCause> cause);
/**
 * @brief Answer a request for resource information.
 *
 * @param answer_mailbox: mailbox to which the reply is sent
 */
230 void processGetResourceInformation(
const std::string &answer_mailbox);
/**
 * @brief Handle a pilot-job submission request.
 *
 * @param answer_mailbox: mailbox to which the reply is sent
 * @param job: the submitted pilot job
 * @param service_specific_args: service-specific arguments
 *
 * NOTE(review): this takes the argument map by value, whereas processSubmitStandardJob
 * takes it by non-const reference. Consider `const &` here, changing the header and
 * the definition together.
 */
232 void processSubmitPilotJob(
const std::string &answer_mailbox, std::shared_ptr<PilotJob> job, std::map<std::string, std::string> service_specific_args);
/**
 * @brief Handle a standard-job submission request.
 *
 * @param answer_mailbox: mailbox to which the reply is sent
 * @param job: the submitted standard job
 * @param service_specific_arguments: service-specific arguments, passed by
 *        non-const reference (so the implementation may modify them)
 */
234 void processSubmitStandardJob(
const std::string &answer_mailbox, std::shared_ptr<StandardJob> job,
235 std::map<std::string, std::string> &service_specific_arguments);
/**
 * @brief Choose an allocation for a task.
 *
 * @param task: the task to allocate
 * @param required_host: required host name (NOTE(review): how an empty value is
 *        treated is defined by the implementation)
 * @param required_num_cores: number of cores required
 * @param required_ram: amount of RAM required
 * @param hosts_to_avoid: hosts that should not be picked; passed by non-const
 *        reference, so the implementation may modify it
 * @return (string, unsigned long). This is the same tuple type stored per task in
 *         job_run_specs; presumably (host, num cores). TODO confirm.
 */
237 std::tuple<std::string, unsigned long> pickAllocation(
WorkflowTask *task,
238 std::string required_host,
unsigned long required_num_cores,
double required_ram,
239 std::set<std::string> &hosts_to_avoid);
241 bool jobCanRun(std::shared_ptr<StandardJob> job, std::map<std::string, std::string> &service_specific_arguments);
/**
 * @brief Check whether at least one host offers the requested resources.
 *
 * @param num_cores: number of cores requested
 * @param ram: amount of RAM requested
 * @return true if some host qualifies
 *         (NOTE(review): whether "available" or "total" capacity is checked is
 *         defined by the implementation)
 */
243 bool isThereAtLeastOneHostWithResources(
unsigned long num_cores,
double ram);
/**
 * @brief Cleanup hook (overrides a base-class virtual).
 *
 * @param has_terminated_cleanly: whether the service terminated cleanly
 * @param return_value: the service's return value
 */
245 void cleanup(
bool has_terminated_cleanly,
int return_value)
override;
247 bool areAllComputeResourcesDownWithNoWUERunning();
252 std::shared_ptr<HostStateChangeDetector> host_state_change_monitor;
258 #endif //WRENCH_BAREMETALCOMPUTESERVICE_H