11 #ifndef WRENCH_BATCH_SERVICE_H
12 #define WRENCH_BATCH_SERVICE_H
14 #include "wrench/services/compute/ComputeService.h"
15 #include "wrench/services/compute/standard_job_executor/StandardJobExecutor.h"
16 #include "wrench/services/compute/batch/BatchJob.h"
17 #include "wrench/services/compute/batch/BatschedNetworkListener.h"
18 #include "wrench/services/compute/batch/BatchComputeServiceProperty.h"
19 #include "wrench/services/compute/batch/BatchComputeServiceMessagePayload.h"
20 #include "wrench/services/helpers/Alarm.h"
21 #include "wrench/workflow/job/StandardJob.h"
22 #include "wrench/workflow/job/WorkflowJob.h"
23 #include "wrench/services/compute/batch/batch_schedulers/BatchScheduler.h"
32 class WorkloadTraceFileReplayer;
56 std::map<std::string, std::string> default_property_values = {
62 #ifdef ENABLE_BATSCHED
82 std::map<std::string, double> default_messagepayload_values = {
103 std::vector<std::string> compute_hosts,
104 std::string scratch_space_mount_point,
112 std::map<std::string,double>
getStartTimeEstimates(std::set<std::tuple<std::string,unsigned long,unsigned long, double>> resources);
114 std::vector<std::tuple<std::string, int, int, int, int, double, double>>
getQueue();
136 std::vector<std::string> compute_hosts,
137 unsigned long cores_per_host,
139 std::string scratch_space_mount_point,
146 static unsigned long parseUnsignedLongServiceSpecificArgument(std::string key,
const std::map<std::string, std::string> &args);
149 void submitWorkflowJob(
WorkflowJob *job,
const std::map<std::string, std::string> &batch_job_args);
152 void submitStandardJob(
StandardJob *job,
const std::map<std::string, std::string> &batch_job_args)
override;
155 void submitPilotJob(
PilotJob *job,
const std::map<std::string, std::string> &batch_job_args)
override;
161 void terminateStandardJob(
StandardJob *job)
override;
164 void terminatePilotJob(
PilotJob *job)
override;
166 std::vector<std::tuple<std::string, double, double, double, double, unsigned int, std::string>> workload_trace;
167 std::shared_ptr<WorkloadTraceFileReplayer> workload_trace_replayer;
169 bool clean_exit =
false;
172 unsigned long random_interval = 10;
175 std::map<std::string,std::shared_ptr<Alarm>> standard_job_alarms;
178 std::map<std::string,std::shared_ptr<Alarm>> pilot_job_alarms;
181 unsigned long total_num_of_nodes;
182 unsigned long num_cores_per_node;
183 std::map<std::string, unsigned long> nodes_to_cores_map;
184 std::vector<double> timeslots;
185 std::map<std::string, unsigned long> available_nodes_to_cores;
186 std::map<unsigned long, std::string> host_id_to_names;
187 std::vector<std::string> compute_hosts;
191 std::set<std::shared_ptr<StandardJobExecutor>> running_standard_job_executors;
194 std::set<std::shared_ptr<StandardJobExecutor>> finished_standard_job_executors;
197 std::set<std::shared_ptr<BatchJob>> all_jobs;
200 std::set<std::shared_ptr<BatchJob>> running_jobs;
203 std::deque<std::shared_ptr<BatchJob>> batch_queue;
207 std::set<std::shared_ptr<BatchJob>> waiting_jobs;
210 std::unique_ptr<BatchScheduler> scheduler;
213 #ifdef ENABLE_BATSCHED
215 std::set<std::string> scheduling_algorithms = {
"conservative_bf",
"crasher",
"easy_bf",
"easy_bf_fast",
216 "easy_bf_plot_liquid_load_horizon",
217 "energy_bf",
"energy_bf_dicho",
"energy_bf_idle_sleeper",
218 "energy_bf_monitoring",
219 "energy_bf_monitoring_inertial",
"energy_bf_subpart_sleeper",
220 "energy_watcher",
"fcfs_fast",
"fast_conservative_bf",
221 "filler",
"killer",
"killer2",
"random",
"rejecter",
222 "sequencer",
"sleeper",
"submitter",
"waiting_time_estimator"
225 std::set<std::string> queue_ordering_options = {
"fcfs",
"lcfs",
"desc_bounded_slowdown",
"desc_slowdown",
226 "asc_size",
"desc_size",
"asc_walltime",
"desc_walltime"
230 std::set<std::string> scheduling_algorithms = {
"fcfs",
"conservative_bf",
234 std::set<std::string> queue_ordering_options = {
239 unsigned long generateUniqueJobID();
241 void removeJobFromRunningList(std::shared_ptr<BatchJob> job);
243 void removeJobFromBatchQueue(std::shared_ptr<BatchJob> job);
245 void removeBatchJobFromJobsList(std::shared_ptr<BatchJob> job);
249 bool processNextMessage();
251 void startBackgroundWorkloadProcess();
253 void processGetResourceInformation(
const std::string &answer_mailbox);
255 void processStandardJobCompletion(std::shared_ptr<StandardJobExecutor> executor,
StandardJob *job);
257 void processStandardJobFailure(std::shared_ptr<StandardJobExecutor> executor,
259 std::shared_ptr<FailureCause> cause);
261 void terminateRunningStandardJob(
StandardJob *job);
265 void cleanup(
bool has_returned_from_main,
int return_value)
override;
268 void terminateRunningPilotJobs();
271 void failCurrentStandardJobs();
274 void processPilotJobCompletion(
PilotJob *job);
280 void processStandardJobTerminationRequest(
StandardJob *job, std::string answer_mailbox);
283 void processPilotJobTerminationRequest(
PilotJob *job, std::string answer_mailbox);
286 void processAlarmJobTimeout(std::shared_ptr<BatchJob>job);
289 void processPilotJobTimeout(
PilotJob *job);
292 void freeUpResources(std::map<std::string, std::tuple<unsigned long, double>> resources);
295 void sendPilotJobExpirationNotification(
PilotJob *job);
298 void sendStandardJobFailureNotification(
StandardJob *job, std::string job_id, std::shared_ptr<FailureCause> cause);
301 void processJobSubmission(std::shared_ptr<BatchJob>job, std::string answer_mailbox);
304 void startJob(std::map<std::string, std::tuple<unsigned long, double>>,
WorkflowJob *,
305 std::shared_ptr<BatchJob>,
unsigned long,
unsigned long,
unsigned long);
308 void processExecuteJobFromBatSched(std::string bat_sched_reply);
314 #endif //WRENCH_BATCH_SERVICE_H
static const std::string SIMULATE_COMPUTATION_AS_SLEEP
Simulate computation as just a sleep instead of an actual compute thread. This is for scalability rea...
Definition: BatchComputeServiceProperty.h:139
static const std::string PILOT_JOB_EXPIRED_MESSAGE_PAYLOAD
The number of bytes in the control message sent by the daemon to state that a pilot job has expired.
Definition: ComputeServiceMessagePayload.h:44
static const std::string TERMINATE_PILOT_JOB_ANSWER_MESSAGE_PAYLOAD
The number of bytes in the control message sent by the daemon to acknowledge a pilot job termination.
Definition: ComputeServiceMessagePayload.h:54
A class that implements a FCFS batch scheduler.
Definition: FCFSBatchScheduler.h:25
static const std::string SUBMIT_STANDARD_JOB_REQUEST_MESSAGE_PAYLOAD
The number of bytes in the control message sent to the daemon to submit a standard job.
Definition: ComputeServiceMessagePayload.h:26
static const std::string DAEMON_STOPPED_MESSAGE_PAYLOAD
The number of bytes in the control message sent by the daemon to confirm it has terminated.
Definition: ServiceMessagePayload.h:33
static const std::string TERMINATE_STANDARD_JOB_ANSWER_MESSAGE_PAYLOAD
The number of bytes in the control message sent by the daemon to acknowledge a standard job terminati...
Definition: ComputeServiceMessagePayload.h:36
static const std::string IGNORE_INVALID_JOBS_IN_WORKLOAD_TRACE_FILE
Whether, when simulating a workload trace file, to abort when there is an invalid job specification (...
Definition: BatchComputeServiceProperty.h:101
static const std::string USE_REAL_RUNTIMES_AS_REQUESTED_RUNTIMES_IN_WORKLOAD_TRACE_FILE
Whether, when simulating a workload trace file, to use the actual runtimes as requested runtimes (i....
Definition: BatchComputeServiceProperty.h:92
static const std::string RESOURCE_DESCRIPTION_REQUEST_MESSAGE_PAYLOAD
The number of bytes in the control message sent to the daemon to ask it for information on its resour...
Definition: ComputeServiceMessagePayload.h:56
std::vector< std::tuple< std::string, int, int, int, int, double, double > > getQueue()
Gets the state of the batch queue.
Definition: BatchComputeService.cpp:244
static const std::string BATSCHED_CONTIGUOUS_ALLOCATION
Controls Batsched node allocation policy.
Definition: BatchComputeServiceProperty.h:157
static const std::string HOST_SELECTION_ALGORITHM
The host selection algorithm. Can be:
Definition: BatchComputeServiceProperty.h:59
The compute service base class.
Definition: ComputeService.h:35
static const std::string SUBMIT_TIME_OF_FIRST_JOB_IN_WORKLOAD_TRACE_FILE
A specification of the submit time of the first job in a provided trace file.
Definition: BatchComputeServiceProperty.h:108
A standard (i.e., non-pilot) workflow job that can be submitted to a ComputeService by a WMS (via a J...
Definition: StandardJob.h:37
static const std::string SIMULATED_WORKLOAD_TRACE_FILE
Path to a workload trace file to be replayed. The trace file can be in the SWF format (see http://...
Definition: BatchComputeServiceProperty.h:82
static const std::string TERMINATE_PILOT_JOB_REQUEST_MESSAGE_PAYLOAD
The number of bytes in the control message sent to the daemon to terminate a pilot job.
Definition: ComputeServiceMessagePayload.h:52
A batch-scheduled compute service that manages a set of compute hosts and controls access to their re...
Definition: BatchComputeService.h:49
static const std::string STANDARD_JOB_DONE_MESSAGE_PAYLOAD
The number of bytes in the control message sent by the daemon to state that it has completed a standa...
Definition: ComputeServiceMessagePayload.h:30
static const std::string STANDARD_JOB_FAILED_MESSAGE_PAYLOAD
The number of bytes in the control message sent by the daemon to state that a running standard job ha...
Definition: ComputeServiceMessagePayload.h:32
static const std::string TERMINATE_STANDARD_JOB_REQUEST_MESSAGE_PAYLOAD
The number of bytes in the control message sent to the daemon to terminate a standard job.
Definition: ComputeServiceMessagePayload.h:34
static const std::string SUBMIT_PILOT_JOB_ANSWER_MESSAGE_PAYLOAD
The number of bytes in the control message sent from the daemon to acknowledge a pilot job submission...
Definition: ComputeServiceMessagePayload.h:40
std::map< std::string, double > getStartTimeEstimates(std::set< std::tuple< std::string, unsigned long, unsigned long, double >> resources)
Retrieve start time estimates for a set of job configurations.
Definition: BatchComputeService.cpp:219
A class that defines a batsched batch scheduler.
Definition: BatschedBatchScheduler.h:24
static const std::string TASK_STARTUP_OVERHEAD
The overhead to start a task execution, in seconds.
Definition: BatchComputeServiceProperty.h:27
static const std::string BATSCHED_LOGGING_MUTED
Controls Batsched logging.
Definition: BatchComputeServiceProperty.h:147
static const std::string TASK_SELECTION_ALGORITHM
The algorithm to pick which ready computational task (within a standard job executed by the batch ser...
Definition: BatchComputeServiceProperty.h:69
Abstraction of a job used for executing tasks in a Workflow.
Definition: WorkflowJob.h:34
A service that goes through a job submission trace (as loaded by a TraceFileLoader),...
Definition: WorkloadTraceFileReplayer.h:28
std::map< std::string, double > messagepayload_list
The service's messagepayload list.
Definition: Service.h:112
static const std::string RESOURCE_DESCRIPTION_ANSWER_MESSAGE_PAYLOAD
The number of bytes in the control message sent by the daemon to state information on its resources.
Definition: ComputeServiceMessagePayload.h:58
static const std::string OUTPUT_CSV_JOB_LOG
Path to a to-be-generated Batsim-style CSV trace file (e.g. for batch schedule visualization purpose...
Definition: BatchComputeServiceProperty.h:119
static const std::string STOP_DAEMON_MESSAGE_PAYLOAD
The number of bytes in the control message sent to the daemon to terminate it.
Definition: ServiceMessagePayload.h:31
static const std::string PILOT_JOB_STARTED_MESSAGE_PAYLOAD
The number of bytes in the control message sent by the daemon to state that a pilot job has started.
Definition: ComputeServiceMessagePayload.h:42
std::string hostname
The name of the host on which the daemon is running.
Definition: S4U_Daemon.h:51
static const std::string SUBMIT_STANDARD_JOB_ANSWER_MESSAGE_PAYLOAD
The number of bytes in the control message sent by the daemon to acknowledge a standard job submissio...
Definition: ComputeServiceMessagePayload.h:28
static const std::string BATCH_SCHEDULING_ALGORITHM
The batch scheduling algorithm. Can be:
Definition: BatchComputeServiceProperty.h:40
static const std::string SUBMIT_PILOT_JOB_REQUEST_MESSAGE_PAYLOAD
The number of bytes in the control message sent to the daemon to submit a pilot job.
Definition: ComputeServiceMessagePayload.h:38
static const std::string SUPPORTS_PILOT_JOBS
Whether the compute service supports pilot jobs (true or false)
Definition: ComputeServiceProperty.h:26
A pilot (i.e., non-standard) workflow job that can be submitted to a ComputeService by a WMS (via a J...
Definition: PilotJob.h:29
static const std::string BATCH_RJMS_PADDING_DELAY
Integral number of seconds that the Batch Scheduler adds to the runtime of each incoming job....
Definition: BatchComputeServiceProperty.h:129
A class that defines a conservative backfilling batch scheduler.
Definition: CONSERVATIVEBFBatchScheduler.h:26
BatchComputeService(const std::string &hostname, std::vector< std::string > compute_hosts, std::string scratch_space_mount_point, std::map< std::string, std::string > property_list={}, std::map< std::string, double > messagepayload_list={})
Constructor.
Definition: BatchComputeService.cpp:60
static const std::string BATCH_QUEUE_ORDERING_ALGORITHM
The batch queue ordering algorithm. Can be:
Definition: BatchComputeServiceProperty.h:49
std::map< std::string, std::string > property_list
The service's property list.
Definition: Service.h:109
static const std::string SUPPORTS_STANDARD_JOBS
Whether the compute service supports standard jobs (true or false)
Definition: ComputeServiceProperty.h:24