WRENCH  1.11
Cyberinfrastructure Simulation Workbench
wrench::JobManager Class Reference

A helper daemon (co-located with and explicitly started by an execution controller), which is used to handle all job executions.

#include <JobManager.h>

Inheritance diagram for wrench::JobManager:
wrench::JobManager inherits from wrench::Service, which inherits from wrench::S4U_Daemon

Public Member Functions

 ~JobManager () override
 Destructor, which kills the daemon (and clears all the jobs)
 
std::shared_ptr< CompoundJob > createCompoundJob (std::string name)
 Create a compound job.
 
std::shared_ptr< PilotJob > createPilotJob ()
 Create a pilot job.
 
std::shared_ptr< StandardJob > createStandardJob (const std::shared_ptr< WorkflowTask > &task)
 Create a standard job.
 
std::shared_ptr< StandardJob > createStandardJob (const std::shared_ptr< WorkflowTask > &task, const std::map< std::shared_ptr< DataFile >, std::shared_ptr< FileLocation >> &file_locations)
 Create a standard job.
 
std::shared_ptr< StandardJob > createStandardJob (const std::shared_ptr< WorkflowTask > &task, std::map< std::shared_ptr< DataFile >, std::vector< std::shared_ptr< FileLocation >>> file_locations)
 Create a standard job.
 
std::shared_ptr< StandardJob > createStandardJob (const std::vector< std::shared_ptr< WorkflowTask >> &tasks)
 Create a standard job.
 
std::shared_ptr< StandardJob > createStandardJob (const std::vector< std::shared_ptr< WorkflowTask >> &tasks, const std::map< std::shared_ptr< DataFile >, std::shared_ptr< FileLocation > > &file_locations, std::vector< std::tuple< std::shared_ptr< DataFile >, std::shared_ptr< FileLocation >, std::shared_ptr< FileLocation > >> pre_file_copies, std::vector< std::tuple< std::shared_ptr< DataFile >, std::shared_ptr< FileLocation >, std::shared_ptr< FileLocation > >> post_file_copies, std::vector< std::tuple< std::shared_ptr< DataFile >, std::shared_ptr< FileLocation > >> cleanup_file_deletions)
 
std::shared_ptr< StandardJob > createStandardJob (const std::vector< std::shared_ptr< WorkflowTask >> &tasks, const std::map< std::shared_ptr< DataFile >, std::shared_ptr< FileLocation >> &file_locations)
 
std::shared_ptr< StandardJob > createStandardJob (const std::vector< std::shared_ptr< WorkflowTask >> &tasks, std::map< std::shared_ptr< DataFile >, std::vector< std::shared_ptr< FileLocation >>> file_locations)
 Create a standard job.
 
std::shared_ptr< StandardJob > createStandardJob (const std::vector< std::shared_ptr< WorkflowTask >> &tasks, std::map< std::shared_ptr< DataFile >, std::vector< std::shared_ptr< FileLocation >>> file_locations, std::vector< std::tuple< std::shared_ptr< DataFile >, std::shared_ptr< FileLocation >, std::shared_ptr< FileLocation > >> pre_file_copies, std::vector< std::tuple< std::shared_ptr< DataFile >, std::shared_ptr< FileLocation >, std::shared_ptr< FileLocation > >> post_file_copies, std::vector< std::tuple< std::shared_ptr< DataFile >, std::shared_ptr< FileLocation > >> cleanup_file_deletions)
 Create a standard job.
 
simgrid::s4u::Mailbox * getCreatorMailbox ()
 Return the mailbox of the job manager's creator.
 
unsigned long getNumRunningPilotJobs () const
 Get the number of currently running pilot jobs.
 
void kill ()
 Kill the job manager (brutally terminates the daemon and clears all jobs)
 
void stop () override
 Stop the job manager.
 
void submitJob (const std::shared_ptr< CompoundJob > &job, const std::shared_ptr< ComputeService > &compute_service, std::map< std::string, std::string > service_specific_args={})
 Submit a compound job to a compute service.
 
void submitJob (const std::shared_ptr< PilotJob > &job, const std::shared_ptr< ComputeService > &compute_service, std::map< std::string, std::string > service_specific_args={})
 Submit a pilot job to a compute service.
 
void submitJob (const std::shared_ptr< StandardJob > &job, const std::shared_ptr< ComputeService > &compute_service, std::map< std::string, std::string > service_specific_args={})
 Submit a standard job to a compute service.
 
void terminateJob (const std::shared_ptr< CompoundJob > &job)
 Terminate a compound job that hasn't completed/expired/failed yet.
 
void terminateJob (const std::shared_ptr< PilotJob > &job)
 Terminate a pilot job that hasn't completed/expired/failed yet.
 
void terminateJob (const std::shared_ptr< StandardJob > &job)
 Terminate a standard job that hasn't completed/expired/failed yet.
 
- Public Member Functions inherited from wrench::Service
void assertServiceIsUp ()
 Throws an exception if the service is not up.
 
std::string getHostname ()
 Get the name of the host on which the service is / will be running.
 
const WRENCH_MESSAGE_PAYLOADCOLLECTION_TYPE & getMessagePayloadList () const
 Get all message payloads and their values of the Service.
 
double getMessagePayloadValue (WRENCH_MESSAGEPAYLOAD_TYPE)
 Get a message payload of the Service as a double.
 
double getNetworkTimeoutValue ()
 Returns the service's network timeout value.
 
std::string getPhysicalHostname ()
 Get the physical name of the host on which the service is / will be running.
 
bool getPropertyValueAsBoolean (WRENCH_PROPERTY_TYPE)
 Get a property of the Service as a boolean.
 
double getPropertyValueAsDouble (WRENCH_PROPERTY_TYPE)
 Get a property of the Service as a double.
 
std::string getPropertyValueAsString (WRENCH_PROPERTY_TYPE)
 Get a property of the Service as a string.
 
unsigned long getPropertyValueAsUnsignedLong (WRENCH_PROPERTY_TYPE)
 Get a property of the Service as an unsigned long.
 
bool isUp ()
 Returns true if the service is UP, false otherwise.
 
void resume ()
 Resume the service.
 
void setNetworkTimeoutValue (double value)
 Sets the service's network timeout value.
 
void setStateToDown ()
 Set the state of the service to DOWN.
 
void start (std::shared_ptr< Service > this_service, bool daemonize, bool auto_restart)
 Start the service.
 
void suspend ()
 Suspend the service.
 
- Public Member Functions inherited from wrench::S4U_Daemon
 S4U_Daemon (std::string hostname, std::string process_name_prefix)
 Constructor (daemon with a mailbox)
 
void acquireDaemonLock ()
 Method to acquire the daemon's lock.
 
virtual void cleanup (bool has_returned_from_main, int return_value)
 Cleanup function called when the daemon terminates (for whatever reason). The default behavior is to throw an exception if the host is off. This method should be overridden in a daemon that implements some fault-tolerant behavior, or that is naturally fault-tolerant.
 
void createLifeSaver (std::shared_ptr< S4U_Daemon > reference)
 Create a life saver for the daemon.
 
std::string getName ()
 Retrieve the process name.
 
int getReturnValue ()
 Returns the value returned by main() (if the daemon has returned from main)
 
Simulation * getSimulation ()
 Get the service's simulation.
 
S4U_Daemon::State getState ()
 Get the daemon's state.
 
bool hasReturnedFromMain ()
 Returns true if the daemon has returned from main() (i.e., not brutally killed)
 
bool isDaemonized ()
 Return the daemonized status of the daemon.
 
bool isSetToAutoRestart ()
 Return the auto-restart status of the daemon.
 
std::pair< bool, int > join ()
 Join (i.e., wait for) the daemon.
 
void releaseDaemonLock ()
 Method to release the daemon's lock.
 
void resumeActor ()
 Resume the daemon/actor.
 
void setSimulation (Simulation *simulation)
 Set the service's simulation.
 
void setupOnExitFunction ()
 Sets up the on_exit function for the actor.
 
void startDaemon (bool _daemonized, bool _auto_restart)
 Start the daemon.
 
void suspendActor ()
 Suspend the daemon/actor.
 

Protected Member Functions

 JobManager (std::string hostname, simgrid::s4u::Mailbox *creator_mailbox)
 Constructor.
 
- Protected Member Functions inherited from wrench::Service
 Service (std::string hostname, std::string process_name_prefix)
 Constructor.
 
 ~Service () override
 Destructor.
 
template<class T >
std::shared_ptr< T > getSharedPtr ()
 Method to retrieve the shared_ptr to a service.
 
void serviceSanityCheck ()
 Check whether the service is properly configured and running.
 
void setMessagePayload (WRENCH_MESSAGEPAYLOAD_TYPE, double)
 Set a message payload of the Service.
 
void setMessagePayloads (WRENCH_MESSAGE_PAYLOADCOLLECTION_TYPE default_messagepayload_values, WRENCH_MESSAGE_PAYLOADCOLLECTION_TYPE overriden_messagepayload_values)
 Set default and user-defined message payloads.
 
void setProperties (WRENCH_PROPERTY_COLLECTION_TYPE default_property_values, WRENCH_PROPERTY_COLLECTION_TYPE overriden_property_values)
 Set default and user-defined properties.
 
void setProperty (WRENCH_PROPERTY_TYPE, const std::string &)
 Set a property of the Service.
 
- Protected Member Functions inherited from wrench::S4U_Daemon
bool killActor ()
 Kill the daemon/actor (does nothing if already dead)
 
void runMainMethod ()
 Method that runs the user-defined main method (that's called by the S4U actor class)
 

Additional Inherited Members

- Public Types inherited from wrench::S4U_Daemon
enum  State { UP, DOWN, SUSPENDED }
 Daemon states.
 
- Static Public Member Functions inherited from wrench::S4U_Daemon
static simgrid::s4u::Mailbox * getRunningActorRecvMailbox ()
 Return the running actor's recv mailbox.
 
- Public Attributes inherited from wrench::S4U_Daemon
std::string hostname
 The name of the host on which the daemon is running.
 
LifeSaver * life_saver = nullptr
 The daemon's life saver.
 
simgrid::s4u::Mailbox * mailbox
 The daemon's mailbox.
 
std::string process_name
 The name of the daemon.
 
simgrid::s4u::Mailbox * recv_mailbox
 The daemon's receive mailbox (to send to another daemon so that that daemon can reply)
 
- Static Public Attributes inherited from wrench::S4U_Daemon
static std::unordered_map< aid_t, simgrid::s4u::Mailbox * > map_actor_to_recv_mailbox
 
- Static Protected Member Functions inherited from wrench::Service
static void assertServiceIsUp (std::shared_ptr< Service > s)
 Assert for the service being up.
 
- Protected Attributes inherited from wrench::Service
WRENCH_MESSAGE_PAYLOADCOLLECTION_TYPE messagepayload_list
 The service's messagepayload list.
 
std::string name
 The service's name.
 
double network_timeout = 30.0
 The time (in seconds) after which a service that doesn't send back a reply (control) message causes a NetworkTimeOut exception (default: 30 seconds; if negative, never time out).
 
WRENCH_PROPERTY_COLLECTION_TYPE property_list
 The service's property list.
 
bool shutting_down = false
 A boolean that indicates if the service is in the middle of shutting down.
 
- Protected Attributes inherited from wrench::S4U_Daemon
unsigned int num_starts = 0
 The number of times that this daemon has started (i.e., 1 + number of restarts)
 
Simulation * simulation
 A pointer to the simulation object.
 
State state
 The service's state.
 

Detailed Description

A helper daemon (co-located with and explicitly started by an execution controller), which is used to handle all job executions.

Constructor & Destructor Documentation

◆ JobManager()

wrench::JobManager::JobManager ( std::string  hostname,
simgrid::s4u::Mailbox *  creator_mailbox 
)
explicit protected

Constructor.

Parameters
hostname  the name of the host on which the job manager will run
creator_mailbox  the mailbox of the manager's creator

Member Function Documentation

◆ createCompoundJob()

std::shared_ptr< CompoundJob > wrench::JobManager::createCompoundJob ( std::string  name)

Create a compound job.

Parameters
name  the job's name (if empty, a unique job name will be picked for you)
Returns
the job

◆ createPilotJob()

std::shared_ptr< PilotJob > wrench::JobManager::createPilotJob ( )

Create a pilot job.

Returns
the pilot job
Exceptions
std::invalid_argument

◆ createStandardJob() [1/6]

std::shared_ptr< StandardJob > wrench::JobManager::createStandardJob ( const std::shared_ptr< WorkflowTask > &  task)

Create a standard job.

Parameters
task  a task (which must be ready)
Returns
the standard job
Exceptions
std::invalid_argument

◆ createStandardJob() [2/6]

std::shared_ptr< StandardJob > wrench::JobManager::createStandardJob ( const std::shared_ptr< WorkflowTask > &  task,
const std::map< std::shared_ptr< DataFile >, std::shared_ptr< FileLocation >> &  file_locations 
)

Create a standard job.

Parameters
task  a task (which must be ready)
file_locations  a map that specifies the locations where input/output files should be read/written. When unspecified, it is assumed that the ComputeService's scratch storage space will be used.
Returns
the standard job
Exceptions
std::invalid_argument

◆ createStandardJob() [3/6]

std::shared_ptr< StandardJob > wrench::JobManager::createStandardJob ( const std::shared_ptr< WorkflowTask > &  task,
std::map< std::shared_ptr< DataFile >, std::vector< std::shared_ptr< FileLocation >>>  file_locations 
)

Create a standard job.

Parameters
task  a task (which must be ready)
file_locations  a map that specifies, for each file, a list of locations, in preference order, where input/output files should be read/written. When unspecified, it is assumed that the ComputeService's scratch storage space will be used.
Returns
the standard job
Exceptions
std::invalid_argument

◆ createStandardJob() [4/6]

std::shared_ptr< StandardJob > wrench::JobManager::createStandardJob ( const std::vector< std::shared_ptr< WorkflowTask >> &  tasks)

Create a standard job.

Parameters
tasks  a list of tasks (each of which must be either READY, or a child of COMPLETED tasks or of tasks also included in the list)
Returns
the standard job
Exceptions
std::invalid_argument
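
The eligibility rule stated for the tasks parameter can be read as a simple check over the list. The sketch below is illustrative only: Task, TaskState, and tasksAreEligible are hypothetical stand-ins defined here so that the example compiles without WRENCH, and this is not claimed to be how JobManager validates its arguments.

```cpp
#include <algorithm>
#include <cassert>
#include <memory>
#include <vector>

// Hypothetical stand-ins for illustration only; these are NOT the WRENCH types.
enum class TaskState { NOT_READY, READY, COMPLETED };

struct Task {
    TaskState state = TaskState::NOT_READY;
    std::vector<std::shared_ptr<Task>> parents;
};

// Returns true if every task in the list is READY, or is a child whose
// parents are all either COMPLETED or themselves included in the list.
bool tasksAreEligible(const std::vector<std::shared_ptr<Task>> &tasks) {
    auto inList = [&tasks](const std::shared_ptr<Task> &t) {
        return std::find(tasks.begin(), tasks.end(), t) != tasks.end();
    };
    for (const auto &task : tasks) {
        assert(task != nullptr);
        if (task->state == TaskState::READY) continue;
        // A task that is not READY and has no parents cannot satisfy the rule.
        if (task->parents.empty()) return false;
        for (const auto &parent : task->parents) {
            if (parent->state != TaskState::COMPLETED && !inList(parent)) {
                return false;
            }
        }
    }
    return true;
}
```

Under this reading, a two-task chain can be placed in a single job as long as the parent is READY and both tasks are in the list, whereas the child on its own is rejected until its parent is COMPLETED.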

◆ createStandardJob() [5/6]

std::shared_ptr< StandardJob > wrench::JobManager::createStandardJob ( const std::vector< std::shared_ptr< WorkflowTask >> &  tasks,
std::map< std::shared_ptr< DataFile >, std::vector< std::shared_ptr< FileLocation >>>  file_locations 
)

Create a standard job.

Parameters
tasks  a list of tasks (each of which must be either READY, or a child of COMPLETED tasks or of tasks also included in the list)
file_locations  a map that specifies, for each file, a list of locations, in preference order, where input/output files should be read/written. When unspecified, it is assumed that the ComputeService's scratch storage space will be used.
Returns
the standard job
Exceptions
std::invalid_argument

◆ createStandardJob() [6/6]

std::shared_ptr< StandardJob > wrench::JobManager::createStandardJob ( const std::vector< std::shared_ptr< WorkflowTask >> &  tasks,
std::map< std::shared_ptr< DataFile >, std::vector< std::shared_ptr< FileLocation >>>  file_locations,
std::vector< std::tuple< std::shared_ptr< DataFile >, std::shared_ptr< FileLocation >, std::shared_ptr< FileLocation > >>  pre_file_copies,
std::vector< std::tuple< std::shared_ptr< DataFile >, std::shared_ptr< FileLocation >, std::shared_ptr< FileLocation > >>  post_file_copies,
std::vector< std::tuple< std::shared_ptr< DataFile >, std::shared_ptr< FileLocation > >>  cleanup_file_deletions 
)

Create a standard job.

Parameters
tasks  a list of tasks (each of which must be either READY, or a child of COMPLETED tasks or of tasks also included in the standard job)
file_locations  a map that specifies, for each file, a list of locations, in preference order, where input/output files should be read/written. When unspecified, it is assumed that the ComputeService's scratch storage space will be used.
pre_file_copies  a vector of tuples that specify which file copy operations should be completed before task executions begin. The ComputeService::SCRATCH constant can be used to mean "the scratch storage space of the ComputeService".
post_file_copies  a vector of tuples that specify which file copy operations should be completed after task executions end. The ComputeService::SCRATCH constant can be used to mean "the scratch storage space of the ComputeService".
cleanup_file_deletions  a vector of tuples that specify file deletion operations that should be completed at the end of the job. The ComputeService::SCRATCH constant can be used to mean "the scratch storage space of the ComputeService".
Returns
the standard job
Exceptions
std::invalid_argument
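
To make the shape of the pre_file_copies, post_file_copies, and cleanup_file_deletions arguments concrete, here is a minimal sketch that builds the three vectors. DataFile and FileLocation are empty-shell stand-ins defined locally (not the WRENCH classes), StagingPlan and makeStagingPlan are hypothetical helpers, and the tuple order (file, source, destination) is an assumption that this page does not state.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <tuple>
#include <vector>

// Hypothetical stand-ins so the sketch compiles without WRENCH.
struct DataFile { std::string id; };
struct FileLocation { std::string where; };

using FilePtr = std::shared_ptr<DataFile>;
using LocPtr = std::shared_ptr<FileLocation>;
// ASSUMPTION: copy tuples are ordered (file, source, destination).
using FileCopy = std::tuple<FilePtr, LocPtr, LocPtr>;
using FileDeletion = std::tuple<FilePtr, LocPtr>;

struct StagingPlan {
    std::vector<FileCopy> pre_file_copies;
    std::vector<FileCopy> post_file_copies;
    std::vector<FileDeletion> cleanup_file_deletions;
};

// Stage the input into scratch before the tasks run, copy the output back
// to storage afterwards, then delete both files from scratch.
StagingPlan makeStagingPlan(const FilePtr &input, const FilePtr &output,
                            const LocPtr &storage, const LocPtr &scratch) {
    assert(input && output && storage && scratch);
    StagingPlan plan;
    plan.pre_file_copies.emplace_back(input, storage, scratch);
    plan.post_file_copies.emplace_back(output, scratch, storage);
    plan.cleanup_file_deletions.emplace_back(input, scratch);
    plan.cleanup_file_deletions.emplace_back(output, scratch);
    return plan;
}
```

In real code the scratch location would be expressed with the ComputeService::SCRATCH constant mentioned above, and the three vectors would be passed as the last three arguments of this overload.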

◆ getCreatorMailbox()

simgrid::s4u::Mailbox * wrench::JobManager::getCreatorMailbox ( )

Return the mailbox of the job manager's creator.

Returns
a mailbox

◆ getNumRunningPilotJobs()

unsigned long wrench::JobManager::getNumRunningPilotJobs ( ) const

Get the number of currently running pilot jobs.

Returns
the number of currently running pilot jobs

◆ stop()

void wrench::JobManager::stop ( )
override virtual

Stop the job manager.

Exceptions
ExecutionException
std::runtime_error

Reimplemented from wrench::Service.

◆ submitJob() [1/3]

void wrench::JobManager::submitJob ( const std::shared_ptr< CompoundJob > &  job,
const std::shared_ptr< ComputeService > &  compute_service,
std::map< std::string, std::string >  service_specific_args = {} 
)

Submit a compound job to a compute service.

Parameters
job  a compound job
compute_service  a compute service
service_specific_args  arguments specific for compute services:
  • to a BareMetalComputeService: {{"actionID", "[hostname:][num_cores]"}, ...}
    • If no value is provided for a task, then the service will choose a host and use as many cores as possible on that host.
    • If a "" value is provided for a task, then the service will choose a host and use as many cores as possible on that host.
    • If a "hostname" value is provided for a task, then the service will run the task on that host, using as many of its cores as possible.
    • If a "num_cores" value is provided for a task, then the service will run that task with this many cores, but will choose the host on which to run it.
    • If a "hostname:num_cores" value is provided for a task, then the service will run that task with the specified number of cores on that host.
  • to a BatchComputeService: {{"-t":"<int>" (requested number of minutes)},{"-N":"<int>" (number of requested hosts)},{"-c":"<int>" (number of requested cores per host)}[,{"actionID":"[node_index:]num_cores"}] [,{"-u":"<string>" (username)}]}
  • to a VirtualizedClusterComputeService: {} (jobs should not be submitted directly to the service)
  • to a CloudComputeService: {} (jobs should not be submitted directly to the service)
  • to a HTCondorComputeService:
    • For a "grid universe" job that will be submitted to a child BatchComputeService: {{"-universe":"grid"}, {"-t":"<int>" (requested number of minutes)},{"-N":"<int>" (number of requested hosts)},{"-c":"<int>" (number of requested cores per host)}[,{"-service":"<string>" (BatchComputeService service name)}] [, {"actionID":"[node_index:]num_cores"}] [, {"-u":"<string>" (username)}]}
    • For a "non-grid universe" job that will be submitted to a child BareMetalComputeService: {}
Exceptions
std::invalid_argument
ExecutionException
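
As an illustration of the BatchComputeService argument format listed above, the following sketch assembles the string-to-string map from numeric values. makeBatchArgs is a hypothetical helper written for this example; only the "-t", "-N", "-c", and "-u" keys come from the list above.

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical helper (not part of WRENCH): builds the service-specific
// arguments for a batch submission. All values are passed as strings.
std::map<std::string, std::string> makeBatchArgs(unsigned long minutes,
                                                 unsigned long num_hosts,
                                                 unsigned long cores_per_host,
                                                 const std::string &username = "") {
    assert(num_hosts > 0 && cores_per_host > 0);
    std::map<std::string, std::string> args = {
        {"-t", std::to_string(minutes)},         // requested number of minutes
        {"-N", std::to_string(num_hosts)},       // number of requested hosts
        {"-c", std::to_string(cores_per_host)}}; // requested cores per host
    if (!username.empty()) {
        args["-u"] = username;                   // optional username
    }
    return args;
}
```

The returned map is what would be passed as the service_specific_args parameter of submitJob(), for example makeBatchArgs(60, 2, 4) for a request of 60 minutes on 2 hosts with 4 cores each.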

◆ submitJob() [2/3]

void wrench::JobManager::submitJob ( const std::shared_ptr< PilotJob > &  job,
const std::shared_ptr< ComputeService > &  compute_service,
std::map< std::string, std::string >  service_specific_args = {} 
)

Submit a pilot job to a compute service.

Parameters
job  a pilot job
compute_service  a compute service
service_specific_args  arguments specific for compute services:
Exceptions
std::invalid_argument
ExecutionException

◆ submitJob() [3/3]

void wrench::JobManager::submitJob ( const std::shared_ptr< StandardJob > &  job,
const std::shared_ptr< ComputeService > &  compute_service,
std::map< std::string, std::string >  service_specific_args = {} 
)

Submit a standard job to a compute service.

Parameters
job  a standard job
compute_service  a compute service
service_specific_args  arguments specific for compute services:
  • to a BareMetalComputeService: {{"taskID", "[hostname:][num_cores]"}, ...}
    • If no value is provided for a task, then the service will choose a host and use as many cores as possible on that host.
    • If a "" value is provided for a task, then the service will choose a host and use as many cores as possible on that host.
    • If a "hostname" value is provided for a task, then the service will run the task on that host, using as many of its cores as possible.
    • If a "num_cores" value is provided for a task, then the service will run that task with this many cores, but will choose the host on which to run it.
    • If a "hostname:num_cores" value is provided for a task, then the service will run that task with the specified number of cores on that host.
  • to a BatchComputeService: {{"-t":"<int>" (requested number of minutes)},{"-N":"<int>" (number of requested hosts)},{"-c":"<int>" (number of requested cores per host)}[,{"taskID":"[node_index:]num_cores"}] [,{"-u":"<string>" (username)}]}
  • to a VirtualizedClusterComputeService: {} (jobs should not be submitted directly to the service)
  • to a CloudComputeService: {} (jobs should not be submitted directly to the service)
  • to a HTCondorComputeService:
    • For a "grid universe" job that will be submitted to a child BatchComputeService: {{"-universe":"grid"}, {"-t":"<int>" (requested number of minutes)},{"-N":"<int>" (number of requested hosts)},{"-c":"<int>" (number of requested cores per host)}[,{"-service":"<string>" (BatchComputeService service name)}] [, {"taskID":"[node_index:]num_cores"}] [, {"-u":"<string>" (username)}]}
    • For a "non-grid universe" job that will be submitted to a child BareMetalComputeService: {}
Exceptions
std::invalid_argument
ExecutionException
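
The five BareMetalComputeService cases listed above can be summarized by a small parser for the "[hostname:][num_cores]" value. This is a sketch for illustration, not WRENCH's own parsing code: BareMetalSpec and parseBareMetalSpec are hypothetical names, and treating a colon-free value made only of digits as a core count (rather than a hostname) is an assumption.

```cpp
#include <algorithm>
#include <cassert>  // used by the checks that accompany this sketch
#include <cctype>
#include <stdexcept>
#include <string>

// Hypothetical result type (not part of WRENCH).
struct BareMetalSpec {
    std::string hostname;         // empty: the service chooses the host
    unsigned long num_cores = 0;  // 0: use as many cores as possible
};

static bool isAllDigits(const std::string &s) {
    return !s.empty() &&
           std::all_of(s.begin(), s.end(),
                       [](unsigned char c) { return std::isdigit(c) != 0; });
}

// Parses "", "hostname", "num_cores", or "hostname:num_cores".
BareMetalSpec parseBareMetalSpec(const std::string &value) {
    BareMetalSpec spec;
    if (value.empty()) return spec;  // service picks host and core count
    const std::string::size_type colon = value.find(':');
    if (colon == std::string::npos) {
        // ASSUMPTION: an all-digit value is a core count, anything else a host.
        if (isAllDigits(value)) {
            spec.num_cores = std::stoul(value);
        } else {
            spec.hostname = value;
        }
        return spec;
    }
    spec.hostname = value.substr(0, colon);
    const std::string cores = value.substr(colon + 1);
    if (!cores.empty()) {
        if (!isAllDigits(cores)) {
            throw std::invalid_argument("num_cores must be a non-negative integer");
        }
        spec.num_cores = std::stoul(cores);
    }
    return spec;
}
```

For example, "node1:8" yields the host node1 with 8 cores, "4" leaves the host choice to the service, and "node1" leaves the core count to the service.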

◆ terminateJob() [1/3]

void wrench::JobManager::terminateJob ( const std::shared_ptr< CompoundJob > &  job)

Terminate a compound job that hasn't completed/expired/failed yet.

Parameters
job  the job to be terminated
Exceptions
ExecutionException
std::invalid_argument
std::runtime_error

◆ terminateJob() [2/3]

void wrench::JobManager::terminateJob ( const std::shared_ptr< PilotJob > &  job)

Terminate a pilot job that hasn't completed/expired/failed yet.

Parameters
job  the job to be terminated
Exceptions
ExecutionException
std::invalid_argument
std::runtime_error

◆ terminateJob() [3/3]

void wrench::JobManager::terminateJob ( const std::shared_ptr< StandardJob > &  job)

Terminate a standard job that hasn't completed/expired/failed yet.

Parameters
job  the job to be terminated
Exceptions
ExecutionException
std::invalid_argument
std::runtime_error
