WRENCH  1.10
Cyberinfrastructure Simulation Workbench
Overview Installation Getting Started WRENCH 101 WRENCH 102
BatchComputeService.h
1 
11 #ifndef WRENCH_BATCH_SERVICE_H
12 #define WRENCH_BATCH_SERVICE_H
13 
14 #include "wrench/services/compute/ComputeService.h"
15 #include "wrench/services/compute/standard_job_executor/StandardJobExecutor.h"
16 #include "wrench/services/compute/batch/BatchJob.h"
17 #include "wrench/services/compute/batch/BatschedNetworkListener.h"
18 #include "wrench/services/compute/batch/BatchComputeServiceProperty.h"
19 #include "wrench/services/compute/batch/BatchComputeServiceMessagePayload.h"
20 #include "wrench/services/helpers/Alarm.h"
21 #include "wrench/workflow/job/StandardJob.h"
22 #include "wrench/workflow/job/WorkflowJob.h"
23 #include "wrench/services/compute/batch/batch_schedulers/BatchScheduler.h"
24 
25 #include <deque>
26 #include <queue>
27 #include <set>
28 #include <tuple>
29 
30 namespace wrench {
31 
32  class WorkloadTraceFileReplayer; // forward
33 
50 
54  private:
55 
56  std::map<std::string, std::string> default_property_values = {
62 #ifdef ENABLE_BATSCHED
64 // {BatchComputeServiceProperty::BATCH_SCHEDULING_ALGORITHM, "easy_bf"},
65 // {BatchComputeServiceProperty::BATCH_SCHEDULING_ALGORITHM, "easy_bf_fast"},
66 
68 #else
70 #endif
80  };
81 
82  std::map<std::string, double> default_messagepayload_values = {
101  };
102 
103  public:
104  BatchComputeService(const std::string &hostname,
105  std::vector<std::string> compute_hosts,
106  std::string scratch_space_mount_point,
107  std::map<std::string, std::string> property_list = {},
108  std::map<std::string, double> messagepayload_list = {}
109  );
110 
111  /***********************/
113  /***********************/
114  std::map<std::string,double> getStartTimeEstimates(std::set<std::tuple<std::string,unsigned long,unsigned long, double>> resources);
115 
116  std::vector<std::tuple<std::string, std::string, int, int, int, double, double>> getQueue();
117 
118  /***********************/
120  /***********************/
121 
122  /***********************/
124  /***********************/
125  ~BatchComputeService() override;
126  // helper function
127  static unsigned long parseUnsignedLongServiceSpecificArgument(std::string key, const std::map<std::string, std::string> &args);
128  /***********************/
131  /***********************/
132 
133  private:
134  friend class WorkloadTraceFileReplayer;
135  friend class FCFSBatchScheduler;
136  friend class CONSERVATIVEBFBatchScheduler;
138 
139  friend class BatschedBatchScheduler;
140 
141  BatchComputeService(const std::string hostname,
142  std::vector<std::string> compute_hosts,
143  unsigned long cores_per_host,
144  double ram_per_host,
145  std::string scratch_space_mount_point,
146  std::map<std::string, std::string> property_list,
147  std::map<std::string, double> messagepayload_list,
148  std::string suffix
149  );
150 
151  // helper function
152  void submitWorkflowJob(std::shared_ptr<WorkflowJob> job, const std::map<std::string, std::string> &batch_job_args);
153 
154  //submits a standard job
155  void submitStandardJob(std::shared_ptr<StandardJob> job, const std::map<std::string, std::string> &batch_job_args) override;
156 
157  //submits a pilot job
158  void submitPilotJob(std::shared_ptr<PilotJob> job, const std::map<std::string, std::string> &batch_job_args) override;
159 
160  // helper function
161  void terminateWorkflowJob(std::shared_ptr<WorkflowJob> job);
162 
163  // terminate a standard job
164  void terminateStandardJob(std::shared_ptr<StandardJob> job) override;
165 
166  // terminate a pilot job
167  void terminatePilotJob(std::shared_ptr<PilotJob> job) override;
168 
169  std::vector<std::tuple<std::string, double, double, double, double, unsigned int, std::string>> workload_trace;
170  std::shared_ptr<WorkloadTraceFileReplayer> workload_trace_replayer;
171 
172  bool clean_exit = false;
173 
174  //create alarms for standard jobs
175  std::map<std::string,std::shared_ptr<Alarm>> standard_job_alarms;
176 
177  //alarms for pilot jobs (only one pilot job alarm)
178  std::map<std::string,std::shared_ptr<Alarm>> pilot_job_alarms;
179 
180  /* Resources information in batch */
181  unsigned long total_num_of_nodes;
182  unsigned long num_cores_per_node;
183  std::map<std::string, unsigned long> nodes_to_cores_map;
184  std::vector<double> timeslots;
185  std::map<std::string, unsigned long> available_nodes_to_cores;
186  std::map<unsigned long, std::string> host_id_to_names;
187  std::vector<std::string> compute_hosts;
188  /* End Resources information in batch */
189 
190  // Set of running standard job executors
191  std::set<std::shared_ptr<StandardJobExecutor>> running_standard_job_executors;
192 
193  // Set of finished standard job executors (which is cleared periodically)
194  std::set<std::shared_ptr<StandardJobExecutor>> finished_standard_job_executors;
195 
196  // Master List of batch jobs
197  std::set<std::shared_ptr<BatchJob>> all_jobs;
198 
199  //A set of running batch jobs
200  std::set<std::shared_ptr<BatchJob>> running_jobs;
201 
202  // The batch queue
203  std::deque<std::shared_ptr<BatchJob>> batch_queue;
204 
205  // A set of "waiting" batch jobs, i.e., jobs that are waiting to be sent to
206  // the scheduler (useful for batsched only)
207  std::set<std::shared_ptr<BatchJob>> waiting_jobs;
208 
209  // Scheduler
210  std::unique_ptr<BatchScheduler> scheduler;
211 
212 
213 #ifdef ENABLE_BATSCHED
214 
215  std::set<std::string> scheduling_algorithms = {"conservative_bf", "crasher", "easy_bf", "easy_bf_fast",
216  "easy_bf_plot_liquid_load_horizon",
217  "energy_bf", "energy_bf_dicho", "energy_bf_idle_sleeper",
218  "energy_bf_monitoring",
219  "energy_bf_monitoring_inertial", "energy_bf_subpart_sleeper",
220  "energy_watcher", "fcfs_fast", "fast_conservative_bf",
221  "filler", "killer", "killer2", "random", "rejecter",
222  "sequencer", "sleeper", "submitter", "waiting_time_estimator"
223  };
224 
225  std::set<std::string> queue_ordering_options = {"fcfs", "lcfs", "desc_bounded_slowdown", "desc_slowdown",
226  "asc_size", "desc_size", "asc_walltime", "desc_walltime"
227 
228  };
229 #else
230  std::set<std::string> scheduling_algorithms = {"fcfs", "conservative_bf", "conservative_bf_core_level"
231  };
232 
233  //Batch queue ordering options
234  std::set<std::string> queue_ordering_options = {
235  };
236 
237 #endif
238 
239  unsigned long generateUniqueJobID();
240 
241  void removeJobFromRunningList(std::shared_ptr<BatchJob> job);
242 
243  void removeJobFromBatchQueue(std::shared_ptr<BatchJob> job);
244 
245  void removeBatchJobFromJobsList(std::shared_ptr<BatchJob> job);
246 
247  int main() override;
248 
249  bool processNextMessage();
250 
251  void startBackgroundWorkloadProcess();
252 
253  void processGetResourceInformation(const std::string &answer_mailbox);
254 
255  void processStandardJobCompletion(std::shared_ptr<StandardJobExecutor> executor, std::shared_ptr<StandardJob> job);
256 
257  void processStandardJobFailure(std::shared_ptr<StandardJobExecutor> executor,
258  std::shared_ptr<StandardJob> job,
259  std::shared_ptr<FailureCause> cause);
260 
261  void terminateRunningStandardJob(std::shared_ptr<StandardJob> job);
262 
263 
264  //Terminate the batch service (this is usually for pilot jobs when they act as a batch service)
265  void cleanup(bool has_returned_from_main, int return_value) override;
266 
267  // Terminate currently running pilot jobs
268  void terminateRunningPilotJobs();
269 
270  //Fail the standard jobs
271  void failCurrentStandardJobs();
272 
273  //Process the pilot job completion
274  void processPilotJobCompletion(std::shared_ptr<PilotJob> job);
275 
276  //Process standard job timeout
277  void processStandardJobTimeout(std::shared_ptr<StandardJob> job);
278 
279  //process standard job termination request
280  void processStandardJobTerminationRequest(std::shared_ptr<StandardJob> job, std::string answer_mailbox);
281 
282  //process pilot job termination request
283  void processPilotJobTerminationRequest(std::shared_ptr<PilotJob> job, std::string answer_mailbox);
284 
285  // process a batch job timeout event
286  void processAlarmJobTimeout(std::shared_ptr<BatchJob>job);
287 
288  //Process pilot job timeout
289  void processPilotJobTimeout(std::shared_ptr<PilotJob> job);
290 
291  //free up resources
292  void freeUpResources(std::map<std::string, std::tuple<unsigned long, double>> resources);
293 
294  //send call back to the pilot job submitters
295  void sendPilotJobExpirationNotification(std::shared_ptr<PilotJob> job);
296 
297  //send call back to the standard job submitters
298  void sendStandardJobFailureNotification(std::shared_ptr<StandardJob> job, std::string job_id, std::shared_ptr<FailureCause> cause);
299 
300  // process a job submission
301  void processJobSubmission(std::shared_ptr<BatchJob>job, std::string answer_mailbox);
302 
303  //start a job
304  void startJob(std::map<std::string, std::tuple<unsigned long, double>>, std::shared_ptr<WorkflowJob> ,
305  std::shared_ptr<BatchJob>, unsigned long, unsigned long, unsigned long);
306 
307 
308  void processExecuteJobFromBatSched(std::string bat_sched_reply);
309 
310  void processIsThereAtLeastOneHostWithAvailableResources(const std::string &answer_mailbox, unsigned long num_cores, double ram);
311 
312  };
313 }
314 
315 
316 #endif //WRENCH_BATCH_SERVICE_H
wrench::BatchComputeServiceProperty::SIMULATE_COMPUTATION_AS_SLEEP
static const std::string SIMULATE_COMPUTATION_AS_SLEEP
Simulate computation as just a sleep instead of an actual compute thread. This is for scalability rea...
Definition: BatchComputeServiceProperty.h:142
wrench::ComputeServiceMessagePayload::PILOT_JOB_EXPIRED_MESSAGE_PAYLOAD
static const std::string PILOT_JOB_EXPIRED_MESSAGE_PAYLOAD
The number of bytes in the control message sent by the daemon to state that a pilot job has expired.
Definition: ComputeServiceMessagePayload.h:44
wrench::ComputeServiceMessagePayload::IS_THERE_AT_LEAST_ONE_HOST_WITH_AVAILABLE_RESOURCES_ANSWER_MESSAGE_PAYLOAD
static const std::string IS_THERE_AT_LEAST_ONE_HOST_WITH_AVAILABLE_RESOURCES_ANSWER_MESSAGE_PAYLOAD
The number of bytes in the control message sent by the daemon to state whether one host has some resources a...
Definition: ComputeServiceMessagePayload.h:60
wrench::ComputeServiceMessagePayload::TERMINATE_PILOT_JOB_ANSWER_MESSAGE_PAYLOAD
static const std::string TERMINATE_PILOT_JOB_ANSWER_MESSAGE_PAYLOAD
The number of bytes in the control message sent by the daemon to acknowledge a pilot job termination.
Definition: ComputeServiceMessagePayload.h:54
wrench::FCFSBatchScheduler
A class that implements a FCFS batch scheduler.
Definition: FCFSBatchScheduler.h:25
wrench::ComputeServiceMessagePayload::SUBMIT_STANDARD_JOB_REQUEST_MESSAGE_PAYLOAD
static const std::string SUBMIT_STANDARD_JOB_REQUEST_MESSAGE_PAYLOAD
The number of bytes in the control message sent to the daemon to submit a standard job.
Definition: ComputeServiceMessagePayload.h:26
wrench::ServiceMessagePayload::DAEMON_STOPPED_MESSAGE_PAYLOAD
static const std::string DAEMON_STOPPED_MESSAGE_PAYLOAD
The number of bytes in the control message sent by the daemon to confirm it has terminated.
Definition: ServiceMessagePayload.h:33
wrench::ComputeServiceMessagePayload::TERMINATE_STANDARD_JOB_ANSWER_MESSAGE_PAYLOAD
static const std::string TERMINATE_STANDARD_JOB_ANSWER_MESSAGE_PAYLOAD
The number of bytes in the control message sent by the daemon to acknowledge a standard job terminati...
Definition: ComputeServiceMessagePayload.h:36
wrench::BatchComputeServiceProperty::IGNORE_INVALID_JOBS_IN_WORKLOAD_TRACE_FILE
static const std::string IGNORE_INVALID_JOBS_IN_WORKLOAD_TRACE_FILE
Whether, when simulating a workload trace file, to abort when there is an invalid job specification (...
Definition: BatchComputeServiceProperty.h:104
wrench::BatchComputeServiceProperty::USE_REAL_RUNTIMES_AS_REQUESTED_RUNTIMES_IN_WORKLOAD_TRACE_FILE
static const std::string USE_REAL_RUNTIMES_AS_REQUESTED_RUNTIMES_IN_WORKLOAD_TRACE_FILE
Whether, when simulating a workload trace file, to use the actual runtimes as requested runtimes (i....
Definition: BatchComputeServiceProperty.h:95
wrench::ComputeServiceMessagePayload::RESOURCE_DESCRIPTION_REQUEST_MESSAGE_PAYLOAD
static const std::string RESOURCE_DESCRIPTION_REQUEST_MESSAGE_PAYLOAD
The number of bytes in the control message sent to the daemon to ask it for information on its resour...
Definition: ComputeServiceMessagePayload.h:56
wrench::ComputeServiceMessagePayload::IS_THERE_AT_LEAST_ONE_HOST_WITH_AVAILABLE_RESOURCES_REQUEST_MESSAGE_PAYLOAD
static const std::string IS_THERE_AT_LEAST_ONE_HOST_WITH_AVAILABLE_RESOURCES_REQUEST_MESSAGE_PAYLOAD
The number of bytes in the control message sent to the daemon to ask whether one host has some resou...
Definition: ComputeServiceMessagePayload.h:62
wrench::BatchComputeServiceProperty::BATSCHED_CONTIGUOUS_ALLOCATION
static const std::string BATSCHED_CONTIGUOUS_ALLOCATION
Controls Batsched node allocation policy.
Definition: BatchComputeServiceProperty.h:160
wrench::BatchComputeServiceProperty::HOST_SELECTION_ALGORITHM
static const std::string HOST_SELECTION_ALGORITHM
The host selection algorithm. Can be:
Definition: BatchComputeServiceProperty.h:62
wrench::ComputeService
The compute service base class.
Definition: ComputeService.h:33
wrench::BatchComputeServiceProperty::SUBMIT_TIME_OF_FIRST_JOB_IN_WORKLOAD_TRACE_FILE
static const std::string SUBMIT_TIME_OF_FIRST_JOB_IN_WORKLOAD_TRACE_FILE
A specification of the submit time of the first job in a provided trace file.
Definition: BatchComputeServiceProperty.h:111
wrench::CONSERVATIVEBFBatchSchedulerCoreLevel
A class that defines a conservative backfilling batch scheduler.
Definition: CONSERVATIVEBFBatchSchedulerCoreLevel.h:26
wrench::BatchComputeServiceProperty::SIMULATED_WORKLOAD_TRACE_FILE
static const std::string SIMULATED_WORKLOAD_TRACE_FILE
Path to a workload trace file to be replayed. The trace file can be in the SWF format (see http://...
Definition: BatchComputeServiceProperty.h:85
wrench::ComputeServiceMessagePayload::TERMINATE_PILOT_JOB_REQUEST_MESSAGE_PAYLOAD
static const std::string TERMINATE_PILOT_JOB_REQUEST_MESSAGE_PAYLOAD
The number of bytes in the control message sent to the daemon to terminate a pilot job.
Definition: ComputeServiceMessagePayload.h:52
wrench::BatchComputeService
A batch-scheduled compute service that manages a set of compute hosts and controls access to their re...
Definition: BatchComputeService.h:49
wrench::ComputeServiceMessagePayload::STANDARD_JOB_DONE_MESSAGE_PAYLOAD
static const std::string STANDARD_JOB_DONE_MESSAGE_PAYLOAD
The number of bytes in the control message sent by the daemon to state that it has completed a standa...
Definition: ComputeServiceMessagePayload.h:30
wrench::ComputeServiceMessagePayload::STANDARD_JOB_FAILED_MESSAGE_PAYLOAD
static const std::string STANDARD_JOB_FAILED_MESSAGE_PAYLOAD
The number of bytes in the control message sent by the daemon to state that a running standard job ha...
Definition: ComputeServiceMessagePayload.h:32
wrench
Definition: Alarm.cpp:20
wrench::ComputeServiceMessagePayload::TERMINATE_STANDARD_JOB_REQUEST_MESSAGE_PAYLOAD
static const std::string TERMINATE_STANDARD_JOB_REQUEST_MESSAGE_PAYLOAD
The number of bytes in the control message sent to the daemon to terminate a standard job.
Definition: ComputeServiceMessagePayload.h:34
wrench::ComputeServiceMessagePayload::SUBMIT_PILOT_JOB_ANSWER_MESSAGE_PAYLOAD
static const std::string SUBMIT_PILOT_JOB_ANSWER_MESSAGE_PAYLOAD
The number of bytes in the control message sent from the daemon to acknowledge a pilot job submission...
Definition: ComputeServiceMessagePayload.h:40
wrench::BatchComputeService::getQueue
std::vector< std::tuple< std::string, std::string, int, int, int, double, double > > getQueue()
Gets the state of the batch queue.
Definition: BatchComputeService.cpp:244
wrench::BatchComputeService::getStartTimeEstimates
std::map< std::string, double > getStartTimeEstimates(std::set< std::tuple< std::string, unsigned long, unsigned long, double >> resources)
Retrieve start time estimates for a set of job configurations.
Definition: BatchComputeService.cpp:219
wrench::BatschedBatchScheduler
A class that defines a batsched batch scheduler.
Definition: BatschedBatchScheduler.h:24
wrench::BatchComputeServiceProperty::TASK_STARTUP_OVERHEAD
static const std::string TASK_STARTUP_OVERHEAD
The overhead to start a task execution, in seconds.
Definition: BatchComputeServiceProperty.h:27
wrench::BatchComputeServiceProperty::BATSCHED_LOGGING_MUTED
static const std::string BATSCHED_LOGGING_MUTED
Controls Batsched logging.
Definition: BatchComputeServiceProperty.h:150
wrench::BatchComputeServiceProperty::TASK_SELECTION_ALGORITHM
static const std::string TASK_SELECTION_ALGORITHM
The algorithm to pick which ready computational task (within a standard job executed by the batch ser...
Definition: BatchComputeServiceProperty.h:72
wrench::WorkloadTraceFileReplayer
A service that goes through a job submission trace (as loaded by a TraceFileLoader),...
Definition: WorkloadTraceFileReplayer.h:28
wrench::Service::messagepayload_list
std::map< std::string, double > messagepayload_list
The service's messagepayload list.
Definition: Service.h:112
wrench::ComputeServiceMessagePayload::RESOURCE_DESCRIPTION_ANSWER_MESSAGE_PAYLOAD
static const std::string RESOURCE_DESCRIPTION_ANSWER_MESSAGE_PAYLOAD
The number of bytes in the control message sent by the daemon to state information on its resources.
Definition: ComputeServiceMessagePayload.h:58
wrench::BatchComputeServiceProperty::OUTPUT_CSV_JOB_LOG
static const std::string OUTPUT_CSV_JOB_LOG
Path to a to-be-generated Batsim-style CSV trace file (e.g. for batch schedule visualization purpose...
Definition: BatchComputeServiceProperty.h:122
wrench::ServiceMessagePayload::STOP_DAEMON_MESSAGE_PAYLOAD
static const std::string STOP_DAEMON_MESSAGE_PAYLOAD
The number of bytes in the control message sent to the daemon to terminate it.
Definition: ServiceMessagePayload.h:31
wrench::ComputeServiceMessagePayload::PILOT_JOB_STARTED_MESSAGE_PAYLOAD
static const std::string PILOT_JOB_STARTED_MESSAGE_PAYLOAD
The number of bytes in the control message sent by the daemon to state that a pilot job has started.
Definition: ComputeServiceMessagePayload.h:42
wrench::S4U_Daemon::hostname
std::string hostname
The name of the host on which the daemon is running.
Definition: S4U_Daemon.h:51
wrench::ComputeServiceMessagePayload::SUBMIT_STANDARD_JOB_ANSWER_MESSAGE_PAYLOAD
static const std::string SUBMIT_STANDARD_JOB_ANSWER_MESSAGE_PAYLOAD
The number of bytes in the control message sent by the daemon to acknowledge a standard job submissio...
Definition: ComputeServiceMessagePayload.h:28
wrench::BatchComputeServiceProperty::BATCH_SCHEDULING_ALGORITHM
static const std::string BATCH_SCHEDULING_ALGORITHM
The batch scheduling algorithm. Can be:
Definition: BatchComputeServiceProperty.h:43
wrench::ComputeServiceMessagePayload::SUBMIT_PILOT_JOB_REQUEST_MESSAGE_PAYLOAD
static const std::string SUBMIT_PILOT_JOB_REQUEST_MESSAGE_PAYLOAD
The number of bytes in the control message sent to the daemon to submit a pilot job.
Definition: ComputeServiceMessagePayload.h:38
wrench::ComputeServiceProperty::SUPPORTS_PILOT_JOBS
static const std::string SUPPORTS_PILOT_JOBS
Whether the compute service supports pilot jobs (true or false)
Definition: ComputeServiceProperty.h:26
wrench::BatchComputeServiceProperty::BATCH_RJMS_PADDING_DELAY
static const std::string BATCH_RJMS_PADDING_DELAY
Integral number of seconds that the Batch Scheduler adds to the runtime of each incoming job....
Definition: BatchComputeServiceProperty.h:132
wrench::CONSERVATIVEBFBatchScheduler
A class that defines a conservative backfilling batch scheduler.
Definition: CONSERVATIVEBFBatchScheduler.h:26
wrench::BatchComputeService::parseUnsignedLongServiceSpecificArgument
static unsigned long parseUnsignedLongServiceSpecificArgument(std::string key, const std::map< std::string, std::string > &args)
Helper function for service-specific job arguments.
Definition: BatchComputeService.cpp:309
wrench::BatchComputeService::BatchComputeService
BatchComputeService(const std::string &hostname, std::vector< std::string > compute_hosts, std::string scratch_space_mount_point, std::map< std::string, std::string > property_list={}, std::map< std::string, double > messagepayload_list={})
Constructor.
Definition: BatchComputeService.cpp:59
wrench::BatchComputeServiceProperty::BATCH_QUEUE_ORDERING_ALGORITHM
static const std::string BATCH_QUEUE_ORDERING_ALGORITHM
The batch queue ordering algorithm. Can be:
Definition: BatchComputeServiceProperty.h:52
wrench::Service::property_list
std::map< std::string, std::string > property_list
The service's property list.
Definition: Service.h:109
wrench::ComputeServiceProperty::SUPPORTS_STANDARD_JOBS
static const std::string SUPPORTS_STANDARD_JOBS
Whether the compute service supports standard jobs (true or false)
Definition: ComputeServiceProperty.h:24