WMS.h
1 
10 #ifndef WRENCH_WMS_H
11 #define WRENCH_WMS_H
12 
13 #include <wrench/services/metering/EnergyMeterService.h>
14 #include <wrench/services/metering/BandwidthMeterService.h>
15 #include "wrench/services/Service.h"
16 #include "wrench/wms/DynamicOptimization.h"
17 #include "wrench/wms/StaticOptimization.h"
18 #include "wrench/wms/scheduler/PilotJobScheduler.h"
19 #include "wrench/wms/scheduler/StandardJobScheduler.h"
20 #include "wrench/services/compute/cloud/CloudComputeService.h"
21 #include "wrench/workflow/execution_events/StandardJobCompletedEvent.h"
22 #include "wrench/workflow/execution_events/StandardJobFailedEvent.h"
23 #include "wrench/workflow/execution_events/PilotJobStartedEvent.h"
24 #include "wrench/workflow/execution_events/PilotJobExpiredEvent.h"
25 #include "wrench/workflow/execution_events/FileCopyCompletedEvent.h"
26 #include "wrench/workflow/execution_events/FileCopyFailedEvent.h"
27 #include "wrench/workflow/execution_events/TimerEvent.h"
28 #include "wrench/workflow/Workflow.h"
29 
30 namespace wrench {
31 
32  class Simulation;
33  class ComputeService;
34  class CloudComputeService;
35  class VirtualizedClusterComputeService;
36  class StorageService;
37  class NetworkProximityService;
38  class FileRegistryService;
39 
43  class WMS : public Service {
44 
45  public:
46  void addWorkflow(Workflow *workflow, double start_time = 0);
50 
51  void addStaticOptimization(std::unique_ptr<StaticOptimization>);
52 
53  void addDynamicOptimization(std::unique_ptr<DynamicOptimization>);
54 
55 
56  protected:
57 
58  /***********************/
60  /***********************/
61 
62  WMS(std::unique_ptr<StandardJobScheduler> standard_job_scheduler,
63  std::unique_ptr<PilotJobScheduler> pilot_job_scheduler,
64  const std::set<std::shared_ptr<ComputeService>> &compute_services,
65  const std::set<std::shared_ptr<StorageService>> &storage_services,
66  const std::set<std::shared_ptr<NetworkProximityService>> &network_proximity_services,
67  std::shared_ptr<FileRegistryService> file_registry_service,
68  const std::string &hostname,
69  const std::string suffix);
70 
71 
72  void checkDeferredStart();
73 
74  void setTimer(double date, std::string message);
75 
76  std::shared_ptr<JobManager> createJobManager();
77  std::shared_ptr<DataMovementManager> createDataMovementManager();
78  std::shared_ptr<EnergyMeterService> createEnergyMeter(const std::map<std::string, double> &measurement_periods);
79  std::shared_ptr<EnergyMeterService> createEnergyMeter(const std::vector<std::string> &hostnames, double measurement_period);
80  std::shared_ptr<BandwidthMeterService> createBandwidthMeter(const std::map<std::string, double> &measurement_periods);
81  std::shared_ptr<BandwidthMeterService> createBandwidthMeter(const std::vector<std::string> &linknames, double measurement_period);
82 
83  void runDynamicOptimizations();
84 
85  void runStaticOptimizations();
86 
92  template <class T>
93  std::set<std::shared_ptr<T>> getAvailableComputeServices() {
94  bool is_cloud = (std::type_index(typeid(T)) == std::type_index(typeid(CloudComputeService)));
95  std::set<std::shared_ptr<T>> to_return;
96  for (auto const &h : this->compute_services) {
97  if (not is_cloud) {
98  auto shared_ptr = std::dynamic_pointer_cast<T>(h);
99  if (shared_ptr) {
100  to_return.insert(shared_ptr);
101  }
102  } else {
103  auto shared_ptr_cloud = std::dynamic_pointer_cast<T>(h);
104  auto shared_ptr_vc = std::dynamic_pointer_cast<VirtualizedClusterComputeService>(h);
105  if (shared_ptr_cloud and (not shared_ptr_vc)) {
106  to_return.insert(shared_ptr_cloud);
107  }
108  }
109  }
110  return to_return;
111  }
112 
113  // The template specialization below does not compile with gcc, hence the above method!
114 
115 // /**
116 // * @brief Obtain the list of compute services available to the WMS
117 // * @tparam CloudComputeService
118 // * @return a set of compute services
119 // * @return
120 // */
121 // template <>
122 // std::set<std::shared_ptr<CloudComputeService>> getAvailableComputeServices<CloudComputeService>() {
123 // std::set<std::shared_ptr<CloudComputeService>> to_return;
124 // for (auto const &h : this->compute_services) {
125 // auto shared_ptr_cloud = std::dynamic_pointer_cast<CloudComputeService>(h);
126 // auto shared_ptr_vc = std::dynamic_pointer_cast<VirtualizedClusterComputeService>(h);
127 // if (shared_ptr_cloud and (not shared_ptr_vc)) {
128 // to_return.insert(shared_ptr_cloud);
129 // }
130 // }
131 // return to_return;
132 // }
133 
134 
135  std::set<std::shared_ptr<StorageService>> getAvailableStorageServices();
136  std::set<std::shared_ptr<NetworkProximityService>> getAvailableNetworkProximityServices();
137  std::shared_ptr<FileRegistryService> getAvailableFileRegistryService();
138 
139  void waitForAndProcessNextEvent();
140  bool waitForAndProcessNextEvent(double timeout);
141  std::shared_ptr<WorkflowExecutionEvent> waitForNextEvent();
142  std::shared_ptr<WorkflowExecutionEvent> waitForNextEvent(double timeout);
143  virtual void processEventStandardJobCompletion(std::shared_ptr<StandardJobCompletedEvent>);
144 
145  virtual void processEventStandardJobFailure(std::shared_ptr<StandardJobFailedEvent>);
146 
147  virtual void processEventPilotJobStart(std::shared_ptr<PilotJobStartedEvent>);
148 
149  virtual void processEventPilotJobExpiration(std::shared_ptr<PilotJobExpiredEvent>);
150 
151  virtual void processEventFileCopyCompletion(std::shared_ptr<FileCopyCompletedEvent>);
152 
153  virtual void processEventFileCopyFailure(std::shared_ptr<FileCopyFailedEvent>);
154 
155  virtual void processEventTimer(std::shared_ptr<TimerEvent>);
156 
157  /***********************/
159  /***********************/
160 
161  /***********************/
163  /***********************/
164 
165  private:
166  friend class Simulation;
167  friend class DataMovementManager;
168  friend class JobManager;
169 
171  Workflow *workflow;
173  double start_time;
175  std::set<std::shared_ptr<ComputeService>> compute_services;
177  std::set<std::shared_ptr<StorageService>> storage_services;
179  std::set<std::shared_ptr<NetworkProximityService>> network_proximity_services;
181  std::shared_ptr<FileRegistryService> file_registry_service;
182 
184  std::shared_ptr<StandardJobScheduler> standard_job_scheduler;
186  std::shared_ptr<PilotJobScheduler> pilot_job_scheduler;
187 
189  std::vector<std::unique_ptr<DynamicOptimization>> dynamic_optimizations;
191  std::vector<std::unique_ptr<StaticOptimization>> static_optimizations;
192 
193  /***********************/
195  /***********************/
196 
197  private:
198  virtual int main() = 0;
199 
200  };
201 
202 };
203 
204 
205 #endif //WRENCH_WMS_H
void addStaticOptimization(std::unique_ptr< StaticOptimization >)
Add a static optimization to the list of optimizations. Optimizations are executed in order of insertion.
Definition: WMS.cpp:76
PilotJobScheduler * getPilotJobScheduler()
Get the WMS's pilot scheduler.
Definition: WMS.cpp:427
StandardJobScheduler * getStandardJobScheduler()
Get the WMS's standard job scheduler.
Definition: WMS.cpp:436
A cloud-based compute service that manages a set of physical hosts and controls access to their resou...
Definition: CloudComputeService.h:36
Definition: Alarm.cpp:20
A workflow (to be executed by a WMS)
Definition: Workflow.h:34
void addWorkflow(Workflow *workflow, double start_time=0)
Assign a workflow to the WMS.
Definition: WMS.cpp:288
void addDynamicOptimization(std::unique_ptr< DynamicOptimization >)
Add a dynamic optimization to the list of optimizations. Optimizations are executed in order of insertion.
Definition: WMS.cpp:66
A (mostly virtual) base class for implementing PilotJob scheduling algorithms to be used by a WMS.
Definition: PilotJobScheduler.h:26
Workflow * getWorkflow()
Get the workflow that was assigned to the WMS.
Definition: WMS.cpp:313
A class that provides basic simulation methods. Once the simulation object has been explicitly or imp...
Definition: Simulation.h:46
A (mostly virtual) base class for implementing StandardJob scheduling algorithms to be used by a WMS.
Definition: StandardJobScheduler.h:31
A service that can be added to the simulation and that can be used by a WMS when executing a workflow...
Definition: Service.h:26
A workflow management system (WMS)
Definition: WMS.h:43