Jobbergate Agent Reference

jobbergate_agent

clients

cluster_api

Core module for Jobbergate API clients management

AsyncBackendClient

Bases: AsyncClient

Extends the httpx.AsyncClient class with automatic token acquisition for requests. The token is acquired lazily on the first httpx request issued. This client should be used for most agent actions.
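Here is a minimal sketch of lazy token acquisition that uses only the standard library. `LazyTokenClient` and the `fetch_token` coroutine are hypothetical stand-ins. The real client extends httpx.AsyncClient and obtains its tokens from OIDC, and its caching rules may differ.

```python
import asyncio
import time
from typing import Awaitable, Callable, Optional, Tuple

# Hypothetical token fetcher: returns (access_token, expires_in_seconds).
TokenFetcher = Callable[[], Awaitable[Tuple[str, float]]]


class LazyTokenClient:
    """Acquire a token only when the first request is issued, then reuse it until it nears expiry."""

    def __init__(self, fetch_token: TokenFetcher, leeway: float = 10.0) -> None:
        self._fetch_token = fetch_token
        self._leeway = leeway  # refresh slightly before the real expiry
        self._token: Optional[str] = None
        self._expires_at = 0.0

    async def _ensure_token(self) -> str:
        if self._token is None or time.monotonic() >= self._expires_at - self._leeway:
            token, expires_in = await self._fetch_token()
            self._token = token
            self._expires_at = time.monotonic() + expires_in
        return self._token

    async def request(self, method: str, url: str) -> dict:
        # A real client would send the HTTP request here; the sketch only builds the headers.
        token = await self._ensure_token()
        return {"method": method, "url": url, "headers": {"Authorization": f"Bearer {token}"}}


async def demo() -> tuple:
    calls = 0

    async def fake_fetch() -> Tuple[str, float]:
        nonlocal calls
        calls += 1
        return f"token-{calls}", 300.0

    client = LazyTokenClient(fake_fetch)
    before = calls  # still 0: nothing is fetched until a request is issued
    first = await client.request("GET", "/example/one")
    second = await client.request("GET", "/example/two")
    return before, calls, first["headers"]["Authorization"], second["headers"]["Authorization"]


print(asyncio.run(demo()))  # (0, 1, 'Bearer token-1', 'Bearer token-1')
```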

request async
request(*args, **kwargs)

Request wrapper that captures request errors and sends them to Sentry.

This ensures events are sent to Sentry even if the caller handles the exception.
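A small wrapper can illustrate the capture-then-re-raise behavior. This is a hedged sketch and not the agent's actual code: `report_error` stands in for a Sentry call, and `send` stands in for the underlying request coroutine.

```python
import asyncio
from typing import Any, Awaitable, Callable, List

Sender = Callable[..., Awaitable[Any]]


def capture_errors(send: Sender, report_error: Callable[[BaseException], None]) -> Sender:
    """Wrap `send` so every exception is reported before it propagates to the caller."""

    async def wrapper(*args: Any, **kwargs: Any) -> Any:
        try:
            return await send(*args, **kwargs)
        except Exception as err:
            report_error(err)  # reported even if the caller later handles the exception
            raise

    return wrapper


async def demo() -> List[str]:
    reported: List[str] = []

    async def failing_send(url: str) -> None:
        raise ConnectionError(f"cannot reach {url}")

    request = capture_errors(failing_send, lambda err: reported.append(str(err)))
    try:
        await request("https://example.invalid")
    except ConnectionError:
        pass  # the caller swallows the error, yet it was still reported
    return reported


print(asyncio.run(demo()))  # ['cannot reach https://example.invalid']
```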

acquire_token
acquire_token(token: Token) -> Token

Retrieves a token from OIDC based on the app settings.

influx

Core module for defining the InfluxDB client.

initialize_influx_client
initialize_influx_client() -> None | InfluxDBClient

Initialize the InfluxDB client.

internals

update
self_update_agent async
self_update_agent() -> None

Fetch the upstream version and update the agent if necessary.

If the agent is updated, the scheduler is shut down and restarted with the new version.

jobbergate

constants
FileType

Bases: AutoNameEnum

File type enum.

report_health
report_health_status async
report_health_status(interval: int) -> None

Ping the API to report the agent's status.

schemas
ActiveJobSubmission

Bases: BaseModel

Specialized model for the cluster-agent to pull an active job_submission.

InfluxDBMeasurementDict

Bases: TypedDict

Map each entry in the list returned by InfluxDBClient(...).get_list_measurements(...).

InfluxDBPointDict

Bases: TypedDict

Map each entry in the generator returned by InfluxDBClient(...).query(...).get_points().

JobScript

Bases: BaseModel

Model to match database for the JobScript resource.

JobScriptFile

Bases: BaseModel

Model for the job_script_files field of the JobScript resource.

JobSubmissionMetricsMaxResponse

Bases: BaseModel

Model for the response of the /jobbergate/job-submissions/agent/metrics/{job_submission_id} endpoint.

JobSubmissionMetricsMaxTime

Bases: BaseModel

Model for the max_times field of the JobSubmissionMetricsMaxResponse.

PendingJobSubmission

Bases: BaseModel

Specialized model for the cluster-agent to pull a pending job_submission along with data from its job_script and application sources.

SlurmJobData

Bases: BaseModel

Specialized model for the cluster-agent to pull job state information from Slurm and post the data as an update to the Jobbergate API.

validate_job_state classmethod
validate_job_state(value: str | list[str] | None) -> str | None

Validate the job_state field.
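The sketch below shows one plausible normalization for this signature. It assumes that job_state may arrive as a list of state strings and that the first entry is the relevant one. The agent's validator may apply different rules.

```python
from typing import List, Optional, Union


def validate_job_state(value: Union[str, List[str], None]) -> Optional[str]:
    """Normalize a job_state that may arrive as a string, a list of states, or None.

    Assumption: when a list is given, its first entry is treated as the job state.
    """
    if value is None or isinstance(value, str):
        return value
    return value[0] if value else None


print(validate_job_state(["RUNNING"]))  # RUNNING
print(validate_job_state("COMPLETED"))  # COMPLETED
print(validate_job_state([]))           # None
```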

SlurmSubmitError

Bases: BaseModel

Specialized model for error content in a SlurmSubmitResponse.

SlurmSubmitResponse

Bases: BaseModel

Specialized model for the response returned by Slurm when a job is submitted.

submit
SubprocessAsUserHandler dataclass

Bases: SubprocessHandler

Subprocess handler that runs as a given user.

fetch_pending_submissions async
fetch_pending_submissions() -> list[PendingJobSubmission]

Retrieve a list of pending job_submissions.

get_job_script_file async
get_job_script_file(pending_job_submission: PendingJobSubmission, submit_dir: Path) -> Path

Get the job script file from the backend.

mark_as_rejected async
mark_as_rejected(job_submission_id: int, report_message: str)

Mark job_submission as rejected in the Jobbergate API.

mark_as_submitted async
mark_as_submitted(job_submission_id: int, slurm_job_id: int, slurm_job_data: SlurmJobData)

Mark job_submission as submitted in the Jobbergate API.

process_supporting_files async
process_supporting_files(pending_job_submission: PendingJobSubmission, submit_dir: Path) -> list[Path]

Process the submission support files.

Write the support files to the submit_dir if WRITE_SUBMISSION_FILES is set to True. Reject the submission if it has support files while WRITE_SUBMISSION_FILES is set to False.
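The write-or-reject rule can be sketched as follows. The sketch assumes the support files have already been retrieved and decoded into a `{filename: content}` dict. `SubmissionRejected` is a hypothetical stand-in for marking the submission as rejected.

```python
import tempfile
from pathlib import Path
from typing import Dict, List


class SubmissionRejected(Exception):
    """Hypothetical stand-in for rejecting the job submission."""


def process_supporting_files(
    files: Dict[str, str], submit_dir: Path, write_submission_files: bool
) -> List[Path]:
    """Write decoded support files into submit_dir, or reject when writing is disabled.

    `files` maps filename -> decoded content (an assumed shape for illustration).
    """
    if not files:
        return []  # nothing to write, nothing to reject
    if not write_submission_files:
        raise SubmissionRejected("Support files present but WRITE_SUBMISSION_FILES is False")
    written: List[Path] = []
    for filename, content in files.items():
        path = submit_dir / filename
        path.write_text(content)
        written.append(path)
    return written


with tempfile.TemporaryDirectory() as tmp:
    paths = process_supporting_files({"input.dat": "42\n"}, Path(tmp), write_submission_files=True)
    print([p.name for p in paths])  # ['input.dat']
```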

retrieve_submission_file async
retrieve_submission_file(file: JobScriptFile) -> str

Get a submission file from the backend and return the decoded file content.

submit_job_script async
submit_job_script(pending_job_submission: PendingJobSubmission, user_mapper: SlurmUserMapper) -> int

Submit a job script to Slurm via the sbatch command.

Parameters:

pending_job_submission (PendingJobSubmission, required): A job_submission with the fields needed to submit.

Returns:

int: The slurm_job_id of the submitted job.
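After sbatch runs, its output must be turned into the integer returned here. The following hedged sketch covers the parsing step for the default `Submitted batch job <id>` message and for the `--parsable` output (`<id>` or `<id>;<cluster>`). It is an assumption that the agent uses either form.

```python
import re


def parse_sbatch_output(stdout: str) -> int:
    """Extract the Slurm job id from sbatch output.

    Handles the default "Submitted batch job <id>" message and the
    `--parsable` form "<id>" or "<id>;<cluster>".
    """
    text = stdout.strip()
    match = re.fullmatch(r"Submitted batch job (\d+)", text) or re.fullmatch(r"(\d+)(?:;\S+)?", text)
    if match is None:
        raise ValueError(f"Unexpected sbatch output: {stdout!r}")
    return int(match.group(1))


print(parse_sbatch_output("Submitted batch job 12345\n"))  # 12345
print(parse_sbatch_output("678;cluster-a"))               # 678
```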

submit_pending_jobs async
submit_pending_jobs() -> None

Submit all pending jobs and update them with SUBMITTED status and slurm_job_id.
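The overall flow can be sketched as a loop in which a failure rejects only the affected submission. All callables and the `Pending` dataclass are hypothetical simplifications. In particular, the sketch omits the slurm_job_data argument of mark_as_submitted.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Dict, List


@dataclass
class Pending:
    """Hypothetical, trimmed-down stand-in for PendingJobSubmission."""
    id: int
    script: str


async def submit_pending_jobs(
    pending: List[Pending],
    submit: Callable[[Pending], Awaitable[int]],
    mark_submitted: Callable[[int, int], Awaitable[None]],
    mark_rejected: Callable[[int, str], Awaitable[None]],
) -> None:
    """Submit each pending job; a failure rejects only that submission and the loop continues."""
    for job in pending:
        try:
            slurm_job_id = await submit(job)
        except Exception as err:
            await mark_rejected(job.id, str(err))
            continue
        await mark_submitted(job.id, slurm_job_id)


async def demo() -> Dict[int, str]:
    statuses: Dict[int, str] = {}

    async def submit(job: Pending) -> int:
        if not job.script:
            raise ValueError("empty job script")
        return 1000 + job.id

    async def mark_submitted(job_id: int, slurm_job_id: int) -> None:
        statuses[job_id] = f"SUBMITTED ({slurm_job_id})"

    async def mark_rejected(job_id: int, message: str) -> None:
        statuses[job_id] = f"REJECTED: {message}"

    jobs = [Pending(1, "#!/bin/bash"), Pending(2, "")]
    await submit_pending_jobs(jobs, submit, mark_submitted, mark_rejected)
    return statuses


print(asyncio.run(demo()))  # {1: 'SUBMITTED (1001)', 2: 'REJECTED: empty job script'}
```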

validate_submit_dir
validate_submit_dir(submit_dir: Path, subprocess_handler: SubprocessAsUserHandler) -> None

Validate the submission directory.

The directory must exist and be writable by the user, so this check is delegated to the subprocess handler, which runs as the user that will execute the sbatch command.

This is needed because submit_dir.exists() would run as the agent user, which may run into permission errors.
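The sketch below delegates the check to a child process. It assumes Python 3.9+ on a POSIX system, where `subprocess.run` accepts a `user` argument. The agent's SubprocessAsUserHandler is not reproduced here, and switching users this way requires the agent to run with sufficient privileges.

```python
import subprocess
import tempfile
from pathlib import Path
from typing import Optional


def validate_submit_dir(submit_dir: Path, user: Optional[str] = None) -> None:
    """Check that submit_dir exists and is writable, as seen by `user`.

    The check runs in a child process so that, when `user` is given, the
    permissions evaluated are those of that user rather than the agent's.
    """
    extra = {"user": user} if user is not None else {}
    result = subprocess.run(
        ["sh", "-c", 'test -d "$1" && test -w "$1"', "sh", str(submit_dir)],
        capture_output=True,
        **extra,
    )
    if result.returncode != 0:
        raise ValueError(f"{submit_dir} does not exist or is not writable")


with tempfile.TemporaryDirectory() as tmp:
    validate_submit_dir(Path(tmp))  # passes silently

try:
    validate_submit_dir(Path("/nonexistent/submit/dir"))
except ValueError as err:
    print(err)  # /nonexistent/submit/dir does not exist or is not writable
```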

write_submission_file async
write_submission_file(file_content: str, filename: str, submit_dir: Path) -> Path

Write a decoded file content to the submit_dir.

update
fetch_active_submissions async
fetch_active_submissions() -> List[ActiveJobSubmission]

Retrieve a list of active job_submissions.

fetch_influx_data async
fetch_influx_data(job: int, measurement: INFLUXDB_MEASUREMENT, *, time: int | None = None, host: str | None = None, step: int | None = None, task: int | None = None) -> list[InfluxDBPointDict]

Fetch data from InfluxDB for a given host, step and task.
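The next sketch shows how the optional filters might translate into an InfluxQL query with bound parameters. The tag names (`job`, `host`, `step`, `task`) and the `time > $time` semantics are assumptions, not the agent's confirmed schema. The measurement name is interpolated because InfluxQL cannot bind identifiers. That is only safe when the name comes from a fixed set such as INFLUXDB_MEASUREMENT.

```python
from typing import Dict, Optional, Tuple, Union


def build_influx_query(
    job: int,
    measurement: str,
    *,
    time: Optional[int] = None,
    host: Optional[str] = None,
    step: Optional[int] = None,
    task: Optional[int] = None,
) -> Tuple[str, Dict[str, Union[int, str]]]:
    """Build an InfluxQL query and bind parameters, filtering only on the values provided."""
    conditions = ["job = $job"]
    params: Dict[str, Union[int, str]] = {"job": job}
    for name, value in (("host", host), ("step", step), ("task", task)):
        if value is not None:  # 0 is a valid step/task, so compare against None explicitly
            conditions.append(f"{name} = ${name}")
            params[name] = value
    if time is not None:
        conditions.append("time > $time")  # assumed: only points newer than `time`
        params["time"] = time
    query = f'SELECT * FROM "{measurement}" WHERE ' + " AND ".join(conditions)
    return query, params


query, params = build_influx_query(42, "CPUTime", host="node-1", step=0)
print(query)   # SELECT * FROM "CPUTime" WHERE job = $job AND host = $host AND step = $step
print(params)  # {'job': 42, 'host': 'node-1', 'step': 0}
```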

fetch_influx_measurements
fetch_influx_measurements() -> list[InfluxDBMeasurementDict]

Fetch measurements from InfluxDB.

update_active_jobs async
update_active_jobs() -> None

Update the Slurm job state for active jobs.

update_job_data async
update_job_data(job_submission_id: int, slurm_job_data: SlurmJobData) -> None

Update a job submission with the job state.

update_job_metrics async
update_job_metrics(active_job_submittion: ActiveJobSubmission) -> None

Update job metrics for a job submission.

This function fetches the metrics from InfluxDB and sends them to the API.

main

settings

Settings

Bases: BaseSettings

compute_extra_settings
compute_extra_settings() -> Self

Compute settings values that are based on other settings values.

tasks

Task definitions for the Jobbergate Agent.

active_submissions_task
active_submissions_task(scheduler: BaseScheduler) -> Job

Schedule a task to handle active jobs every TASK_JOBS_INTERVAL_SECONDS seconds.

garbage_collection_task
garbage_collection_task(scheduler: BaseScheduler) -> Union[Job, None]

Schedule a task to perform garbage collection every day at a specified time.

pending_submissions_task
pending_submissions_task(scheduler: BaseScheduler) -> Job

Schedule a task to submit pending jobs every TASK_JOBS_INTERVAL_SECONDS seconds.

self_update_task
self_update_task(scheduler: BaseScheduler) -> Job

Schedule a task to self update the agent every TASK_SELF_UPDATE_INTERVAL_SECONDS seconds.

status_report_task
status_report_task(scheduler: BaseScheduler) -> Job

Schedule a task to report the status.

trigger_garbage_collections async
trigger_garbage_collections(interval_between_calls: int = 60) -> None

Trigger maintenance tasks on the Jobbergate API.
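The following hedged sketch spaces out maintenance calls using `interval_between_calls`. It assumes the function calls several endpoints one after another. The endpoint names and the `call` coroutine are hypothetical.

```python
import asyncio
from typing import Awaitable, Callable, List


async def trigger_garbage_collections(
    endpoints: List[str],
    call: Callable[[str], Awaitable[None]],
    interval_between_calls: float = 60,
) -> None:
    """Call each maintenance endpoint in turn, sleeping between consecutive calls."""
    for index, endpoint in enumerate(endpoints):
        if index > 0:
            await asyncio.sleep(interval_between_calls)  # spread the load on the API
        await call(endpoint)


async def demo() -> List[str]:
    called: List[str] = []

    async def fake_call(endpoint: str) -> None:
        called.append(endpoint)

    # Hypothetical endpoint names; interval set to 0 so the demo returns immediately.
    await trigger_garbage_collections(["/example/gc-1", "/example/gc-2"], fake_call, interval_between_calls=0)
    return called


print(asyncio.run(demo()))  # ['/example/gc-1', '/example/gc-2']
```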

utils

compute

Core module for compute related functions.

aggregate_influx_measures
aggregate_influx_measures(data_points: Iterator[InfluxDBPointDict]) -> JobMetricData

Aggregate the list of data points by time, host, step and task.

The output data is a list of tuples with the following format: [ (time, host, step, task, CPUFrequency, CPUTime, CPUUtilization, GPUMemMB, GPUUtilization, Pages, RSS, VMSize, ReadMB, WriteMB), ... ]
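The grouping and pivoting step can be sketched as follows. The sketch assumes each data point carries `time`, `host`, `step`, `task`, `measurement` and `value` keys (hypothetical field names) and that missing measurements default to 0.0. The column order follows the tuple format listed above.

```python
from collections import defaultdict
from typing import Dict, Iterator, List, Tuple, TypedDict

MEASUREMENTS = (
    "CPUFrequency", "CPUTime", "CPUUtilization", "GPUMemMB", "GPUUtilization",
    "Pages", "RSS", "VMSize", "ReadMB", "WriteMB",
)


class Point(TypedDict):
    """Assumed shape of one InfluxDB data point (hypothetical field names)."""
    time: int
    host: str
    step: int
    task: int
    measurement: str
    value: float


def aggregate_points(points: Iterator[Point]) -> List[tuple]:
    """Group points by (time, host, step, task) and pivot measurements into fixed columns."""
    grouped: Dict[Tuple[int, str, int, int], Dict[str, float]] = defaultdict(dict)
    for p in points:
        key = (p["time"], p["host"], p["step"], p["task"])
        grouped[key][p["measurement"]] = p["value"]
    # Measurements missing for a key default to 0.0 (an assumption for this sketch).
    return [key + tuple(values.get(m, 0.0) for m in MEASUREMENTS) for key, values in sorted(grouped.items())]


rows = aggregate_points(iter([
    Point(time=0, host="node-1", step=0, task=0, measurement="CPUTime", value=1.5),
    Point(time=0, host="node-1", step=0, task=0, measurement="RSS", value=2048.0),
]))
print(rows[0][:6])  # (0, 'node-1', 0, 0, 0.0, 1.5)
```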

measure_memory_usage
measure_memory_usage(func: Callable) -> Callable

Decorator to measure the memory usage of a function.

Parameters:

func (Callable, required): Function whose memory usage should be measured.

Returns:

Callable: Decorated function.
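One way to build such a decorator is with the standard library's tracemalloc, which tracks memory allocated by Python code. This is a sketch, not the agent's implementation. It records reports in a list where a real agent would log them, and `reset_peak` needs Python 3.9+.

```python
import functools
import tracemalloc
from typing import Any, Callable, List

peak_reports: List[str] = []  # collected reports; a real agent would log them instead


def measure_memory_usage(func: Callable) -> Callable:
    """Record the peak memory allocated by Python code while `func` runs."""

    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        already_tracing = tracemalloc.is_tracing()
        if not already_tracing:
            tracemalloc.start()
        tracemalloc.reset_peak()  # measure this call only
        try:
            return func(*args, **kwargs)
        finally:
            _, peak = tracemalloc.get_traced_memory()
            if not already_tracing:
                tracemalloc.stop()
            peak_reports.append(f"{func.__name__}: peak {peak / 1024:.1f} KiB")

    return wrapper


@measure_memory_usage
def build_list(n: int) -> int:
    return len([0] * n)


print(build_list(100_000))  # 100000
print(peak_reports[-1])
```

The wrapper is synchronous. Applied to an async function, it would only measure the creation of the coroutine, so an async variant would need to await the call inside the wrapper.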

exception

Core module for exception related operations

AuthTokenError

Bases: JobbergateAgentError

Raise exception when there are connection issues with the backend

JobSubmissionError

Bases: JobbergateAgentError

Raise exception when a job cannot be submitted.

JobbergateAgentError

Bases: Buzz

Base exception class for the Jobbergate Agent's errors.

JobbergateApiError

Bases: JobbergateAgentError

Raise exception when communication with Jobbergate API fails

ProcessExecutionError

Bases: JobbergateAgentError

Raise exception when execution command returns an error

SbatchError

Bases: JobbergateAgentError

Raise exception when sbatch returns an error.

SlurmParameterParserError

Bases: JobbergateAgentError

Raise exception when the Slurm mapper or the SBATCH parser encounters an error.

logging

Core module for logging operations

log_error
log_error(params: DoExceptParams)

Provide a utility function to log a Buzz-based exception and the stack-trace of the error's context.

Parameters:

params (DoExceptParams, required): A DoExceptParams instance containing the original exception, a message describing it, and the stack trace of the error.

logger_wraps
logger_wraps(*, entry: bool = True, exit: bool = True, level: str = 'DEBUG')

Decorator to wrap a function with logging statements.

Reference

https://loguru.readthedocs.io/en/stable/resources/recipes.html
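The standard logging module can approximate the Loguru recipe linked above. This sketch swaps Loguru for `logging`, so it is not the agent's code, and the logger name is hypothetical.

```python
import functools
import logging
from typing import Any, Callable

logger = logging.getLogger("logger_wraps_sketch")


def logger_wraps(*, entry: bool = True, exit: bool = True, level: str = "DEBUG") -> Callable:
    """Log entry (with arguments) and exit (with result) of the decorated function."""
    level_no = getattr(logging, level)  # e.g. "DEBUG" -> logging.DEBUG

    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapped(*args: Any, **kwargs: Any) -> Any:
            if entry:
                logger.log(level_no, "Entering '%s' (args=%s, kwargs=%s)", func.__name__, args, kwargs)
            result = func(*args, **kwargs)
            if exit:
                logger.log(level_no, "Exiting '%s' (result=%s)", func.__name__, result)
            return result

        return wrapped

    return decorator


@logger_wraps(level="INFO")
def add(a: int, b: int) -> int:
    return a + b


logging.basicConfig(level=logging.INFO)
print(add(2, 3))  # 5, with entry and exit lines logged at INFO
```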

plugin

Give the agent the ability to load custom plugins installed in the same environment.

load_plugins
load_plugins(plugin_name: str) -> Dict[str, Any]

Discover and load plugins available to the agent, allowing for third party ones to be included.

Note that the plugins shipped with the agent are also declared as plugins in the pyproject.toml file, even though they could easily be loaded directly from source. This supports testing and demonstrates how to use the plugin system.

Reference

https://packaging.python.org/en/latest/guides/creating-and-discovering-plugins/

scheduler

Provide the task scheduler for the agent and the main loop to run it.

Custom tasks can be added to the agent as installable plugins, which are discovered at runtime.

References

https://github.com/agronholm/apscheduler
https://packaging.python.org/en/latest/guides/creating-and-discovering-plugins

JobbergateTask

Bases: Protocol

Protocol to be implemented by any task that is expected to run on the scheduler.

__call__
__call__(scheduler: BaseScheduler) -> Union[Job, None]

Specify a callable used to schedule a task and return the resulting job.

This is handed to client code, giving it the opportunity to handle its own configuration and to use the full flexibility of the scheduler API.

None can also be returned if no task is going to be scheduled due to internal business logic.

init_scheduler
init_scheduler() -> BaseScheduler

Initialize the scheduler and schedule all tasks.

schedule_tasks
schedule_tasks(scheduler: BaseScheduler) -> None

Discover and schedule all tasks to be run by the agent.

shut_down_scheduler
shut_down_scheduler(scheduler: BaseScheduler, wait: bool = True) -> None

Shutdown the scheduler.

user_mapper

Provide the agent with a way to map email addresses from Jobbergate to local Slurm users.

Custom mappers can be added to the agent as installable plugins, which are discovered at runtime.

SlurmUserMapper module-attribute
SlurmUserMapper = Mapping[str, str]

Slurm user mappers are mappings from email addresses to local Slurm users.

SingleUserMapper dataclass

Bases: Mapping

A user mapper that always returns the same user.

__post_init__
__post_init__()

Validate the user mapper by asserting that the configured user is not an empty string.
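The sketch below implements a mapper that returns the same user for every email address. It assumes the user is stored in a field named `slurm_user` (hypothetical). A Mapping must also implement `__iter__` and `__len__`, so this sketch exposes no keys.

```python
from collections.abc import Mapping
from dataclasses import dataclass
from typing import Iterator


@dataclass(frozen=True)
class SingleUserMapper(Mapping):
    """Map every email address to the same local Slurm user (sketch)."""

    slurm_user: str  # hypothetical field name

    def __post_init__(self) -> None:
        if not self.slurm_user:
            raise ValueError("slurm_user must not be an empty string")

    def __getitem__(self, email: str) -> str:
        return self.slurm_user  # the email is ignored: every key maps to the same user

    def __iter__(self) -> Iterator[str]:
        return iter(())  # no fixed set of keys

    def __len__(self) -> int:
        return 0


mapper = SingleUserMapper("slurm-user")
print(mapper["alice@example.com"])  # slurm-user
```

Because `__len__` returns 0, an instance is falsy, so calling code should not test the mapper with `if mapper:`.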

SlurmUserMapperFactory

Bases: Protocol

Protocol to be implemented by plugins on client code.

A callable with no arguments is expected, which hands the configuration and initialization of any custom user mapper over to client code. Any object that implements the Mapping protocol can be returned.

__call__
__call__() -> SlurmUserMapper

Specify the signature to build a user mapper.

manufacture
manufacture() -> SlurmUserMapper

Create an instance of a Slurm user mapper given the app configuration.