JobManager.h
1 
11 #ifndef WRENCH_PILOTJOBMANAGER_H
12 #define WRENCH_PILOTJOBMANAGER_H
13 
14 #include <vector>
15 #include <set>
16 
17 #include "wrench/services/Service.h"
18 
19 namespace wrench {
20 
/* Forward declarations of the WRENCH types named by the JobManager interface.
 * Where they are used in this header they appear only as raw pointers, as
 * std::shared_ptr template arguments, or in a friend declaration.
 *
 * NOTE(review): FileLocation and FailureCause are also used in this header but
 * are not declared here; presumably they become visible through
 * "wrench/services/Service.h" -- confirm.
 */
21  class WMS;
22  class Workflow;
23  class WorkflowTask;
24  class WorkflowFile;
25  class WorkflowJob;
26  class PilotJob;
27  class StandardJob;
28  class ComputeService;
29  class StorageService;
30 
31  /***********************/
33  /***********************/
34 
35 
/**
 * @brief A helper daemon (co-located with and explicitly started by a WMS)
 *        used to handle job-related operations: the public interface creates,
 *        submits and terminates jobs, and reports pending/running pilot jobs.
 *
 * The class derives from Service. Its constructor is protected and WMS is
 * declared a friend, so instances are meant to be created by a WMS.
 *
 * NOTE(review): std::map, std::tuple, std::string and std::shared_ptr are used
 * in these declarations while this header directly includes only <vector> and
 * <set>; the remaining standard headers presumably arrive through Service.h --
 * confirm.
 */
40  class JobManager : public Service {
41 
42  public:
43 
44 
 /**
  * @brief Stop the job manager (overrides the method inherited via Service).
  */
45  void stop() override;
46 
 /**
  * @brief Kill the job manager (brutally terminates the daemon and clears all jobs).
  */
47  void kill();
48 
49 
 /**
  * @brief Create a standard job.
  *
  * @param tasks: the tasks that make up the job
  * @param file_locations: a map from each file to a location for that file
  * @param pre_file_copies: (file, location, location) tuples; presumably
  *        (file, source, destination) copies performed before the tasks run --
  *        TODO confirm against JobManager.cpp
  * @param post_file_copies: (file, location, location) tuples; presumably
  *        copies performed after the tasks run -- TODO confirm
  * @param cleanup_file_deletions: (file, location) tuples; presumably file
  *        deletions performed as job cleanup -- TODO confirm
  *
  * @return a shared pointer to the created StandardJob
  */
50  std::shared_ptr<StandardJob> createStandardJob(std::vector<WorkflowTask *> tasks,
51  std::map<WorkflowFile *, std::shared_ptr<FileLocation> > file_locations,
52  std::vector<std::tuple<WorkflowFile *, std::shared_ptr<FileLocation> , std::shared_ptr<FileLocation> >> pre_file_copies,
53  std::vector<std::tuple<WorkflowFile *, std::shared_ptr<FileLocation> , std::shared_ptr<FileLocation> >> post_file_copies,
54  std::vector<std::tuple<WorkflowFile *, std::shared_ptr<FileLocation> >> cleanup_file_deletions);
55 
56 
 /**
  * @brief Create a standard job from a list of tasks and file locations only
  *        (overload without file copies or cleanup deletions).
  *
  * @param tasks: the tasks that make up the job
  * @param file_locations: a map from each file to a location for that file
  *
  * @return a shared pointer to the created StandardJob
  */
57  std::shared_ptr<StandardJob> createStandardJob(std::vector<WorkflowTask *> tasks,
58  std::map<WorkflowFile *,
59  std::shared_ptr<FileLocation> > file_locations);
60 
 /**
  * @brief Create a standard job for a single task (overload taking one task
  *        instead of a vector of tasks).
  *
  * @param task: the task
  * @param file_locations: a map from each file to a location for that file
  *
  * @return a shared pointer to the created StandardJob
  */
61  std::shared_ptr<StandardJob> createStandardJob(WorkflowTask *task,
62  std::map<WorkflowFile *,
63  std::shared_ptr<FileLocation> > file_locations);
64 
 /**
  * @brief Create a pilot job.
  *
  * @return a shared pointer to the created PilotJob
  */
65  std::shared_ptr<PilotJob> createPilotJob();
66 
 /**
  * @brief Submit a job to a compute service.
  *
  * @param job: the job to submit
  * @param compute_service: the compute service to submit the job to
  * @param service_specific_args: optional string key/value arguments (defaults
  *        to an empty map); their meaning is not visible in this header and
  *        presumably depends on the target compute service -- confirm
  */
67  void submitJob(std::shared_ptr<WorkflowJob> job, std::shared_ptr<ComputeService> compute_service, std::map<std::string, std::string> service_specific_args = {});
68 
 /**
  * @brief Terminate a job (standard or pilot) that hasn't
  *        completed/expired/failed yet.
  *
  * @param job: the job to terminate
  */
69  void terminateJob(std::shared_ptr<WorkflowJob> job);
70 
71 // void forgetJob(WorkflowJob *job);
72 
 /**
  * @brief Get the list of currently pending pilot jobs.
  *
  * @return a set of pilot jobs (returned by value)
  */
73  std::set<std::shared_ptr<PilotJob>> getPendingPilotJobs();
74 
 /**
  * @brief Get the list of currently running pilot jobs.
  *
  * @return a set of pilot jobs (returned by value)
  */
75  std::set<std::shared_ptr<PilotJob>> getRunningPilotJobs();
76 
77  /***********************/
79  /***********************/
80 
 /**
  * @brief Destructor (overrides the base class destructor).
  */
81  ~JobManager() override;
82 
83  protected:
84 
 /* WMS is a friend so that it can reach the protected constructor below. */
85  friend class WMS;
86 
 /**
  * @brief Constructor; protected, so not callable by arbitrary code.
  *
  * @param wms: the WMS this job manager is associated with
  */
87  explicit JobManager(std::shared_ptr<WMS> wms);
88 
89  /***********************/
91  /***********************/
92 
93  private:
94 
 /* Overridden main(); presumably the daemon's main routine -- confirm in JobManager.cpp. */
95  int main() override;
 /* Processes one incoming message; the meaning of the bool result is not
  * visible here (presumably whether to keep running) -- TODO confirm. */
96  bool processNextMessage();
 /* Per-event handlers, named after the event they handle (standard job
  * completion/failure, pilot job start/expiration); presumably invoked from
  * processNextMessage() -- confirm in JobManager.cpp. */
97  void processStandardJobCompletion(std::shared_ptr<StandardJob>job, std::shared_ptr<ComputeService> compute_service);
98  void processStandardJobFailure(std::shared_ptr<StandardJob>job, std::shared_ptr<ComputeService> compute_service, std::shared_ptr<FailureCause> cause);
99  void processPilotJobStart(std::shared_ptr<PilotJob>job, std::shared_ptr<ComputeService> compute_service);
100  void processPilotJobExpiration(std::shared_ptr<PilotJob>job, std::shared_ptr<ComputeService> compute_service);
101 
102 
103 
104  // The WMS this job manager is associated with (same type as the constructor argument)
105  std::shared_ptr<WMS> wms;
106 
107  // Job sets, one per job state as indicated by each member's name
 // (standard jobs: new / pending / running / completed / failed)
108  std::set<std::shared_ptr<StandardJob>> new_standard_jobs;
109  std::set<std::shared_ptr<StandardJob>> pending_standard_jobs;
110  std::set<std::shared_ptr<StandardJob>> running_standard_jobs;
111  std::set<std::shared_ptr<StandardJob>> completed_standard_jobs;
112  std::set<std::shared_ptr<StandardJob>> failed_standard_jobs;
113 
 // (pilot jobs: new / pending / running / completed; there is no "failed" set for pilot jobs)
114  std::set<std::shared_ptr<PilotJob>> new_pilot_jobs;
115  std::set<std::shared_ptr<PilotJob>> pending_pilot_jobs;
116  std::set<std::shared_ptr<PilotJob>> running_pilot_jobs;
117  std::set<std::shared_ptr<PilotJob>> completed_pilot_jobs;
118 
119  };
120 
121  /***********************/
123  /***********************/
124 
125 };
126 
127 #endif //WRENCH_PILOTJOBMANAGER_H
std::shared_ptr< PilotJob > createPilotJob()
Create a pilot job.
Definition: JobManager.cpp:241
void terminateJob(std::shared_ptr< WorkflowJob > job)
Terminate a job (standard or pilot) that hasn't completed/expired/failed yet.
Definition: JobManager.cpp:371
std::set< std::shared_ptr< PilotJob > > getPendingPilotJobs()
Get the list of currently pending pilot jobs.
Definition: JobManager.cpp:438
void submitJob(std::shared_ptr< WorkflowJob > job, std::shared_ptr< ComputeService > compute_service, std::map< std::string, std::string > service_specific_args={})
Submit a job to a compute service.
Definition: JobManager.cpp:270
void kill()
Kill the job manager (brutally terminates the daemon and clears all jobs)
Definition: JobManager.cpp:62
void stop() override
Stop the job manager.
Definition: JobManager.cpp:80
A helper daemon (co-located with and explicitly started by a WMS), which is used to handle all job ex...
Definition: JobManager.h:40
Definition: Alarm.cpp:20
A computational task in a Workflow.
Definition: WorkflowTask.h:31
std::shared_ptr< StandardJob > createStandardJob(std::vector< WorkflowTask * > tasks, std::map< WorkflowFile *, std::shared_ptr< FileLocation > > file_locations, std::vector< std::tuple< WorkflowFile *, std::shared_ptr< FileLocation >, std::shared_ptr< FileLocation > >> pre_file_copies, std::vector< std::tuple< WorkflowFile *, std::shared_ptr< FileLocation >, std::shared_ptr< FileLocation > >> post_file_copies, std::vector< std::tuple< WorkflowFile *, std::shared_ptr< FileLocation > >> cleanup_file_deletions)
Create a standard job.
Definition: JobManager.cpp:108
A service that can be added to the simulation and that can be used by a WMS when executing a workflow...
Definition: Service.h:26
std::set< std::shared_ptr< PilotJob > > getRunningPilotJobs()
Get the list of currently running pilot jobs.
Definition: JobManager.cpp:430
A data file used/produced by a WorkflowTask in a Workflow.
Definition: WorkflowFile.h:26
A workflow management system (WMS)
Definition: WMS.h:43