10 #ifndef WRENCH_ACTION_SCHEDULER_H
11 #define WRENCH_ACTION_SCHEDULER_H
16 #include "wrench/services/compute/ComputeService.h"
17 #include "wrench/services/helper_services/host_state_change_detector/HostStateChangeDetector.h"
18 #include "wrench/services/helper_services/action_execution_service/ActionExecutionServiceProperty.h"
42 WRENCH_PROPERTY_COLLECTION_TYPE default_property_values = {
43 {ActionExecutionServiceProperty::TERMINATE_WHENEVER_ALL_RESOURCES_ARE_DOWN,
"false"},
44 {ActionExecutionServiceProperty::FAIL_ACTION_AFTER_ACTION_EXECUTOR_CRASH,
"true"},
47 WRENCH_MESSAGE_PAYLOADCOLLECTION_TYPE default_messagepayload_values = {
54 std::map<std::string, std::tuple<unsigned long, double>> compute_resources,
55 std::shared_ptr<Service> parent_service,
56 WRENCH_PROPERTY_COLLECTION_TYPE property_list = {},
57 WRENCH_MESSAGE_PAYLOADCOLLECTION_TYPE messagepayload_list = {}
// Returns whether the given action can run on this service.
// NOTE(review): the criteria are not visible here (presumably the action's
// core/RAM requirements vs. compute_resources) — confirm against the .cpp.
64 bool actionCanRun(std::shared_ptr<Action> action);
// Returns the parent service (presumably the parent_service member, which
// defaults to nullptr).
66 std::shared_ptr<Service> getParentService()
const;
// Sets the parent service.
68 void setParentService(std::shared_ptr<Service> parent);
// Submits an action to this service for execution.
70 void submitAction(
const std::shared_ptr<Action> &action);
// Terminates the given action; termination_cause indicates why it is being
// terminated.
72 void terminateAction(std::shared_ptr<Action> action, ComputeService::TerminationCause termination_cause);
// Returns whether at least one host can accommodate num_cores cores and ram
// RAM (presumably with respect to currently *available* resources — confirm).
// NOTE(review): the leading capital "I" is inconsistent with the lowerCamel
// naming used elsewhere (compare the private isThereAtLeastOneHostWithResources);
// renaming would require updating all callers.
74 bool IsThereAtLeastOneHostWithAvailableResources(
unsigned long num_cores,
double ram);
// Returns a mutable reference to the compute-resource map (presumably the
// compute_resources member; presumably hostname -> (num cores, RAM) — confirm).
// NOTE(review): callers can modify the service's internal state through this
// reference.
76 std::map<std::string, std::tuple<unsigned long, double>> &getComputeResources();
// Returns resource information for the given key as a string -> double map
// (presumably one entry per host — confirm against the .cpp).
78 std::map<std::string, double> getResourceInformation(
const std::string &key);
// Validates the property values configured for this service.
// NOTE(review): which properties are checked and how invalid values are
// reported is not visible here — confirm in the .cpp.
90 void validateProperties();
// Compute resources managed by this service; same type as the constructor's
// compute_resources parameter (presumably hostname -> (num cores, RAM) — confirm).
93 std::map<std::string, std::tuple<unsigned long, double>> compute_resources;
// Per-host bookkeeping, presumably keyed by hostname: RAM currently available
// and number of threads currently running. TODO confirm units/semantics.
96 std::unordered_map<std::string, double> ram_availabilities;
97 std::unordered_map<std::string, unsigned long> running_thread_counts;
// The parent service; nullptr until set (presumably from the constructor
// argument or via setParentService).
99 std::shared_ptr<Service> parent_service =
nullptr;
// Scratch-space files grouped by StandardJob (presumably files each job has
// placed in scratch — confirm).
101 std::unordered_map<std::shared_ptr<StandardJob> , std::set<std::shared_ptr<DataFile>>> files_in_scratch;
// Actions that are currently running.
104 std::set<std::shared_ptr<Action> > running_actions;
// Per-action run specification; same tuple type returned by pickAllocation
// (presumably (host, num cores) — confirm).
107 std::unordered_map<std::shared_ptr<Action> , std::tuple<std::string, unsigned long>> action_run_specs;
// All actions known to the service, and the actions waiting to be dispatched
// (presumably consumed in FIFO order by dispatchReadyActions — confirm).
109 std::set<std::shared_ptr<Action>> all_actions;
110 std::deque<std::shared_ptr<Action>> ready_actions;
// The ActionExecutor associated with each action (presumably only actions
// that have been dispatched — confirm).
113 std::unordered_map<std::shared_ptr<Action> , std::shared_ptr<ActionExecutor>> action_executors;
// Terminates the service. send_failure_notifications controls whether failure
// notifications are sent (presumably for actions not yet completed — confirm);
// termination_cause records why.
119 void terminate(
bool send_failure_notifications, ComputeService::TerminationCause termination_cause);
// Fails all current actions (presumably those tracked in running_actions /
// ready_actions — confirm).
121 void failCurrentActions();
// Handlers for outcomes reported by an ActionExecutor: the action completed,
// the action failed, or the executor itself crashed.
// NOTE(review): crash handling presumably depends on the
// FAIL_ACTION_AFTER_ACTION_EXECUTOR_CRASH property — confirm.
123 void processActionExecutorCompletion(std::shared_ptr<ActionExecutor> executor);
125 void processActionExecutorFailure(std::shared_ptr<ActionExecutor> executor);
127 void processActionExecutorCrash(std::shared_ptr<ActionExecutor> executor);
// Handles a request to terminate the given action with the given cause;
// answer_mailbox is presumably where the acknowledgment is sent.
129 void processActionTerminationRequest(std::shared_ptr<Action> action, simgrid::s4u::Mailbox *answer_mailbox, ComputeService::TerminationCause termination_cause);
// Processes the next message received by the service.
// NOTE(review): the meaning of the bool result is not visible here
// (presumably "keep running" vs. "stop") — confirm.
131 bool processNextMessage();
// Dispatches actions that are ready to run (presumably from ready_actions)
// onto available resources.
133 void dispatchReadyActions();
140 enum JobTerminationCause {
145 COMPUTE_SERVICE_KILLED
// Terminates a running action; killed_due_to_job_cancelation indicates whether
// the termination results from a job cancelation.
148 void terminateRunningAction(std::shared_ptr<Action> action,
bool killed_due_to_job_cancelation);
// Kills an action, associating it with the given failure cause.
150 void killAction(std::shared_ptr<Action> action, std::shared_ptr<FailureCause> cause);
// Handles an action-submission request; answer_mailbox is presumably where the
// reply is sent.
153 void processSubmitAction(simgrid::s4u::Mailbox *answer_mailbox, std::shared_ptr<Action> action);
// Picks an allocation for the action, returning the same tuple type stored in
// action_run_specs (presumably (host, num cores) — confirm). required_host and
// required_num_cores constrain the choice; the meaning of empty/zero values is
// not visible here.
// NOTE(review): hosts_to_avoid is taken by non-const reference, so the callee may
// modify it; if it is only read, a const reference would make that explicit
// (requires changing the definition as well).
155 std::tuple<std::string, unsigned long> pickAllocation(std::shared_ptr<Action> action,
156 std::string required_host,
unsigned long required_num_cores,
157 std::set<std::string> &hosts_to_avoid);
// Returns whether at least one host has num_cores cores and ram RAM
// (presumably total capacity rather than current availability, unlike
// IsThereAtLeastOneHostWithAvailableResources — confirm).
160 bool isThereAtLeastOneHostWithResources(
unsigned long num_cores,
double ram);
// Overrides the base-class cleanup hook; the parameters report whether the
// service terminated cleanly and its return value.
162 void cleanup(
bool has_terminated_cleanly,
int return_value)
override;
// Returns whether every compute resource is down and no ActionExecutor is
// running (presumably used to implement
// TERMINATE_WHENEVER_ALL_RESOURCES_ARE_DOWN — confirm).
164 bool areAllComputeResourcesDownWithNoActionExecutorRunning();
// Detector for host state changes (presumably lets the service react to
// compute hosts going down or coming back up — confirm).
169 std::shared_ptr<HostStateChangeDetector> host_state_change_monitor;
175 #endif //WRENCH_ACTION_SCHEDULER_H