Jobbergate Agent Reference

jobbergate_agent

clients

cluster_api

Core module for Jobbergate API clients management

AsyncBackendClient

Bases: AsyncClient

Extends the httpx.AsyncClient class with automatic token acquisition for requests. The token is acquired lazily on the first httpx request issued. This client should be used for most agent actions.
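The lazy acquisition described above can be illustrated with a minimal, standard-library-only sketch. It does not use httpx and is not the agent's implementation: LazyTokenClient, fake_oidc_fetch, and the /example path are hypothetical names chosen for illustration, and the "request" only returns what would be sent.

```python
import asyncio
from typing import Awaitable, Callable, Optional


class LazyTokenClient:
    """Hypothetical stand-in for a client that fetches its token on first use."""

    def __init__(self, fetch_token: Callable[[], Awaitable[str]]) -> None:
        self._fetch_token = fetch_token
        self._token: Optional[str] = None
        self.fetch_count = 0  # counts how often the token source was called

    async def _ensure_token(self) -> str:
        # Acquire the token only when a request actually needs it.
        if self._token is None:
            self._token = await self._fetch_token()
            self.fetch_count += 1
        return self._token

    async def request(self, method: str, url: str) -> dict:
        token = await self._ensure_token()
        # A real client would send the request; this sketch returns its shape.
        return {
            "method": method,
            "url": url,
            "headers": {"Authorization": f"Bearer {token}"},
        }


async def fake_oidc_fetch() -> str:
    # Placeholder for a call to an OIDC provider (assumption for the demo).
    return "example-token"


async def demo() -> LazyTokenClient:
    client = LazyTokenClient(fake_oidc_fetch)
    await client.request("GET", "/example")
    await client.request("GET", "/example")
    return client


demo_client = asyncio.run(demo())
```

In this sketch the token source is called once, on the first request, and the cached value is reused afterwards.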

request async
request(*args, **kwargs)

Request wrapper that captures request errors and sends them to Sentry.

This ensures events are sent to Sentry even if the caller handles the exception.

acquire_token
acquire_token(token: Token) -> Token

Retrieves a token from OIDC based on the app settings.

internals

update
self_update_agent async
self_update_agent() -> None

Fetch the upstream version and update the agent if necessary.

If the agent is updated, the scheduler is shut down and restarted with the new version.

jobbergate

constants
FileType

Bases: AutoNameEnum

File type enum.

report_health
report_health_status async
report_health_status(interval: int) -> None

Ping the API to report the agent's status.

schemas
ActiveJobSubmission

Bases: BaseModel

Specialized model for the cluster-agent to pull an active job_submission.

JobScript

Bases: BaseModel

Model to match database for the JobScript resource.

JobScriptFile

Bases: BaseModel

Model for the job_script_files field of the JobScript resource.

PendingJobSubmission

Bases: BaseModel

Specialized model for the cluster-agent to pull a pending job_submission along with data from its job_script and application sources.

SlurmJobData

Bases: BaseModel

Specialized model for the cluster-agent to pull job state information from Slurm and post the data as an update to the Jobbergate API.

SlurmSubmitError

Bases: BaseModel

Specialized model for error content in a SlurmSubmitResponse.

SlurmSubmitResponse

Bases: BaseModel

Specialized model for the response returned when a job is submitted to Slurm.

submit
SubprocessAsUserHandler dataclass

Bases: SubprocessHandler

Subprocess handler that runs as a given user.

fetch_pending_submissions async
fetch_pending_submissions() -> list[PendingJobSubmission]

Retrieve a list of pending job_submissions.

get_job_script_file async
get_job_script_file(pending_job_submission: PendingJobSubmission, submit_dir: Path) -> Path

Get the job script file from the backend.

mark_as_rejected async
mark_as_rejected(job_submission_id: int, report_message: str)

Mark job_submission as rejected in the Jobbergate API.

mark_as_submitted async
mark_as_submitted(job_submission_id: int, slurm_job_id: int, slurm_job_data: SlurmJobData)

Mark job_submission as submitted in the Jobbergate API.

process_supporting_files async
process_supporting_files(pending_job_submission: PendingJobSubmission, submit_dir: Path) -> list[Path]

Process the submission support files.

Write the support files to the submit_dir if WRITE_SUBMISSION_FILES is set to True. Reject the submission if there are support files with WRITE_SUBMISSION_FILES set to False.
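The write-or-reject rule above might look roughly like the following sketch. It is a simplified, synchronous illustration under stated assumptions: process_support_files_sketch, SubmissionRejected, and the dictionary of file contents are hypothetical, and the real function works on a PendingJobSubmission and fetches files from the backend.

```python
import tempfile
from pathlib import Path
from typing import Dict, List


class SubmissionRejected(Exception):
    """Hypothetical error standing in for the agent's rejection path."""


def process_support_files_sketch(
    support_files: Dict[str, str],
    submit_dir: Path,
    write_submission_files: bool,
) -> List[Path]:
    # No support files: nothing to write and nothing to reject.
    if not support_files:
        return []
    # Support files exist but writing is disabled: reject the submission.
    if not write_submission_files:
        raise SubmissionRejected("Support files present but writing is disabled")
    written = []
    for filename, content in support_files.items():
        path = submit_dir / filename
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(content)
        written.append(path)
    return written
```

A caller would pass the value of the WRITE_SUBMISSION_FILES setting as write_submission_files and treat SubmissionRejected as the signal to mark the submission as rejected.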

retrieve_submission_file async
retrieve_submission_file(file: JobScriptFile) -> str

Get a submission file from the backend and return the decoded file content.

submit_job_script async
submit_job_script(pending_job_submission: PendingJobSubmission, user_mapper: SlurmUserMapper) -> int

Submit a job script to Slurm via the sbatch command.

Parameters:

Name Type Description Default
pending_job_submission PendingJobSubmission

A job_submission with fields needed to submit.

required

Returns:

Type Description
int

The slurm_job_id for the submitted job

submit_pending_jobs async
submit_pending_jobs() -> None

Submit all pending jobs and update them with SUBMITTED status and slurm_job_id.

write_submission_file async
write_submission_file(file_content: str, filename: str, submit_dir: Path) -> Path

Write a decoded file content to the submit_dir.

update
fetch_active_submissions async
fetch_active_submissions() -> List[ActiveJobSubmission]

Retrieve a list of active job_submissions.

update_active_jobs async
update_active_jobs() -> None

Update Slurm job state for active jobs.

update_job_data async
update_job_data(job_submission_id: int, slurm_job_data: SlurmJobData) -> None

Update a job submission with the job state.

main

settings

Settings

Bases: BaseSettings

compute_extra_settings
compute_extra_settings() -> Self

Compute settings values that are based on other settings values.

tasks

Task definitions for the Jobbergate Agent.

active_submissions_task
active_submissions_task(scheduler: BaseScheduler) -> Job

Schedule a task to handle active jobs every TASK_JOBS_INTERVAL_SECONDS seconds.

garbage_collection_task
garbage_collection_task(scheduler: BaseScheduler) -> Union[Job, None]

Schedule a task to perform garbage collection every day at a specified time.

pending_submissions_task
pending_submissions_task(scheduler: BaseScheduler) -> Job

Schedule a task to submit pending jobs every TASK_JOBS_INTERVAL_SECONDS seconds.

self_update_task
self_update_task(scheduler: BaseScheduler) -> Job

Schedule a task to self update the agent every TASK_SELF_UPDATE_INTERVAL_SECONDS seconds.

status_report_task
status_report_task(scheduler: BaseScheduler) -> Job

Schedule a task to report the status.

trigger_garbage_collections async
trigger_garbage_collections(interval_between_calls: int = 60) -> None

Trigger maintenance tasks on the Jobbergate API.

utils

exception

Core module for exception related operations

AuthTokenError

Bases: ClusterAgentError

Raise exception when there are connection issues with the backend

ClusterAgentError

Bases: Buzz

Raise exception when execution command returns an error

JobSubmissionError

Bases: ClusterAgentError

Raise exception when a job cannot be submitted

JobbergateApiError

Bases: ClusterAgentError

Raise exception when communication with Jobbergate API fails

ProcessExecutionError

Bases: ClusterAgentError

Raise exception when execution command returns an error

SbatchError

Bases: ClusterAgentError

Raise exception when sbatch raises any error

SlurmParameterParserError

Bases: ClusterAgentError

Raise exception when the Slurm mapper or SBATCH parser faces any error

handle_errors_async async
handle_errors_async(message: str, raise_exc_class: Union[Type[Exception], None] = Exception, raise_args: Optional[Iterable[Any]] = None, raise_kwargs: Optional[Mapping[str, Any]] = None, handle_exc_class: Union[Type[Exception], Tuple[Type[Exception], ...]] = Exception, do_finally: Callable[[], None] | Callable[[], Coroutine[Any, Any, None]] = noop, do_except: Callable[[DoExceptParams], None] | Callable[[DoExceptParams], Coroutine[Any, Any, None]] = noop, do_else: Callable[[], None] | Callable[[], Coroutine[Any, Any, None]] = noop) -> AsyncIterator[None]

Async context manager that will intercept exceptions and repackage them with a message attached.

Example
async with handle_errors_async("It didn't work"):
    some_code_that_might_raise_an_exception()

Parameters:

Name Type Description Default
message str

The message to attach to the raised exception.

required
raise_exc_class Union[Type[Exception], None]

The exception type to raise with the constructed message if an exception is caught in the managed context.

Defaults to Exception.

If None is passed, no new exception will be raised and only the do_except, do_else, and do_finally functions will be called.

Exception
raise_args Optional[Iterable[Any]]

Additional positional args (after the constructed message) that will be passed when raising an instance of the raise_exc_class.

None
raise_kwargs Optional[Mapping[str, Any]]

Keyword args that will be passed when raising an instance of the raise_exc_class.

None
handle_exc_class Union[Type[Exception], Tuple[Type[Exception], ...]]

Limits the class of exceptions that will be intercepted. Any other exception types will not be caught and re-packaged. Defaults to Exception (will handle all exceptions). May also be provided as a tuple of multiple exception types to handle.

Exception
do_finally Callable[[], None] | Callable[[], Coroutine[Any, Any, None]]

A function that should always be called at the end of the block. Should take no parameters.

noop
do_except Callable[[DoExceptParams], None] | Callable[[DoExceptParams], Coroutine[Any, Any, None]]

A function that should be called only if there was an exception. Must accept one parameter that is an instance of the DoExceptParams dataclass. Note that the do_except method is passed the original exception.

noop
do_else Callable[[], None] | Callable[[], Coroutine[Any, Any, None]]

A function that should be called only if there were no exceptions encountered.

noop
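
To make the interplay of these parameters concrete, here is a reduced sketch of such a context manager built on contextlib.asynccontextmanager. It is an illustration, not the library's implementation: handle_errors_sketch is a hypothetical name, raise_args and raise_kwargs are omitted, and do_except receives the bare exception here instead of a DoExceptParams instance.

```python
import asyncio
import inspect
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator, Callable, Optional, Type


def noop(*args: Any, **kwargs: Any) -> None:
    """Default callback that does nothing."""


async def _call(func: Callable[..., Any], *args: Any) -> None:
    # Support both plain functions and coroutine functions as callbacks.
    result = func(*args)
    if inspect.isawaitable(result):
        await result


@asynccontextmanager
async def handle_errors_sketch(
    message: str,
    raise_exc_class: Optional[Type[Exception]] = Exception,
    handle_exc_class: Type[Exception] = Exception,
    do_except: Callable[[Exception], Any] = noop,
    do_else: Callable[[], Any] = noop,
    do_finally: Callable[[], Any] = noop,
) -> AsyncIterator[None]:
    try:
        yield
    except handle_exc_class as err:
        await _call(do_except, err)
        # With raise_exc_class=None the error is swallowed after the callbacks.
        if raise_exc_class is not None:
            raise raise_exc_class(f"{message}: {err}") from err
    else:
        await _call(do_else)
    finally:
        await _call(do_finally)


async def demo() -> list:
    events = []
    try:
        async with handle_errors_sketch(
            "It didn't work",
            raise_exc_class=RuntimeError,
            do_except=lambda err: events.append("except"),
            do_finally=lambda: events.append("finally"),
        ):
            raise ValueError("boom")
    except RuntimeError as err:
        events.append(str(err))
    return events


demo_events = asyncio.run(demo())
```

In the sketch, do_except runs first, do_finally runs as the repackaged exception leaves the block, and the caller then sees a RuntimeError carrying the combined message.
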
logging

Core module for logging operations

log_error
log_error(params: DoExceptParams)

Provide a utility function to log a Buzz-based exception and the stack-trace of the error's context.

Parameters:

Name Type Description Default
params DoExceptParams

A DoExceptParams instance containing the original exception, a message describing it, and the stack trace of the error.

required
logger_wraps
logger_wraps(*, entry: bool = True, exit: bool = True, level: str = 'DEBUG')

Decorator to wrap a function with logging statements.

Reference

https://loguru.readthedocs.io/en/stable/resources/recipes.html
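
As a rough illustration of what such a decorator does, the sketch below uses the standard library's logging module instead of loguru. logger_wraps_sketch and the logger name are hypothetical, and the message format is an assumption; only the keyword-only signature mirrors the one documented above.

```python
import functools
import logging

logger = logging.getLogger("logger_wraps_sketch")


def logger_wraps_sketch(*, entry: bool = True, exit: bool = True, level: str = "DEBUG"):
    # Translate a level name such as "DEBUG" into logging's numeric constant.
    level_number = getattr(logging, level.upper())

    def wrapper(func):
        @functools.wraps(func)  # keep the wrapped function's name and docstring
        def wrapped(*args, **kwargs):
            if entry:
                logger.log(
                    level_number,
                    "Entering '%s' (args=%s, kwargs=%s)",
                    func.__name__, args, kwargs,
                )
            result = func(*args, **kwargs)
            if exit:
                logger.log(level_number, "Exiting '%s' (result=%s)", func.__name__, result)
            return result

        return wrapped

    return wrapper


@logger_wraps_sketch(level="INFO")
def add(a: int, b: int) -> int:
    return a + b
```

The decorated function behaves exactly like the original; the log records only appear if the logger is configured to emit the chosen level.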

plugin

Provide the agent with the ability to load custom plugins that are installed in the same environment.

load_plugins
load_plugins(plugin_name: str) -> Dict[str, Any]

Discover and load plugins available to the agent, allowing for third party ones to be included.

Note that the plugins shipped with the agent are also declared as plugins in the pyproject.toml file, even though they could easily be loaded directly from source. This supports testing and demonstrates how to use the plugin system.

Reference

https://packaging.python.org/en/latest/guides/creating-and-discovering-plugins/
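
Entry-point based discovery, as described in the packaging guide referenced above, can be sketched with importlib.metadata. This is an assumption about the general technique rather than the agent's code: load_plugins_sketch is a hypothetical name, and the group name passed in the usage note is made up.

```python
from importlib.metadata import entry_points
from typing import Any, Dict


def load_plugins_sketch(group: str) -> Dict[str, Any]:
    """Load every entry point registered under `group`, keyed by its name."""
    all_entry_points = entry_points()
    if hasattr(all_entry_points, "select"):
        # Python 3.10 and later expose a selection API.
        selected = all_entry_points.select(group=group)
    else:
        # Older versions return a plain dict of group name to entry points.
        selected = all_entry_points.get(group, [])
    return {entry_point.name: entry_point.load() for entry_point in selected}
```

Calling load_plugins_sketch("example.plugin.group") returns an empty dictionary when no installed package declares entry points in that group.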

scheduler

Provide the task scheduler for the agent and the main loop to run it.

Custom tasks can be added to the agent as installable plugins, which are discovered at runtime.

References

https://github.com/agronholm/apscheduler

https://packaging.python.org/en/latest/guides/creating-and-discovering-plugins

JobbergateTask

Bases: Protocol

Protocol to be implemented by any task that is expected to run on the scheduler.

__call__
__call__(scheduler: BaseScheduler) -> Union[Job, None]

Specify a callable used to schedule a task and return the resulting job.

This is handed over to client code, giving it the opportunity to handle its own configuration and to access the rich flexibility of the scheduler API.

None can also be returned if no task is going to be scheduled due to internal business logic.
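
A task that satisfies this kind of protocol could look like the sketch below. To stay self-contained it does not import APScheduler: FakeScheduler and FakeJob are hypothetical stand-ins whose add_job call only imitates the shape of an interval job registration, and make_hello_task is an invented helper.

```python
from typing import Callable, List, Optional, Protocol


class FakeJob:
    """Hypothetical stand-in for the scheduler's Job object."""

    def __init__(self, func: Callable[[], None], trigger: str, seconds: int) -> None:
        self.func = func
        self.trigger = trigger
        self.seconds = seconds


class FakeScheduler:
    """Hypothetical stand-in for a scheduler exposing only add_job."""

    def __init__(self) -> None:
        self.jobs: List[FakeJob] = []

    def add_job(self, func: Callable[[], None], trigger: str, seconds: int) -> FakeJob:
        job = FakeJob(func, trigger, seconds)
        self.jobs.append(job)
        return job


class TaskProtocol(Protocol):
    def __call__(self, scheduler: FakeScheduler) -> Optional[FakeJob]: ...


def say_hello() -> None:
    print("hello from a custom task")


def make_hello_task(enabled: bool) -> TaskProtocol:
    def hello_task(scheduler: FakeScheduler) -> Optional[FakeJob]:
        # Business logic may decide that nothing should be scheduled.
        if not enabled:
            return None
        return scheduler.add_job(say_hello, "interval", seconds=30)

    return hello_task
```

The task owns its configuration (here the enabled flag and the 30-second interval), while the caller only supplies the scheduler.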

init_scheduler
init_scheduler() -> BaseScheduler

Initialize the scheduler and schedule all tasks.

schedule_tasks
schedule_tasks(scheduler: BaseScheduler) -> None

Discover and schedule all tasks to be run by the agent.

shut_down_scheduler
shut_down_scheduler(scheduler: BaseScheduler, wait: bool = True) -> None

Shut down the scheduler.

user_mapper

Provide the agent with a way to map email addresses from Jobbergate to local Slurm users.

Custom mappers can be added to the agent as installable plugins, which are discovered at runtime.

SlurmUserMapper module-attribute
SlurmUserMapper = Mapping[str, str]

Slurm user mappers are mappings from email addresses to local Slurm users.

SingleUserMapper dataclass

Bases: Mapping

A user mapper that always returns the same user.

__post_init__
__post_init__()

Validate the user mapper by asserting it is not an empty string.
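
A mapper of this kind can be sketched as a dataclass that implements the Mapping interface. FixedUserMapper and its slurm_user field are hypothetical names, and the choice of what iteration and length report is an assumption made for the sketch, since a mapper that answers for any key has no natural key set.

```python
from collections.abc import Mapping
from dataclasses import dataclass
from typing import Iterator


@dataclass
class FixedUserMapper(Mapping):
    """Hypothetical mapper that resolves every email to one Slurm user."""

    slurm_user: str = ""

    def __post_init__(self) -> None:
        # Reject an empty user name at construction time.
        if not self.slurm_user:
            raise ValueError("A non-empty Slurm user is required")

    def __getitem__(self, key: str) -> str:
        # The key (an email address) is ignored on purpose.
        return self.slurm_user

    def __iter__(self) -> Iterator[str]:
        # Keys cannot be enumerated; yielding the user itself is arbitrary.
        yield self.slurm_user

    def __len__(self) -> int:
        return 1


mapper = FixedUserMapper(slurm_user="cluster-user")
resolved = mapper["someone@example.com"]
```

Because the class satisfies Mapping, it can be used anywhere a Mapping[str, str] user mapper is expected.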

SlurmUserMapperFactory

Bases: Protocol

Protocol to be implemented by plugins on client code.

A callable with no arguments is expected, so that the configuration and initialization of any custom user mapper is handed over to client code. Any object that implements the Mapping protocol can be returned.

__call__
__call__() -> SlurmUserMapper

Specify the signature to build a user mapper.
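
As an illustration of a factory that fits this signature, the sketch below returns a plain dictionary. UserMapperFactory, static_mapper_factory, resolve_user, and the email addresses are all hypothetical; a real plugin might build its mapping from a file or a directory service instead.

```python
from typing import Mapping, Protocol

UserMapper = Mapping[str, str]


class UserMapperFactory(Protocol):
    def __call__(self) -> UserMapper: ...


def static_mapper_factory() -> UserMapper:
    # Hypothetical hard-coded configuration, for demonstration only.
    return {"alice@example.com": "alice", "bob@example.com": "bob"}


def resolve_user(factory: UserMapperFactory, email: str) -> str:
    # The caller builds the mapper without knowing how it is configured.
    mapper = factory()
    return mapper[email]
```

Since a dict already implements Mapping, no custom class is needed for simple cases.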

manufacture
manufacture() -> SlurmUserMapper

Create an instance of a Slurm user mapper given the app configuration.