WRENCH  1.11
Cyberinfrastructure Simulation Workbench
Overview Installation Getting Started WRENCH 101 WRENCH 102
JobManager.h
1 
11 #ifndef WRENCH_PILOTJOBMANAGER_H
12 #define WRENCH_PILOTJOBMANAGER_H
13 
14 #include <vector>
15 #include <set>
16 
17 #include "wrench/services/Service.h"
18 #include "wrench/services/storage/storage_helpers/FileLocation.h"
19 
20 
21 namespace wrench {
22 
// Forward declarations of WRENCH types that are only named (never defined) in this header,
// e.g. inside std::shared_ptr<> parameters or in friend declarations of JobManager.
23  class FailureCause;
24 
25  class WMS;
26 
27  class ExecutionController;
28 
29  class Workflow;
30 
31  class WorkflowTask;
32 
33  class DataFile;
34 
35  class Job;
36 
37  class PilotJob;
38 
39  class CompoundJob;
40 
41  class StandardJob;
42 
43  class ComputeService;
44 
45  class StorageService;
46 
47  /***********************/
49  /***********************/
50 
51 
// A helper daemon, co-located with and explicitly started by an execution controller.
// It is itself a Service, and exposes the API used to create, submit and terminate
// standard, compound and pilot jobs.
56  class JobManager : public Service {
57 
58  public:
59 
// Stop the job manager.
60  void stop() override;
61 
// Kill the job manager (brutally terminate the daemon, clears all jobs).
62  void kill();
63 
// Create a compound job with the given name.
64  std::shared_ptr<CompoundJob> createCompoundJob(std::string name);
65 
// Create a standard job from a list of tasks.
//   tasks:                  the tasks that make up the job
//   file_locations:         for each data file, the single location associated with it
//   pre_file_copies:        (file, location, location) tuples -- presumably (file, source, destination)
//                           copies performed before the tasks run; TODO confirm in JobManager.cpp
//   post_file_copies:       same tuple shape -- presumably copies performed after the tasks run; TODO confirm
//   cleanup_file_deletions: (file, location) tuples -- presumably deletions performed last; TODO confirm
66  std::shared_ptr<StandardJob> createStandardJob(const std::vector<std::shared_ptr<WorkflowTask>>& tasks,
67  const std::map<std::shared_ptr<DataFile>, std::shared_ptr<FileLocation> >& file_locations,
68  std::vector<std::tuple<std::shared_ptr<DataFile>, std::shared_ptr<FileLocation>, std::shared_ptr<FileLocation> >> pre_file_copies,
69  std::vector<std::tuple<std::shared_ptr<DataFile>, std::shared_ptr<FileLocation>, std::shared_ptr<FileLocation> >> post_file_copies,
70  std::vector<std::tuple<std::shared_ptr<DataFile>, std::shared_ptr<FileLocation> >> cleanup_file_deletions);
71 
// Same as the overload above, except that file_locations maps each data file to a
// vector of locations rather than to a single location.
72  std::shared_ptr<StandardJob> createStandardJob(const std::vector<std::shared_ptr<WorkflowTask>>& tasks,
73  std::map<std::shared_ptr<DataFile>, std::vector<std::shared_ptr<FileLocation>>> file_locations,
74  std::vector<std::tuple<std::shared_ptr<DataFile>, std::shared_ptr<FileLocation>, std::shared_ptr<FileLocation> >> pre_file_copies,
75  std::vector<std::tuple<std::shared_ptr<DataFile>, std::shared_ptr<FileLocation>, std::shared_ptr<FileLocation> >> post_file_copies,
76  std::vector<std::tuple<std::shared_ptr<DataFile>, std::shared_ptr<FileLocation> >> cleanup_file_deletions);
77 
78 
// Create a standard job from a list of tasks and one location per data file
// (no pre/post file copies or cleanup deletions are passed).
79  std::shared_ptr<StandardJob> createStandardJob(const std::vector<std::shared_ptr<WorkflowTask>>& tasks,
80  const std::map<std::shared_ptr<DataFile>, std::shared_ptr<FileLocation>>& file_locations);
81 
// Create a standard job from a list of tasks and a vector of locations per data file.
82  std::shared_ptr<StandardJob> createStandardJob(const std::vector<std::shared_ptr<WorkflowTask>>& tasks,
83  std::map<std::shared_ptr<DataFile>, std::vector<std::shared_ptr<FileLocation>>> file_locations);
84 
// Create a standard job from a single task and one location per data file.
85  std::shared_ptr<StandardJob> createStandardJob(const std::shared_ptr<WorkflowTask>&task,
86  const std::map<std::shared_ptr<DataFile>, std::shared_ptr<FileLocation>>& file_locations);
87 
// Create a standard job from a single task and a vector of locations per data file.
88  std::shared_ptr<StandardJob> createStandardJob(const std::shared_ptr<WorkflowTask>&task,
89  std::map<std::shared_ptr<DataFile>, std::vector<std::shared_ptr<FileLocation>>> file_locations);
90 
// Create a standard job from a list of tasks, without specifying any file locations.
91  std::shared_ptr<StandardJob> createStandardJob(const std::vector<std::shared_ptr<WorkflowTask>>& tasks);
92 
// Create a standard job from a single task, without specifying any file locations.
93  std::shared_ptr<StandardJob> createStandardJob(const std::shared_ptr<WorkflowTask>&task);
94 
// Create a pilot job.
95  std::shared_ptr<PilotJob> createPilotJob();
96 
// Submit a standard job to a compute service.
//   service_specific_args: optional string key/value arguments (empty by default);
//                          their meaning is defined by the target compute service.
97  void submitJob(const std::shared_ptr<StandardJob>& job, const std::shared_ptr<ComputeService>& compute_service,
98  std::map<std::string, std::string> service_specific_args = {});
99 
// Submit a compound job to a compute service (same parameters as the standard-job overload).
100  void submitJob(const std::shared_ptr<CompoundJob>& job, const std::shared_ptr<ComputeService>& compute_service,
101  std::map<std::string, std::string> service_specific_args = {});
102 
// Submit a pilot job to a compute service (same parameters as the standard-job overload).
103  void submitJob(const std::shared_ptr<PilotJob>& job, const std::shared_ptr<ComputeService>& compute_service,
104  std::map<std::string, std::string> service_specific_args = {});
105 
// Terminate a standard job that hasn't completed/expired/failed yet.
106  void terminateJob(const std::shared_ptr<StandardJob>& job);
107 
// Terminate a compound job (compound-job counterpart of the overload above).
108  void terminateJob(const std::shared_ptr<CompoundJob>& job);
109 
// Terminate a pilot job (pilot-job counterpart of the overload above).
110  void terminateJob(const std::shared_ptr<PilotJob>& job);
111 
// Return the mailbox of the job manager's creator.
112  simgrid::s4u::Mailbox *getCreatorMailbox();
113 
// Get the number of currently running pilot jobs
// (presumably the value of num_running_pilot_jobs -- confirm in JobManager.cpp).
114  unsigned long getNumRunningPilotJobs() const;
115 
116  /***********************/
118  /***********************/
119 
120  ~JobManager() override;
121 
122  protected:
123 
// The constructor is protected: only the friend classes below (and subclasses)
// can instantiate a JobManager.
124  friend class ExecutionController;
125  friend class WMS;
126 
// hostname:        presumably the host on which the job manager runs -- TODO confirm
// creator_mailbox: mailbox of the entity that creates this job manager
127  explicit JobManager(std::string hostname, simgrid::s4u::Mailbox *creator_mailbox);
128 
129  /***********************/
131  /***********************/
132 
133  private:
134 
// Main routine of the daemon (overrides the inherited main()).
135  int main() override;
136 
// Dispatch helpers; presumably they drain jobs_to_dispatch into jobs_dispatched -- TODO confirm.
137  void dispatchJobs();
138 
139  void dispatchJob(const std::shared_ptr<CompoundJob>& job);
140 
// Process the next incoming message. NOTE(review): the meaning of the bool result
// (likely "keep running") is not visible in this header -- confirm in JobManager.cpp.
141  bool processNextMessage();
142 
// Per-event handlers for job lifecycle notifications (completion, failure, pilot-job
// start/expiration); presumably invoked from processNextMessage() -- TODO confirm.
143  void
144  processStandardJobCompletion(const std::shared_ptr<StandardJob>& job, std::shared_ptr<ComputeService> compute_service);
145 
146  void
147  processStandardJobFailure(std::shared_ptr<StandardJob> job, std::shared_ptr<ComputeService> compute_service);
148 
149  void
150  processCompoundJobCompletion(const std::shared_ptr<CompoundJob>& job, std::shared_ptr<ComputeService> compute_service);
151 
152  void
153  processCompoundJobFailure(const std::shared_ptr<CompoundJob>& job, std::shared_ptr<ComputeService> compute_service);
154 
155  void processPilotJobStart(const std::shared_ptr<PilotJob>& job, std::shared_ptr<ComputeService> compute_service);
156 
157  void processPilotJobExpiration(const std::shared_ptr<PilotJob>& job, std::shared_ptr<ComputeService> compute_service);
158 
159  void processPilotJobFailure(const std::shared_ptr<PilotJob>& job, std::shared_ptr<ComputeService> compute_service, std::shared_ptr<FailureCause> cause);
160 
161  // Mailbox of the creator of this job manager
162  simgrid::s4u::Mailbox *creator_mailbox;
163 
// Compound jobs awaiting dispatch, and compound jobs already dispatched
// (as suggested by the member names -- confirm against dispatchJobs()).
164  std::vector<std::shared_ptr<CompoundJob>> jobs_to_dispatch;
165  std::set<std::shared_ptr<CompoundJob>> jobs_dispatched;
166 
// Counter of running pilot jobs; starts at 0.
167  unsigned long num_running_pilot_jobs = 0;
168 
169 // std::map<std::shared_ptr<CompoundJob>, std::map<std::string, std::string>> cjob_args;
170 
// Lookup tables from a compound job to its associated standard job / pilot job.
171  std::map<std::shared_ptr<CompoundJob>, std::shared_ptr<StandardJob>> cjob_to_sjob_map;
172  std::map<std::shared_ptr<CompoundJob>, std::shared_ptr<PilotJob>> cjob_to_pjob_map;
173 
174  };
175 
176  /***********************/
178  /***********************/
179 
180 };
181 
182 #endif //WRENCH_PILOTJOBMANAGER_H
wrench::JobManager::createPilotJob
std::shared_ptr< PilotJob > createPilotJob()
Create a pilot job.
Definition: JobManager.cpp:367
wrench::JobManager::kill
void kill()
Kill the job manager (brutally terminate the daemon, clears all jobs)
Definition: JobManager.cpp:56
wrench::ExecutionController
An abstraction of an execution controller, i.e., a running process that interacts with other services...
Definition: ExecutionController.h:37
wrench::JobManager::stop
void stop() override
Stop the job manager.
Definition: JobManager.cpp:68
wrench::JobManager
A helper daemon (co-located with and explicitly started by an execution controller),...
Definition: JobManager.h:56
wrench::JobManager::getNumRunningPilotJobs
unsigned long getNumRunningPilotJobs() const
Get the number of currently running pilot jobs.
Definition: JobManager.cpp:849
wrench
Definition: Action.cpp:28
wrench::JobManager::createCompoundJob
std::shared_ptr< CompoundJob > createCompoundJob(std::string name)
Create a compound job.
Definition: JobManager.cpp:1137
wrench::JobManager::submitJob
void submitJob(const std::shared_ptr< StandardJob > &job, const std::shared_ptr< ComputeService > &compute_service, std::map< std::string, std::string > service_specific_args={})
Submit a standard job to a compute service.
Definition: JobManager.cpp:398
wrench::JobManager::getCreatorMailbox
simgrid::s4u::Mailbox * getCreatorMailbox()
Return the mailbox of the job manager's creator.
Definition: JobManager.cpp:1300
wrench::Service
A service that can be added to the simulation and that can be used by a WMS when executing a workflow...
Definition: Service.h:31
wrench::JobManager::terminateJob
void terminateJob(const std::shared_ptr< StandardJob > &job)
Terminate a standard job that hasn't completed/expired/failed yet.
Definition: JobManager.cpp:741