BatchComputeService.h
1 
11 #ifndef WRENCH_BATCH_SERVICE_H
12 #define WRENCH_BATCH_SERVICE_H
13 
14 #include "wrench/services/compute/ComputeService.h"
15 #include "wrench/services/compute/standard_job_executor/StandardJobExecutor.h"
16 #include "wrench/services/compute/batch/BatchJob.h"
17 #include "wrench/services/compute/batch/BatschedNetworkListener.h"
18 #include "wrench/services/compute/batch/BatchComputeServiceProperty.h"
19 #include "wrench/services/compute/batch/BatchComputeServiceMessagePayload.h"
20 #include "wrench/services/helpers/Alarm.h"
21 #include "wrench/workflow/job/StandardJob.h"
22 #include "wrench/workflow/job/WorkflowJob.h"
23 #include <deque>
24 #include <queue>
25 #include <set>
26 #include <tuple>
27 
28 namespace wrench {
29 
30  class WorkloadTraceFileReplayer; // forward
31 
48 
52  private:
53 
54  std::map<std::string, std::string> default_property_values = {
60 #ifdef ENABLE_BATSCHED
62 // {BatchComputeServiceProperty::BATCH_SCHEDULING_ALGORITHM, "easy_bf"},
63 // {BatchComputeServiceProperty::BATCH_SCHEDULING_ALGORITHM, "easy_bf_fast"},
65 #else
67 #endif
77  };
78 
79  std::map<std::string, double> default_messagepayload_values = {
96  };
97 
98  public:
99  BatchComputeService(std::string &hostname,
100  std::vector<std::string> compute_hosts,
101  std::string scratch_space_mount_point,
102  std::map<std::string, std::string> property_list = {},
103  std::map<std::string, double> messagepayload_list = {}
104  );
105 
106  /***********************/
108  /***********************/
109  std::map<std::string,double> getStartTimeEstimates(std::set<std::tuple<std::string,unsigned int,unsigned int, double>> resources);
110 
111  /***********************/
113  /***********************/
114 
115  /***********************/
117  /***********************/
118  ~BatchComputeService() override;
119  /***********************/
121  /***********************/
122 
123  private:
124 
125 
126 
127  friend class WorkloadTraceFileReplayer;
128 
129  BatchComputeService(std::string hostname,
130  std::vector<std::string> compute_hosts,
131  unsigned long cores_per_host,
132  double ram_per_host,
133  std::string scratch_space_mount_point,
134  std::map<std::string, std::string> property_list,
135  std::map<std::string, double> messagepayload_list,
136  std::string suffix
137  );
138 
139  // helper function
140  static unsigned long parseUnsignedLongServiceSpecificArgument(std::string key, std::map<std::string, std::string> &args);
141 
142  // helper function
143  void submitWorkflowJob(WorkflowJob *job, std::map<std::string, std::string> &batch_job_args);
144 
145  //submits a standard job
146  void submitStandardJob(StandardJob *job, std::map<std::string, std::string> &batch_job_args) override;
147 
148  //submits a pilot job
149  void submitPilotJob(PilotJob *job, std::map<std::string, std::string> &batch_job_args) override;
150 
151  // helper function
152  void terminateWorkflowJob(WorkflowJob *job);
153 
154  // terminate a standard job
155  void terminateStandardJob(StandardJob *job) override;
156 
157  // terminate a pilot job
158  void terminatePilotJob(PilotJob *job) override;
159 
160  std::vector<std::tuple<std::string, double, double, double, double, unsigned int>> workload_trace;
161  std::shared_ptr<WorkloadTraceFileReplayer> workload_trace_replayer;
162 
163  bool clean_exit = false;
164 
165  //Configuration to create randomness in measurement period initially
166  unsigned long random_interval = 10;
167 
168  //create alarms for standard jobs
169  std::map<std::string,std::shared_ptr<Alarm>> standard_job_alarms;
170 
171  //alarms for pilot jobs (only one pilot job alarm)
172  std::map<std::string,std::shared_ptr<Alarm>> pilot_job_alarms;
173 
174 
175  /* Resources information in Batchservice */
176  unsigned long total_num_of_nodes;
177  unsigned long num_cores_per_node;
178  std::map<std::string, unsigned long> nodes_to_cores_map;
179  std::vector<double> timeslots;
180  std::map<std::string, unsigned long> available_nodes_to_cores;
181  std::map<unsigned long, std::string> host_id_to_names;
182  std::vector<std::string> compute_hosts;
183  /*End Resources information in Batchservice */
184 
185  // Set of running standard job executors
186  std::set<std::shared_ptr<StandardJobExecutor>> running_standard_job_executors;
187 
188  // Set of finished standard job executors (which is cleared periodically)
189  std::set<std::shared_ptr<StandardJobExecutor>> finished_standard_job_executors;
190 
191  // Master List of batch jobs
192  std::set<std::unique_ptr<BatchJob>> all_jobs;
193 
194  //Queue of pending batch jobs
195  std::deque<BatchJob *> pending_jobs;
196 
197  //A set of running batch jobs
198  std::set<BatchJob *> running_jobs;
199 
200  // A set of waiting jobs that have been submitted to batsched, but not scheduled
201  std::set<BatchJob *> waiting_jobs;
202 
203 
204 #ifdef ENABLE_BATSCHED
205 
206  std::set<std::string> scheduling_algorithms = {"conservative_bf", "crasher", "easy_bf", "easy_bf_fast",
207  "easy_bf_plot_liquid_load_horizon",
208  "energy_bf", "energy_bf_dicho", "energy_bf_idle_sleeper",
209  "energy_bf_monitoring",
210  "energy_bf_monitoring_inertial", "energy_bf_subpart_sleeper",
211  "energy_watcher", "fcfs_fast", "fast_conservative_bf",
212  "filler", "killer", "killer2", "random", "rejecter",
213  "sequencer", "sleeper", "submitter", "waiting_time_estimator"
214  };
215 
216  std::set<std::string> queue_ordering_options = {"fcfs", "lcfs", "desc_bounded_slowdown", "desc_slowdown",
217  "asc_size", "desc_size", "asc_walltime", "desc_walltime"
218 
219  };
220 #else
221  std::set<std::string> scheduling_algorithms = {"FCFS"
222  };
223 
224  //Batch queue ordering options
225  std::set<std::string> queue_ordering_options = {
226  };
227 
228 #endif
229 
230 
231  unsigned long generateUniqueJobID();
232 
233  void removeJobFromRunningList(BatchJob *job);
234 
235  void freeJobFromJobsList(BatchJob* job);
236 
237  int main() override;
238 
239  bool processNextMessage();
240 
241  void startBackgroundWorkloadProcess();
242 
243  void processGetResourceInformation(const std::string &answer_mailbox);
244 
245  void processStandardJobCompletion(std::shared_ptr<StandardJobExecutor> executor, StandardJob *job);
246 
247  void processStandardJobFailure(std::shared_ptr<StandardJobExecutor> executor,
248  StandardJob *job,
249  std::shared_ptr<FailureCause> cause);
250 
251  void terminateRunningStandardJob(StandardJob *job);
252 
253  std::map<std::string, std::tuple<unsigned long, double>> scheduleOnHosts(std::string host_selection_algorithm,
254  unsigned long, unsigned long, double);
255 
256  BatchJob *pickJobForScheduling(std::string);
257 
258  //Terminate the batch service (this is usually for pilot jobs when they act as a batch service)
259  void cleanup(bool has_returned_from_main, int return_value) override;
260 
261  // Terminate currently running pilot jobs
262  void terminateRunningPilotJobs();
263 
264  //Fail the standard jobs
265  void failCurrentStandardJobs();
266 
267  //Process the pilot job completion
268  void processPilotJobCompletion(PilotJob *job);
269 
270  //Process standardjob timeout
271  void processStandardJobTimeout(StandardJob *job);
272 
273  //process standard job termination request
274  void processStandardJobTerminationRequest(StandardJob *job, std::string answer_mailbox);
275 
276  //process pilot job termination request
277  void processPilotJobTerminationRequest(PilotJob *job, std::string answer_mailbox);
278 
279  // process a batch job timeout event
280  void processAlarmJobTimeout(BatchJob *job);
281 
282  //Process pilot job timeout
283  void processPilotJobTimeout(PilotJob *job);
284 
285  //free up resources
286  void freeUpResources(std::map<std::string, std::tuple<unsigned long, double>> resources);
287 
288  //send call back to the pilot job submitters
289  void sendPilotJobExpirationNotification(PilotJob *job);
290 
291  //send call back to the standard job submitters
292  void sendStandardJobFailureNotification(StandardJob *job, std::string job_id, std::shared_ptr<FailureCause> cause);
293 
294  // Try to schedule a job
295  bool scheduleOneQueuedJob();
296 
297  // process a job submission
298  void processJobSubmission(BatchJob *job, std::string answer_mailbox);
299 
300 
301  //start a job
302  void startJob(std::map<std::string, std::tuple<unsigned long, double>>, WorkflowJob *,
303  BatchJob *, unsigned long, unsigned long, unsigned long);
304 
305 
306 
307  //set of network listeners (only useful when ENABLE_BATSCHED == on)
308 
309  std::map<std::string,double> getStartTimeEstimatesForFCFS(std::set<std::tuple<std::string,unsigned int,unsigned int, double>>);
310 
311 
313  std::set<std::shared_ptr<BatschedNetworkListener>> network_listeners;
314  pid_t pid;
315  unsigned long batsched_port;
316 
317 
318 #ifdef ENABLE_BATSCHED
319  friend class BatschedNetworkListener;
320 
321  void startBatsched();
322  void stopBatsched();
323 
324  std::map<std::string,double> getStartTimeEstimatesFromBatsched(std::set<std::tuple<std::string,unsigned int,unsigned int,double>>);
325 
326  void startBatschedNetworkListener();
327 
328  void notifyJobEventsToBatSched(std::string job_id, std::string status, std::string job_state,
329  std::string kill_reason, std::string even_type);
330 
331  void appendJobInfoToCSVOutputFile(BatchJob *batch_job, std::string status);
332 
333  void sendAllQueuedJobsToBatsched();
334 
335  //process execute events from batsched
336  void processExecuteJobFromBatSched(std::string bat_sched_reply);
337 
338 #endif // ENABLE_BATSCHED
339 
340 
341  };
342 }
343 
344 
345 #endif //WRENCH_BATCH_SERVICE_H
static const std::string PILOT_JOB_EXPIRED_MESSAGE_PAYLOAD
The number of bytes in the control message sent by the daemon to state that a pilot job has expired...
Definition: ComputeServiceMessagePayload.h:44
static const std::string USE_REAL_RUNTIMES_AS_REQUESTED_RUNTIMES_IN_WORKLOAD_TRACE_FILE
Whether, when simulating a workload trace file, to use the actual runtimes as requested runtimes (i...
Definition: BatchComputeServiceProperty.h:91
static const std::string STOP_DAEMON_MESSAGE_PAYLOAD
The number of bytes in the control message sent to the daemon to terminate it.
Definition: ServiceMessagePayload.h:31
static const std::string TERMINATE_STANDARD_JOB_REQUEST_MESSAGE_PAYLOAD
The number of bytes in the control message sent to the daemon to terminate a standard job...
Definition: ComputeServiceMessagePayload.h:34
static const std::string TERMINATE_PILOT_JOB_ANSWER_MESSAGE_PAYLOAD
The number of bytes in the control message sent by the daemon to acknowledge a pilot job termination...
Definition: ComputeServiceMessagePayload.h:54
static const std::string SUBMIT_PILOT_JOB_ANSWER_MESSAGE_PAYLOAD
The number of bytes in the control message sent from the daemon to acknowledge a pilot job submission...
Definition: ComputeServiceMessagePayload.h:40
static const std::string SUBMIT_STANDARD_JOB_ANSWER_MESSAGE_PAYLOAD
The number of bytes in the control message sent by the daemon to acknowledge a standard job submissio...
Definition: ComputeServiceMessagePayload.h:28
static const std::string SIMULATED_WORKLOAD_TRACE_FILE
Path to a workload trace file to be replayed. The trace file can be in the SWF format (see http://...
Definition: BatchComputeServiceProperty.h:81
static const std::string TERMINATE_PILOT_JOB_REQUEST_MESSAGE_PAYLOAD
The number of bytes in the control message sent to the daemon to terminate a pilot job...
Definition: ComputeServiceMessagePayload.h:52
static const std::string STANDARD_JOB_FAILED_MESSAGE_PAYLOAD
The number of bytes in the control message sent by the daemon to state that a running standard job ha...
Definition: ComputeServiceMessagePayload.h:32
BatchComputeService(std::string &hostname, std::vector< std::string > compute_hosts, std::string scratch_space_mount_point, std::map< std::string, std::string > property_list={}, std::map< std::string, double > messagepayload_list={})
Constructor.
Definition: BatchComputeService.cpp:95
static const std::string BATSCHED_CONTIGUOUS_ALLOCATION
Controls Batsched node allocation policy.
Definition: BatchComputeServiceProperty.h:156
static const std::string BATCH_RJMS_DELAY
Integral number of seconds that the Batch Scheduler adds to the runtime of each incoming job...
Definition: BatchComputeServiceProperty.h:128
static const std::string IGNORE_INVALID_JOBS_IN_WORKLOAD_TRACE_FILE
Whether, when simulating a workload trace file, to abort when there is an invalid job specification (...
Definition: BatchComputeServiceProperty.h:100
static const std::string TERMINATE_STANDARD_JOB_ANSWER_MESSAGE_PAYLOAD
The number of bytes in the control message sent by the daemon to acknowledge a standard job terminati...
Definition: ComputeServiceMessagePayload.h:36
static const std::string OUTPUT_CSV_JOB_LOG
Path to a to-be-generated Batsim-style CSV trace file (e.g. for batch schedule visualization purpose...
Definition: BatchComputeServiceProperty.h:118
static const std::string STANDARD_JOB_DONE_MESSAGE_PAYLOAD
The number of bytes in the control message sent by the daemon to state that it has completed a standa...
Definition: ComputeServiceMessagePayload.h:30
static const std::string TASK_STARTUP_OVERHEAD
The overhead to start a task execution, in seconds.
Definition: BatchComputeServiceProperty.h:27
static const std::string BATCH_SCHEDULING_ALGORITHM
The batch scheduling algorithm. Can be:
Definition: BatchComputeServiceProperty.h:39
static const std::string SUBMIT_PILOT_JOB_REQUEST_MESSAGE_PAYLOAD
The number of bytes in the control message sent to the daemon to submit a pilot job.
Definition: ComputeServiceMessagePayload.h:38
The compute service base class.
Definition: ComputeService.h:35
static const std::string HOST_SELECTION_ALGORITHM
The host selection algorithm. Can be:
Definition: BatchComputeServiceProperty.h:58
static const std::string SUPPORTS_PILOT_JOBS
Whether the compute service supports pilot jobs (true or false)
Definition: ComputeServiceProperty.h:26
static const std::string SUBMIT_TIME_OF_FIRST_JOB_IN_WORKLOAD_TRACE_FILE
A specification of the submit time of the first job in a provided trace file.
Definition: BatchComputeServiceProperty.h:107
static const std::string TASK_SELECTION_ALGORITHM
The algorithm to pick which ready computational task (within a standard job executed by the batch ser...
Definition: BatchComputeServiceProperty.h:68
static const std::string BATCH_QUEUE_ORDERING_ALGORITHM
The batch queue ordering algorithm. Can be:
Definition: BatchComputeServiceProperty.h:48
static const std::string RESOURCE_DESCRIPTION_REQUEST_MESSAGE_PAYLOAD
The number of bytes in the control message sent to the daemon to ask it for information on its resour...
Definition: ComputeServiceMessagePayload.h:56
static const std::string SUPPORTS_STANDARD_JOBS
Whether the compute service supports standard jobs (true or false)
Definition: ComputeServiceProperty.h:24
static const std::string SIMULATE_COMPUTATION_AS_SLEEP
Simulate computation as just a sleep instead of an actual compute thread. This is for scalability rea...
Definition: BatchComputeServiceProperty.h:138
A batch-scheduled compute service that manages a set of compute hosts and controls access to their re...
Definition: BatchComputeService.h:47
static const std::string SUBMIT_STANDARD_JOB_REQUEST_MESSAGE_PAYLOAD
The number of bytes in the control message sent to the daemon to submit a standard job...
Definition: ComputeServiceMessagePayload.h:26
static const std::string BATSCHED_LOGGING_MUTED
Controls Batsched logging.
Definition: BatchComputeServiceProperty.h:146
static const std::string PILOT_JOB_STARTED_MESSAGE_PAYLOAD
The number of bytes in the control message sent by the daemon to state that a pilot job has started...
Definition: ComputeServiceMessagePayload.h:42
static const std::string DAEMON_STOPPED_MESSAGE_PAYLOAD
The number of bytes in the control message sent by the daemon to confirm it has terminated.
Definition: ServiceMessagePayload.h:33
static const std::string RESOURCE_DESCRIPTION_ANSWER_MESSAGE_PAYLOAD
The number of bytes in the control message sent by the daemon to state information on its resources...
Definition: ComputeServiceMessagePayload.h:58
Definition: Alarm.cpp:19