BatchService.h
1 
11 #ifndef WRENCH_BATCH_SERVICE_H
12 #define WRENCH_BATCH_SERVICE_H
13 
14 #include "wrench/services/compute/ComputeService.h"
15 #include "wrench/services/compute/standard_job_executor/StandardJobExecutor.h"
16 #include "wrench/services/compute/batch/BatchJob.h"
17 #include "wrench/services/compute/batch/BatschedNetworkListener.h"
18 #include "wrench/services/compute/batch/BatchServiceProperty.h"
19 #include "wrench/services/compute/batch/BatchServiceMessagePayload.h"
20 #include "wrench/services/helpers/Alarm.h"
21 #include "wrench/workflow/job/StandardJob.h"
22 #include "wrench/workflow/job/WorkflowJob.h"
23 #include <deque>
24 #include <queue>
25 #include <set>
26 #include <tuple>
27 
28 namespace wrench {
29 
30  class WorkloadTraceFileReplayer; // forward
31 
/**
 * @brief A batch-scheduled compute service that manages a set of compute hosts and
 *        controls access to their resources.
 *
 * Jobs (standard or pilot) are wrapped in BatchJob objects, queued, and started on hosts
 * chosen by a host-selection algorithm. When ENABLE_BATSCHED is defined, scheduling can be
 * delegated to an external batsched process; otherwise only the built-in "FCFS" algorithm
 * is available.
 */
47  class BatchService : public ComputeService {
48 
52  private:
53 
54  std::map<std::string, std::string> default_property_values = { // default property values; the defaults differ depending on whether ENABLE_BATSCHED is defined
60  #ifdef ENABLE_BATSCHED
62 // {BatchServiceProperty::BATCH_SCHEDULING_ALGORITHM, "easy_bf"},
63 // {BatchServiceProperty::BATCH_SCHEDULING_ALGORITHM, "easy_bf_fast"},
65  #else
67  #endif
75  };
76 
77  std::map<std::string, std::string> default_messagepayload_values = { // default message payload values (per the *_MESSAGE_PAYLOAD docs: control-message sizes in bytes)
94  };
95 
96  public:
97  BatchService(std::string &hostname, // NOTE(review): non-const reference rejects temporaries/string literals; const& would suffice if BatchService.cpp does not modify it - confirm
98  std::vector<std::string> compute_hosts,
99  double scratch_space_size,
100  std::map<std::string, std::string> property_list = {}, // presumably overrides entries of default_property_values - confirm
101  std::map<std::string, std::string> messagepayload_list = {} // presumably overrides entries of default_messagepayload_values - confirm
102  );
103 
104  /***********************/
106  /***********************/
107  std::map<std::string,double> getStartTimeEstimates(std::set<std::tuple<std::string,unsigned int,unsigned int, double>>); // start-time estimates; tuple fields presumably (id, num_hosts, cores_per_host, duration), result keyed by id - confirm in BatchService.cpp
108 
109  /***********************/
111  /***********************/
112 
113  /***********************/
115  /***********************/
116  ~BatchService() override;
117  /***********************/
119  /***********************/
120 
121 
122  private:
123  friend class WorkloadTraceFileReplayer; // gives the trace replayer access to private members, including the constructor below
124 
125  BatchService(std::string hostname, // private constructor (reachable from the friend WorkloadTraceFileReplayer); adds explicit per-host cores/RAM and a name suffix
126  std::vector<std::string> compute_hosts,
127  unsigned long cores_per_host,
128  double ram_per_host,
129  double scratch_space_size,
130  std::map<std::string, std::string> property_list,
131  std::map<std::string, std::string> messagepayload_list,
132  std::string suffix
133  );
134 
135  // helper function: presumably parses args[key] as an unsigned long
136  static unsigned long parseUnsignedLongServiceSpecificArgument(std::string key, std::map<std::string, std::string> &args);
137 
138  // helper function: common submission path for workflow jobs (presumably shared by submitStandardJob/submitPilotJob)
139  void submitWorkflowJob(WorkflowJob *job, std::map<std::string, std::string> &batch_job_args);
140 
141  //submits a standard job
142  void submitStandardJob(StandardJob *job, std::map<std::string, std::string> &batch_job_args) override;
143 
144  //submits a pilot job
145  void submitPilotJob(PilotJob *job, std::map<std::string, std::string> &batch_job_args) override;
146 
147  // helper function: common termination path for workflow jobs (presumably shared by the standard/pilot variants)
148  void terminateWorkflowJob(WorkflowJob *job);
149 
150  // terminate a standard job
151  void terminateStandardJob(StandardJob *job) override;
152 
153  // terminate a pilot job
154  void terminatePilotJob(PilotJob *job) override;
155 
156  std::vector<std::tuple<std::string, double, double, double, double, unsigned int>> workload_trace; // workload trace entries to replay (cf. SIMULATED_WORKLOAD_TRACE_FILE); field meanings not visible here
157  std::shared_ptr<WorkloadTraceFileReplayer> workload_trace_replayer; // replayer for workload_trace (presumably launched by startBackgroundWorkloadProcess)
158 
159  bool clean_exit = false; // presumably set when the daemon terminates normally - confirm
160 
161  //Configuration to create randomness in measurement period initially
162  unsigned long random_interval = 10;
163 
164  //alarms for standard jobs (keyed by string, presumably the job name/id)
165  std::map<std::string,std::shared_ptr<Alarm>> standard_job_alarms;
166 
167  //alarms for pilot jobs (only one pilot job alarm)
168  std::map<std::string,std::shared_ptr<Alarm>> pilot_job_alarms;
169 
170 
171  /* Resources information in Batchservice */
172  unsigned long total_num_of_nodes; // NOTE(review): no in-class initializer; must be set by every constructor
173  unsigned long num_cores_per_node; // NOTE(review): no in-class initializer; must be set by every constructor
174  std::map<std::string, unsigned long> nodes_to_cores_map;
175  std::vector<double> timeslots;
176  std::map<std::string, unsigned long> available_nodes_to_cores;
177  std::map<unsigned long, std::string> host_id_to_names;
178  std::vector<std::string> compute_hosts;
179  /*End Resources information in Batchservice */
180 
181  // Set of currently running standard job executors
182  std::set<std::shared_ptr<StandardJobExecutor>> running_standard_job_executors;
183 
184  // Set of finished standard job executors (which is cleared periodically)
185  std::set<std::shared_ptr<StandardJobExecutor>> finished_standard_job_executors;
186 
187  // Master set of batch jobs; owns every BatchJob via unique_ptr (the BatchJob* containers below are presumably non-owning views into it)
188  std::set<std::unique_ptr<BatchJob>> all_jobs;
189 
190  //Queue of pending batch jobs
191  std::deque<BatchJob *> pending_jobs;
192 
193  //A set of running batch jobs
194  std::set<BatchJob *> running_jobs;
195 
196  // A set of waiting jobs that have been submitted to batsched, but not scheduled
197  std::set<BatchJob *> waiting_jobs;
198 
199 
200 #ifdef ENABLE_BATSCHED
201 
202  std::set<std::string> scheduling_algorithms = {"conservative_bf", "crasher", "easy_bf", "easy_bf_fast", // scheduling algorithm names known with batsched (cf. BATCH_SCHEDULING_ALGORITHM)
203  "easy_bf_plot_liquid_load_horizon",
204  "energy_bf", "energy_bf_dicho", "energy_bf_idle_sleeper",
205  "energy_bf_monitoring",
206  "energy_bf_monitoring_inertial", "energy_bf_subpart_sleeper",
207  "energy_watcher", "fcfs_fast", "fast_conservative_bf",
208  "filler", "killer", "killer2", "random", "rejecter",
209  "sequencer", "sleeper", "submitter", "waiting_time_estimator"
210  };
211 
212  std::set<std::string> queue_ordering_options = {"fcfs", "lcfs", "desc_bounded_slowdown", "desc_slowdown", // queue ordering names known with batsched (cf. BATCH_QUEUE_ORDERING_ALGORITHM)
213  "asc_size", "desc_size", "asc_walltime", "desc_walltime"
214 
215  };
216 #else
217  std::set<std::string> scheduling_algorithms = {"FCFS" // without batsched, FCFS is the only listed algorithm
218  };
219 
220  //Batch queue ordering options (none are listed without batsched)
221  std::set<std::string> queue_ordering_options = {
222  };
223 
224 #endif
225 
226 
227  unsigned long generateUniqueJobID();
228 
229  void removeJobFromRunningList(BatchJob *job);
230 
231  void freeJobFromJobsList(BatchJob* job);
232 
233  int main() override; // the service daemon's main function (overrides the base-class main)
234 
235  bool processNextMessage(); // handles one incoming message; the meaning of the bool result (presumably "keep running") is not visible here - confirm
236 
237  void startBackgroundWorkloadProcess();
238 
239  void processGetResourceInformation(const std::string &answer_mailbox);
240 
241  void processStandardJobCompletion(StandardJobExecutor *executor, StandardJob *job);
242 
243  void processStandardJobFailure(StandardJobExecutor *executor,
244  StandardJob *job,
245  std::shared_ptr<FailureCause> cause);
246 
247  void terminateRunningStandardJob(StandardJob *job);
248 
249  std::map<std::string, std::tuple<unsigned long, double>> scheduleOnHosts(std::string host_selection_algorithm, // picks hosts per HOST_SELECTION_ALGORITHM; unnamed args presumably (num_nodes, cores_per_node, ram_per_node) - confirm
250  unsigned long, unsigned long, double);
251 
252  BatchJob *pickJobForScheduling(std::string); // the argument is presumably a queue-ordering/scheduling policy name - confirm
253 
254  //Terminate the batch service (this is usually for pilot jobs when they act as a batch service)
255  void cleanup() override;
256 
257  // Terminate currently running pilot jobs
258  void terminateRunningPilotJobs();
259 
260  //Fail the standard jobs
261  void failCurrentStandardJobs();
262 
263  //Process the pilot job completion
264  void processPilotJobCompletion(PilotJob *job);
265 
266  //Process standard job timeout
267  void processStandardJobTimeout(StandardJob *job);
268 
269  //process standard job termination request
270  void processStandardJobTerminationRequest(StandardJob *job, std::string answer_mailbox);
271 
272  //process pilot job termination request
273  void processPilotJobTerminationRequest(PilotJob *job, std::string answer_mailbox);
274 
275  //Process pilot job timeout
276  void processPilotJobTimeout(PilotJob *job);
277 
278  //free up resources (same host -> tuple map shape as returned by scheduleOnHosts)
279  void freeUpResources(std::map<std::string, std::tuple<unsigned long, double>> resources);
280 
281  //send call back to the pilot job submitters
282  void sendPilotJobExpirationNotification(PilotJob *job);
283 
284  //send call back to the standard job submitters
285  void sendStandardJobFailureNotification(StandardJob *job, std::string job_id, std::shared_ptr<FailureCause> cause);
286 
287  // Try to schedule a job
288  bool scheduleOneQueuedJob();
289 
290  // process a job submission
291  void processJobSubmission(BatchJob *job, std::string answer_mailbox);
292 
293 
294  //start a job
295  void startJob(std::map<std::string, std::tuple<unsigned long, double>>, WorkflowJob *,
296  BatchJob *, unsigned long, double, unsigned long);
297 
298 
299 
300  //start-time estimation for the built-in FCFS algorithm (presumably used when batsched is not in use)
301 
302  std::map<std::string,double> getStartTimeEstimatesForFCFS(std::set<std::tuple<std::string,unsigned int,unsigned int, double>>);
303 
304 
306  std::set<std::shared_ptr<BatschedNetworkListener>> network_listeners; // set of network listeners (only useful when ENABLE_BATSCHED is defined)
307  pid_t pid; // presumably the batsched process id; NOTE(review): no in-class initializer, and pid_t needs <sys/types.h>, which this header does not include directly
308  unsigned long batsched_port; // presumably the port used to talk to batsched; NOTE(review): no in-class initializer
309 
310 
311 #ifdef ENABLE_BATSCHED
312  friend class BatschedNetworkListener;
313 
314  void startBatsched();
315  void stopBatsched();
316 
317  std::map<std::string,double> getStartTimeEstimatesFromBatsched(std::set<std::tuple<std::string,unsigned int,unsigned int,double>>);
318 
319  void startBatschedNetworkListener();
320 
321  void notifyJobEventsToBatSched(std::string job_id, std::string status, std::string job_state,
322  std::string kill_reason, std::string even_type); // NOTE(review): 'even_type' is presumably a typo for 'event_type'
323 
324  void appendJobInfoToCSVOutputFile(BatchJob *batch_job, std::string status); // presumably writes to the OUTPUT_CSV_JOB_LOG file - confirm
325 
326  void sendAllQueuedJobsToBatsched();
327 
328  //process execute events from batsched
329  void processExecuteJobFromBatSched(std::string bat_sched_reply);
330 
331 #endif // ENABLE_BATSCHED
332 
333 
334  };
335 }
336 
337 
338 #endif //WRENCH_BATCH_SERVICE_H
static const std::string PILOT_JOB_EXPIRED_MESSAGE_PAYLOAD
The number of bytes in the control message sent by the daemon to state that a pilot job has expired...
Definition: ComputeServiceMessagePayload.h:44
A batch-scheduled compute service that manages a set of compute hosts and controls access to their re...
Definition: BatchService.h:47
static const std::string HOST_SELECTION_ALGORITHM
The host selection algorithm. Can be:
Definition: BatchServiceProperty.h:58
static const std::string SIMULATE_COMPUTATION_AS_SLEEP
Simulate computation as just a sleep instead of an actual compute thread. This is for scalability reasons.
Definition: BatchServiceProperty.h:120
static const std::string BATCH_RJMS_DELAY
Number of seconds that the Batch Scheduler adds to the runtime of each incoming job. This is something production batch systems do to avoid overly aggressive job terminations. For instance, if a job says it wants to run for (at most) 60 seconds, the system will actually assume the job wants to run for (at most) 60 + 5 seconds.
Definition: BatchServiceProperty.h:112
static const std::string STOP_DAEMON_MESSAGE_PAYLOAD
The number of bytes in the control message sent to the daemon to terminate it.
Definition: ServiceMessagePayload.h:31
static const std::string TERMINATE_STANDARD_JOB_REQUEST_MESSAGE_PAYLOAD
The number of bytes in the control message sent to the daemon to terminate a standard job...
Definition: ComputeServiceMessagePayload.h:34
static const std::string TERMINATE_PILOT_JOB_ANSWER_MESSAGE_PAYLOAD
The number of bytes in the control message sent by the daemon to acknowledge a pilot job termination...
Definition: ComputeServiceMessagePayload.h:54
static const std::string SUBMIT_PILOT_JOB_ANSWER_MESSAGE_PAYLOAD
The number of bytes in the control message sent from the daemon to acknowledge a pilot job submission...
Definition: ComputeServiceMessagePayload.h:40
static const std::string SUBMIT_STANDARD_JOB_ANSWER_MESSAGE_PAYLOAD
The number of bytes in the control message sent by the daemon to acknowledge a standard job submissio...
Definition: ComputeServiceMessagePayload.h:28
static const std::string TERMINATE_PILOT_JOB_REQUEST_MESSAGE_PAYLOAD
The number of bytes in the control message sent to the daemon to terminate a pilot job...
Definition: ComputeServiceMessagePayload.h:52
static const std::string STANDARD_JOB_FAILED_MESSAGE_PAYLOAD
The number of bytes in the control message sent by the daemon to state that a running standard job ha...
Definition: ComputeServiceMessagePayload.h:32
static const std::string BATSCHED_CONTIGUOUS_ALLOCATION
Controls Batsched node allocation policy.
Definition: BatchServiceProperty.h:138
static const std::string SIMULATED_WORKLOAD_TRACE_FILE
Path to a workload trace file to be replayed. The trace file can be in the SWF format (see http://...
Definition: BatchServiceProperty.h:81
static const std::string TERMINATE_STANDARD_JOB_ANSWER_MESSAGE_PAYLOAD
The number of bytes in the control message sent by the daemon to acknowledge a standard job terminati...
Definition: ComputeServiceMessagePayload.h:36
static const std::string BATCH_SCHEDULING_ALGORITHM
The batch scheduling algorithm. Can be:
Definition: BatchServiceProperty.h:39
static const std::string STANDARD_JOB_DONE_MESSAGE_PAYLOAD
The number of bytes in the control message sent by the daemon to state that it has completed a standa...
Definition: ComputeServiceMessagePayload.h:30
static const std::string OUTPUT_CSV_JOB_LOG
Path to a to-be-generated Batsim-style CSV trace file (e.g. for batch schedule visualization purposes).
Definition: BatchServiceProperty.h:102
static const std::string THREAD_STARTUP_OVERHEAD
The overhead to start a thread execution, in seconds.
Definition: BatchServiceProperty.h:27
static const std::string SUBMIT_PILOT_JOB_REQUEST_MESSAGE_PAYLOAD
The number of bytes in the control message sent to the daemon to submit a pilot job.
Definition: ComputeServiceMessagePayload.h:38
The compute service base class.
Definition: ComputeService.h:35
BatchService(std::string &hostname, std::vector< std::string > compute_hosts, double scratch_space_size, std::map< std::string, std::string > property_list={}, std::map< std::string, std::string > messagepayload_list={})
Constructor.
Definition: BatchService.cpp:98
static const std::string TASK_SELECTION_ALGORITHM
The algorithm to pick which ready computational task (within a standard job executed by the batch ser...
Definition: BatchServiceProperty.h:68
static const std::string BATSCHED_LOGGING_MUTED
Controls Batsched logging.
Definition: BatchServiceProperty.h:128
static const std::string SUPPORTS_PILOT_JOBS
Whether the compute service supports pilot jobs (true or false)
Definition: ComputeServiceProperty.h:26
static const std::string RESOURCE_DESCRIPTION_REQUEST_MESSAGE_PAYLOAD
The number of bytes in the control message sent to the daemon to ask it for information on its resour...
Definition: ComputeServiceMessagePayload.h:56
static const std::string USE_REAL_RUNTIMES_AS_REQUESTED_RUNTIMES
Whether, when simulating a workload trace file, to use the actual runtimes as requested runtimes (i...
Definition: BatchServiceProperty.h:91
static const std::string BATCH_QUEUE_ORDERING_ALGORITHM
The batch queue ordering algorithm. Can be:
Definition: BatchServiceProperty.h:48
static const std::string SUPPORTS_STANDARD_JOBS
Whether the compute service supports standard jobs (true or false)
Definition: ComputeServiceProperty.h:24
static const std::string SUBMIT_STANDARD_JOB_REQUEST_MESSAGE_PAYLOAD
The number of bytes in the control message sent to the daemon to submit a standard job...
Definition: ComputeServiceMessagePayload.h:26
static const std::string PILOT_JOB_STARTED_MESSAGE_PAYLOAD
The number of bytes in the control message sent by the daemon to state that a pilot job has started...
Definition: ComputeServiceMessagePayload.h:42
static const std::string DAEMON_STOPPED_MESSAGE_PAYLOAD
The number of bytes in the control message sent by the daemon to confirm it has terminated.
Definition: ServiceMessagePayload.h:33
static const std::string RESOURCE_DESCRIPTION_ANSWER_MESSAGE_PAYLOAD
The number of bytes in the control message sent by the daemon to state information on its resources...
Definition: ComputeServiceMessagePayload.h:58
Definition: TerminalOutput.cpp:15