4. Plugin API Reference¶
This section documents all available plugins in the ORBIT system.
4.1. Base Classes¶
4.1.1. PluginClient¶
4.1.2. ClientManagedPlugin¶
4.1.3. Plugin¶
- class radical.orbit.plugin_base.Plugin(app: FastAPI, instance_name: str)[source]¶
Bases: object
Base class for Endpoint plugins.
Each plugin gets its own URL namespace and built-in session management. Routes are added with add_route_post / add_route_get.
plugin_name vs instance_name
plugin_name is a class-level attribute that uniquely identifies the plugin type (e.g. "psij", "queue_info"). It is the key used in the global Plugin._registry and in client-side lookups (endpoint.get_plugin("psij")).
instance_name is set at construction time (defaults to plugin_name when only one instance is needed) and drives the URL namespace: /{instance_name}/…. Multiple instances of the same plugin type on the same endpoint must be given distinct instance names.
Subclasses that define a plugin_name class attribute are automatically registered in the global plugin registry.
- Subclasses must define:
session_class: The session class to instantiate (must inherit from PluginSession)
- Subclasses may define:
client_class: The local helper class for the application-side client.
version: The version string for the plugin.
session_ttl: Session timeout in seconds (default: 3600 = 1 hour, 0 = no timeout).
ui_config: UI configuration dict for portal rendering (see ui_schema.py).
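The registration and naming rules above can be illustrated with a small stand-alone sketch. It does not import ORBIT: `SketchPlugin`, `SketchSession`, `EchoPlugin`, and the use of `__init_subclass__` are assumptions chosen for illustration, and the real `Plugin` class may register subclasses by a different mechanism.

```python
from typing import Dict, Optional, Type


class SketchSession:
    """Hypothetical stand-in for PluginSession."""

    def __init__(self, sid: str):
        self.sid = sid


class SketchPlugin:
    """Hypothetical stand-in for Plugin: registers subclasses by plugin_name."""

    _registry: Dict[str, Type["SketchPlugin"]] = {}
    session_ttl = 3600  # seconds; 0 would mean no timeout

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        # Only subclasses that define plugin_name themselves are registered.
        name = cls.__dict__.get("plugin_name")
        if name:
            SketchPlugin._registry[name] = cls

    def __init__(self, instance_name: Optional[str] = None):
        # instance_name defaults to plugin_name and drives the URL namespace.
        self.instance_name = instance_name or self.plugin_name
        self.namespace = f"/{self.instance_name}"

    @classmethod
    def get_plugin_class(cls, name: str) -> Optional[Type["SketchPlugin"]]:
        return cls._registry.get(name)


class EchoPlugin(SketchPlugin):
    plugin_name = "echo"
    session_class = SketchSession


first = EchoPlugin()                          # namespace follows plugin_name
second = EchoPlugin(instance_name="echo_b")   # second instance needs its own name
print(first.namespace, second.namespace)
```

Two instances of the same plugin type end up under different URL prefixes, while the registry lookup stays keyed on the type name.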
4. Notifications¶
Plugins can send real-time notifications to clients via Server-Sent Events (SSE). The notification flow is: Session -> Plugin -> EndpointService -> Bridge -> SSE clients.
Sending notifications from a session:
    # In your PluginSession subclass method:
    if self._plugin:
        self._plugin._dispatch_notify("my_topic", {"key": "value", "status": "running"})
The _plugin reference is automatically injected into sessions by the plugin. _dispatch_notify works from both sync and async contexts, including background threads.
Sending notifications from a plugin:
    # In your Plugin subclass method:
    await self.send_notification("my_topic", {"key": "value"})
Subscribing to notifications (browser/JavaScript):
    const eventSource = new EventSource('/events');
    eventSource.onmessage = (event) => {
        const msg = JSON.parse(event.data);
        if (msg.topic === 'notification') {
            const {endpoint, plugin, topic, data} = msg.data;
            console.log(`${endpoint}/${plugin}: ${topic}`, data);
        }
    };
Subscribing to notifications (Python client):
    import json

    import requests
    import sseclient

    response = requests.get('http://bridge:8000/events', stream=True)
    client = sseclient.SSEClient(response)
    for event in client.events():
        msg = json.loads(event.data)
        if msg['topic'] == 'notification':
            print(msg['data'])
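Client code usually wants to filter the event stream before acting on it. The sketch below parses a single SSE data string using the envelope shape shown in the examples above (an outer `topic` of `notification` and an inner object with `endpoint`, `plugin`, `topic`, `data`). The helper name `extract_notification` and the sample payload are hypothetical.

```python
import json
from typing import Any, Dict, Optional


def extract_notification(raw: str, plugin: Optional[str] = None) -> Optional[Dict[str, Any]]:
    """Return the notification payload from one SSE data string, or None."""
    msg = json.loads(raw)
    if msg.get("topic") != "notification":
        return None  # some other kind of bridge event
    payload = msg.get("data", {})
    if plugin is not None and payload.get("plugin") != plugin:
        return None  # notification from a plugin we are not interested in
    return payload


# Hypothetical sample message, shaped like the envelope described above.
sample = json.dumps({
    "topic": "notification",
    "data": {"endpoint": "ep1", "plugin": "psij",
             "topic": "job_status", "data": {"state": "running"}},
})
print(extract_notification(sample, plugin="psij"))
```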
4. Topology Updates¶
Plugins can receive notifications when endpoints connect or disconnect by overriding the on_topology_change method:
    async def on_topology_change(self, endpoints: dict):
        '''Called when endpoints connect/disconnect.

        Args:
            endpoints: Dict mapping endpoint names to their plugin info.
                Example: {"endpoint1": {"plugins": ["sysinfo", "psij"]}}
        '''
        for endpoint_name, info in endpoints.items():
            print(f"Endpoint {endpoint_name} has plugins: {info.get('plugins', [])}")
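A common use of the topology dict is to find which endpoints currently offer a given plugin. The helper below is a hypothetical utility, not part of ORBIT; it only assumes the `{"name": {"plugins": [...]}}` shape documented in the docstring above.

```python
from typing import Dict, List


def endpoints_with_plugin(endpoints: Dict[str, dict], plugin: str) -> List[str]:
    """Return the sorted names of endpoints that advertise the given plugin."""
    return sorted(
        name for name, info in endpoints.items()
        if plugin in info.get("plugins", [])
    )


topology = {
    "endpoint1": {"plugins": ["sysinfo", "psij"]},
    "endpoint2": {"plugins": ["sysinfo"]},
}
print(endpoints_with_plugin(topology, "psij"))
```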
- classmethod get_plugin_class(name: str) Type | None[source]¶
Look up a registered plugin class by name.
- property is_login_node: bool¶
True on an HPC login node — a real scheduler is installed but no allocation is active.
- async register_session(request: Request) dict[source]¶
Register a new session and return its unique session ID.
- async unregister_session(request: Request) dict[source]¶
Unregister a session by its session ID and close it.
- async get_ui_config(request: Request) dict[source]¶
Return UI configuration for portal rendering.
External plugins can define ui_config to describe their forms, monitors, and notification handlers, enabling seamless portal integration.
- async health_check(request: Request) dict[source]¶
Health check endpoint for monitoring.
Returns plugin status including:
- Plugin name and version
- Uptime in seconds
- Number of active sessions
- Whether the plugin is healthy
- classmethod is_enabled(app: FastAPI) bool[source]¶
Return False to skip loading this plugin on this host.
Checked before instantiation so no routes are registered when the plugin is not applicable. Override in subclasses to gate on host type (bridge vs endpoint) or runtime conditions (e.g. scheduler presence). Default: always load.
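As a rough illustration of the gating described above, the sketch below filters candidate classes through `is_enabled` before anything is instantiated. The three classes, the `select_plugins` loader, and the use of `sinfo` on `PATH` as a proxy for SLURM are all assumptions; ORBIT's own loader and detection logic may differ.

```python
import shutil
from typing import Iterable, List, Type


class AlwaysOn:
    """Hypothetical plugin that keeps the default: always load."""

    @classmethod
    def is_enabled(cls, app) -> bool:
        return True


class NeedsSlurm:
    """Hypothetical plugin gated on a scheduler command being on PATH."""

    @classmethod
    def is_enabled(cls, app) -> bool:
        # Assumption: presence of `sinfo` is used as a proxy for SLURM.
        return shutil.which("sinfo") is not None


class NeverOn:
    """Hypothetical plugin that is not applicable on this host."""

    @classmethod
    def is_enabled(cls, app) -> bool:
        return False


def select_plugins(app, candidates: Iterable[Type]) -> List[Type]:
    """Keep only the classes whose is_enabled(app) returns True."""
    return [cls for cls in candidates if cls.is_enabled(app)]


app = object()  # placeholder for the FastAPI application
enabled = select_plugins(app, [AlwaysOn, NeedsSlurm, NeverOn])
print([cls.__name__ for cls in enabled])
```

Because the check is a classmethod, a skipped plugin never runs its constructor and so never registers routes.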
4.2. Plugins¶
4.2.1. PluginLucid¶
4.2.2. PluginXGFabric¶
XGFabric Plugin for ORBIT.
Orchestrates CFDaAI workflows across multiple HPC clusters. Provides:
- Configuration management (load/save workflow configs)
- Workflow execution (start/stop/status)
- Real-time progress notifications via SSE
The plugin runs on a local endpoint and communicates with remote endpoints (UCSB, Perlmutter) via the bridge.
- class radical.orbit.plugin_xgfabric.ResourceConfig(name: str = 'default', bridge_url: str = 'https://localhost:8000', bridge_cert: str | None = None, cluster_configs: ~typing.Dict[str, ~typing.Dict] = <factory>)[source]¶
Bases: object
Resource configuration: bridge connection and per-cluster scheduler settings.
- radical.orbit.plugin_xgfabric.dict_to_resource_config(d: Dict) ResourceConfig[source]¶
Convert dict to ResourceConfig, ignoring unknown fields.
- class radical.orbit.plugin_xgfabric.WorkflowConfig(name: str = 'default', description: str = '', local_workspace: str = '/tmp/xgfabric_workspace', cspot_woof_url: str = 'woof://128.111.45.61/davisstations/daviscupsout', cspot_limit: int = 10, num_simulations: int = 16, batch_size: int = 4, train_models: ~typing.List[str] = <factory>, simulation_task: ~typing.Dict | None = None, training_tasks: ~typing.Dict[str, ~typing.Dict] = <factory>, evaluation_task: ~typing.Dict | None = None, mock_sensor_data: bool = False)[source]¶
Bases: object
Workflow configuration: task templates and execution parameters.
- radical.orbit.plugin_xgfabric.config_to_dict(cfg: WorkflowConfig) Dict[source]¶
Convert config to JSON-serializable dict.
- radical.orbit.plugin_xgfabric.dict_to_config(d: Dict) WorkflowConfig[source]¶
Convert dict to WorkflowConfig, filtering unknown fields.
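Filtering unknown fields is typically done by intersecting the dict keys with the dataclass field names. The sketch below shows that pattern on `SketchConfig`, a hypothetical cut-down stand-in for `WorkflowConfig`; it is an assumption about the approach, not the code of `dict_to_config` itself.

```python
from dataclasses import asdict, dataclass, field, fields
from typing import Any, Dict, List


@dataclass
class SketchConfig:
    """Hypothetical, cut-down stand-in for WorkflowConfig."""
    name: str = "default"
    num_simulations: int = 16
    batch_size: int = 4
    train_models: List[str] = field(default_factory=list)


def dict_to_sketch_config(d: Dict[str, Any]) -> SketchConfig:
    """Build a config from a dict, dropping keys that are not dataclass fields."""
    known = {f.name for f in fields(SketchConfig)}
    return SketchConfig(**{k: v for k, v in d.items() if k in known})


cfg = dict_to_sketch_config({"name": "test", "batch_size": 8, "obsolete_key": 1})
print(asdict(cfg))
```

Dropping unknown keys lets configs saved by a newer or older version still load, with missing fields falling back to their defaults.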
- class radical.orbit.plugin_xgfabric.ClusterStatus(name: str, endpoint_name: str, cluster_type: str, has_gpu: bool = False, online: bool = False, tasks_running: int = 0, pilot_job_id: str | None = None, pilot_status: str | None = None)[source]¶
Bases: object
Status of a single cluster.
- class radical.orbit.plugin_xgfabric.WorkflowState(status: str = 'idle', phase: str = '', progress: int = 0, message: str = '', start_time: str | None = None, end_time: str | None = None, error: str | None = None, active_cluster: str | None = None, completed_simulations: int = 0, total_simulations: int = 0, current_batch: int = 0, total_batches: int = 0, pilot_jobs: ~typing.Dict[str, str] = <factory>, immediate_clusters: ~typing.List[~typing.Dict] = <factory>, allocate_clusters: ~typing.List[~typing.Dict] = <factory>, log: ~typing.List[~typing.Dict] = <factory>, config_name: str | None = None, config_dir: str | None = None)[source]¶
Bases: object
Runtime state of a workflow execution.
- class radical.orbit.plugin_xgfabric.XGFabricSession(sid: str, workdir: str | None = None, endpoint_name: str | None = None, bridge_url: str | None = None, bridge_cert: str | None = None)[source]¶
Bases: PluginSession
XGFabric session: manages workflow configuration and execution.
- update_connected_endpoints(endpoints: Dict[str, Any])[source]¶
Update the cached list of connected endpoints.
- async load_config(name: str) Dict[source]¶
Load a workflow config by name, path, or builtin alias ('default', 'test').
- class radical.orbit.plugin_xgfabric.XGFabricClient(http_client: Client, base_url: str, bridge_client: BridgeClient | None = None, endpoint_id: str | None = None, plugin_name: str | None = None)[source]¶
Bases: PluginClient
Client-side interface for the XGFabric plugin.
- class radical.orbit.plugin_xgfabric.PluginXGFabric(app: FastAPI, workdir: str | None = None)[source]¶
Bases: Plugin
XGFabric plugin for ORBIT.
Orchestrates CFDaAI workflows across multiple HPC clusters. Provides configuration management and workflow execution via REST API.
- plugin_name = 'xgfabric'¶
- session_class¶
alias of
XGFabricSession
- client_class¶
alias of
XGFabricClient
- ui_config: Dict | UIConfig | None = {'custom_template': True, 'description': 'CFDaAI workflow orchestrator for HPC clusters.', 'icon': '🌊', 'title': 'XGFabric Workflow'}¶
4.2.3. PluginQueueInfo¶
- class radical.orbit.plugin_queue_info.QueueInfoSession(sid: str, backend: QueueInfo)[source]¶
Bases: PluginSession
QueueInfo session with shared backend.
All sessions share a single backend instance for cache efficiency.
- async close() dict[source]¶
Close this session.
Note: Backend is shared and not cleaned up here.
- Returns:
dict: An empty dictionary indicating successful closure.
- async get_info(user=None, force=False)[source]¶
Return queue/partition info.
- Args:
user (str): User to filter partitions for. When None (default), the current user is used. Pass user='*' to return all partitions (admin view).
force (bool): Bypass cache if True.
- Returns:
dict: Queue information from the backend.
- async list_jobs(queue, user=None, force=False)[source]¶
List jobs in a queue.
- Args:
queue (str): Partition name.
user (str): User to filter jobs for. When None (default), the current user is used. Pass user='*' to return all jobs (admin view).
force (bool): Bypass cache if True.
- Returns:
dict: Job listing from the backend.
- async list_all_jobs(user=None, force=False)[source]¶
List all jobs for the user across all partitions.
- Args:
user (str): User to filter jobs for. When None (default), the current user is used. Pass user='*' to return all jobs (admin view).
force (bool): Bypass cache if True.
- Returns:
dict: Job listing from the backend.
- class radical.orbit.plugin_queue_info.QueueInfoClient(http_client: Client, base_url: str, bridge_client: BridgeClient | None = None, endpoint_id: str | None = None, plugin_name: str | None = None)[source]¶
Bases: PluginClient
Client-side interface for the QueueInfo plugin.
- get_info(user: str | None = None, force: bool = False) dict[source]¶
Return queue/partition information.
- Args:
user (str): User to filter partitions for. When None (default), uses the endpoint service user. Pass user='*' to return all partitions (admin view).
force (bool): Bypass cache if True.
- Returns:
dict: Queue information filtered by user access.
- list_jobs(queue: str, user: str | None = None, force: bool = False) dict[source]¶
List jobs in a specified queue/partition.
- Args:
queue (str): Partition name to list jobs for.
user (str): User to filter jobs for. When None (default), uses the endpoint service user. Pass user='*' to return all jobs (admin view).
force (bool): Bypass cache if True.
- Returns:
dict: Job listing filtered by user.
- list_all_jobs(user: str | None = None, force: bool = False) dict[source]¶
List all jobs for the user across all partitions.
- Args:
user (str): User to filter jobs for.
force (bool): Bypass cache if True.
- Returns:
dict: Job listing.
- list_allocations(user: str | None = None, force: bool = False) dict[source]¶
List allocations/projects.
- job_allocation() dict | None[source]¶
Return endpoint job allocation info, or None if not inside a batch job.
No session is required. The information reflects the environment of the endpoint process, not the client.
- Returns:
None: Endpoint is running on a login node.
dict: Allocation summary with keys job_id, partition, n_nodes, nodelist, cpus_per_node, gpus_per_node, account, job_name, runtime.
- Raises:
RuntimeError: Endpoint is inside an allocation but the scheduler did not provide enough info to summarise it.
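Callers have to handle both return shapes of `job_allocation()`. The helper below is a hypothetical sketch that turns either result into a one-line description; it relies only on the `job_id`, `n_nodes`, and `runtime` keys listed above, and on the convention shown in the endpoint response examples in this reference that a null `runtime` means unlimited walltime.

```python
from typing import Optional


def describe_allocation(alloc: Optional[dict]) -> str:
    """Turn a job_allocation() result into a one-line description."""
    if alloc is None:
        return "login node (no active allocation)"
    runtime = alloc.get("runtime")
    # A null runtime is treated as unlimited walltime.
    limit = "unlimited walltime" if runtime is None else f"{runtime} s walltime"
    return f"job {alloc.get('job_id')}: {alloc.get('n_nodes')} node(s), {limit}"


print(describe_allocation(None))
print(describe_allocation({"job_id": "42", "n_nodes": 4, "runtime": 3600}))
```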
- nodelist() list[source]¶
Return the expanded list of hostnames in the endpoint’s allocation.
No session is required. Hostnames are returned one per node, in scheduler-reported order. Empty list when the endpoint is on a login node, when no batch backend is detected, or when the scheduler doesn’t expose the info.
- Returns:
list[str]: Allocated compute-node hostnames.
- class radical.orbit.plugin_queue_info.PluginQueueInfo(app: FastAPI, instance_name='queue_info', backend_conf=None, slurm_conf=None)[source]¶
Bases: Plugin
QueueInfo plugin for ORBIT.
This plugin exposes batch system queue information, job listings, and allocation data via REST endpoints.
is_enabled() prevents loading on endpoints where no recognised batch system is installed.
Backend selection is automatic: the plugin uses make_queue_info(), which dispatches to QueueInfoSlurm, QueueInfoPBSPro, or QueueInfoNone based on what’s available.
- plugin_name = 'queue_info'¶
- session_class¶
alias of
QueueInfoSession
- client_class¶
alias of
QueueInfoClient
- ui_config: Dict | UIConfig | None = {'description': 'Inspect batch partitions, jobs and allocations.', 'icon': '📋', 'monitors': [{'auto_load': 'get_info/{sid}', 'css_class': 'queueinfo-content', 'id': 'partitions', 'title': 'Partitions / Queues', 'type': 'table'}], 'refresh_button': True, 'title': 'Queue Info'}¶
- classmethod is_enabled(app: FastAPI) bool[source]¶
Load on endpoints with a recognised batch system (SLURM or PBSPro).
- get_job_allocation() dict | None[source]¶
Return endpoint job allocation info, or None if not inside a job.
Delegates to the active BatchSystem.
- Returns:
None: Endpoint is running on a login node.
dict: Allocation details (see BatchSystem.job_allocation).
- Raises:
RuntimeError: Endpoint is inside an allocation but the scheduler did not provide enough info to summarise it.
- async backend_endpoint(request: Request) dict[source]¶
Session-less endpoint: report which batch backend is active.
Response:
{"backend": "slurm" | "pbs" | "none"}
- async job_allocation_endpoint(request: Request) dict[source]¶
Session-less endpoint: returns current endpoint job allocation info.
Response:
{"allocation": null}  # login node
{"allocation": {"n_nodes": 4, "runtime": 3600}}  # inside a job
{"allocation": {"n_nodes": 4, "runtime": null}}  # unlimited walltime
- async nodelist_endpoint(request: Request) dict[source]¶
Session-less endpoint: expanded hostnames in this endpoint’s allocation.
Response:
{"nodelist": []}  # login node / no scheduler
{"nodelist": ["nid001234", "nid001235", ...]}  # inside a job
Implementation note: nodelist lives on the BatchSystem hierarchy (detect_batch_system()), not on QueueInfo (self._backend). This mirrors the get_job_allocation pattern a few methods up.
4.2.4. PluginSysInfo¶
- class radical.orbit.plugin_sysinfo.SysInfoProvider[source]¶
Bases: object
Helper class to gather system information using psutil and standard tools.
- class radical.orbit.plugin_sysinfo.SysInfoSession(sid: str, provider: SysInfoProvider)[source]¶
Bases: PluginSession
SysInfo session (service-side).
Provides methods to gather system metrics.
- class radical.orbit.plugin_sysinfo.SysInfoClient(http_client: Client, base_url: str, bridge_client: BridgeClient | None = None, endpoint_id: str | None = None, plugin_name: str | None = None)[source]¶
Bases: PluginClient
Client-side interface for the SysInfo plugin.
- homedir() str[source]¶
Return the home directory of the endpoint-side process.
No session is required.
- class radical.orbit.plugin_sysinfo.PluginSysInfo(app: FastAPI)[source]¶
Bases: Plugin
SysInfo plugin for ORBIT.
Provides system hardware configuration and resource utilization metrics.
- plugin_name = 'sysinfo'¶
- session_class¶
alias of
SysInfoSession
- client_class¶
alias of
SysInfoClient
- ui_config: Dict | UIConfig | None = {'description': 'Live CPU, memory, disk, network and GPU metrics.', 'icon': '🖥️', 'monitors': [{'auto_load': 'metrics/{sid}', 'css_class': 'sysinfo-content', 'id': 'metrics', 'title': 'System Metrics', 'type': 'metrics'}], 'refresh_button': True, 'title': 'System Info'}¶
- async homedir_endpoint(request: Request) dict[source]¶
Return the home directory of the endpoint-side process.
- async host_role_endpoint(request: Request) dict[source]¶
Return the role of the host this endpoint runs on.
Returned fields:
role: one of bridge, login, compute, standalone.
scheduler: the batch system’s full name (e.g. 'slurm', 'pbs', 'pbs-aurora', 'none'); may be a site-specific subclass identifier.
psij_executor: the corresponding PsiJ executor name ('slurm', 'pbs', 'local'). Use this when actually submitting via PsiJ; scheduler may be more specific.
job_id: current allocation id on compute nodes, None everywhere else.
Detection logic lives in utils.host_role(); this route is just a wire surface for it.
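The distinction between `scheduler` and `psij_executor` matters when choosing an executor for submission. The hypothetical helper below picks the executor from a host-role response; falling back to `"local"` when the field is absent is an assumption of this sketch, not documented behaviour.

```python
def pick_executor(host_role: dict) -> str:
    """Choose the PsiJ executor name from a host-role response."""
    # psij_executor is the field meant for submission; scheduler may be a
    # site-specific identifier such as 'pbs-aurora'.
    return host_role.get("psij_executor") or "local"


# Hypothetical response using the documented field names.
role = {"role": "login", "scheduler": "pbs-aurora",
        "psij_executor": "pbs", "job_id": None}
print(pick_executor(role))
```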
4.2.5. PluginPSIJ¶
PsiJ plugin for ORBIT — HPC job submission.
4.2.5.1. Three-class pattern¶
- PSIJSession: Endpoint-side session. Holds one PsiJ Executor per submit call, manages job state via callbacks and background polling, and streams stdout/stderr incrementally.
- PSIJClient: Application-side thin HTTP wrapper. Delegates to the endpoint service over the bridge (submit_job, get_job_status, list_jobs, cancel_job, submit_tunneled, tunnel_status).
- PluginPSIJ: Registers the plugin with the endpoint, adds URL routes, and wires requests to the correct PSIJSession via _forward().
- class radical.orbit.plugin_psij.PSIJSession(sid: str, **kwargs: Any)[source]¶
Bases: PluginSession
Session-specific PSIJ state.
- poll_interval = 5.0¶
- async submit_job(job_spec_dict: Dict[str, Any], executor_name: str = 'local') Dict[str, Any][source]¶
Submit a job via PSIJ.
- async get_job_status(job_id: str, stdout_offset: int = 0, stderr_offset: int = 0) Dict[str, Any][source]¶
Get job status with metadata and optional stdout/stderr offset.
- class radical.orbit.plugin_psij.PSIJClient(http_client: Client, base_url: str, bridge_client: BridgeClient | None = None, endpoint_id: str | None = None, plugin_name: str | None = None)[source]¶
Bases: PluginClient
Client-side interface for the PSIJ plugin.
- submit_job(job_spec: Dict[str, Any], executor: str = 'local') Dict[str, Any][source]¶
Submit a job.
- Args:
job_spec (dict): The job specification.
executor (str): The executor to use.
- Returns:
dict: Job submission result (job_id, native_id).
- get_job_status(job_id: str, stdout_offset: int = 0, stderr_offset: int = 0) Dict[str, Any][source]¶
Get the status of a job.
- Args:
job_id: The job ID to query.
stdout_offset: Byte offset for stdout (0 = full).
stderr_offset: Byte offset for stderr (0 = full).
- Returns:
Job status info including metadata and stdout/stderr.
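The offset parameters allow output to be fetched incrementally rather than re-downloaded on every poll. The sketch below shows the idea with a fake status function standing in for `PSIJClient.get_job_status`. The response field names `stdout` and `state`, and the terminal state names, are assumptions, since the exact response schema is not given here.

```python
from typing import Callable, Dict, List

TERMINAL = {"COMPLETED", "FAILED", "CANCELED"}  # assumed state names


def tail_stdout(get_status: Callable[..., Dict], job_id: str) -> str:
    """Collect stdout incrementally by advancing the byte offset each call."""
    offset = 0
    chunks: List[str] = []
    while True:
        status = get_status(job_id, stdout_offset=offset)
        new_text = status.get("stdout", "")  # assumed field name
        chunks.append(new_text)
        offset += len(new_text.encode("utf-8"))  # offsets are in bytes
        if status.get("state") in TERMINAL:
            return "".join(chunks)


def fake_get_status(job_id, stdout_offset=0, stderr_offset=0):
    """Stand-in for PSIJClient.get_job_status, serving a fixed output."""
    full = "line 1\nline 2\n".encode("utf-8")
    end = min(len(full), stdout_offset + 7)  # release 7 bytes per call
    state = "COMPLETED" if end == len(full) else "ACTIVE"
    return {"state": state, "stdout": full[stdout_offset:end].decode("utf-8")}


print(tail_stdout(fake_get_status, "job-1"))
```

A real polling loop would sleep between calls; the sketch omits the delay so that it runs instantly.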
- cancel_job(job_id: str) Dict[str, Any][source]¶
Cancel a job.
- Args:
job_id: The job ID to cancel.
- Returns:
Cancellation result.
- submit_tunneled(job_spec: Dict[str, Any], executor: str = 'local', tunnel: str = 'none') Dict[str, Any][source]¶
Submit a job that launches a child Endpoint service on a compute node.
The job_spec.arguments list must contain -n <endpoint_name> or --name <endpoint_name> so the child endpoint can register under the correct name.
- Args:
job_spec: PsiJ job specification dict. arguments must include -n <endpoint_name>.
executor: PsiJ executor name (default: "local").
tunnel: SSH tunnel mode for the child’s bridge connection. One of:
'none': child connects directly to the bridge. No SSH spawned anywhere.
'forward': child opens its own outbound ssh -L to the login host (compute → login). Suitable where outbound SSH from compute is permitted and login → compute is blocked (Aurora, Perlmutter).
'reverse': login-side parent opens ssh -R to the compute host (login → compute). Suitable where compute → login SSH is blocked but login → compute works (Odo).
Any other value (including True/False) is rejected outright; there is no boolean back-compat.
- Returns:
dict with job_id, native_id, and endpoint_name.
- Raises:
ValueError: If tunnel is not one of the three string values.
RuntimeError: If the server returns an error response.
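The two preconditions described above (a strict string-only `tunnel` value and an endpoint name in the arguments) can be checked before a request is sent. The helpers below are hypothetical and only illustrate the rules; the `executable` key in the sample job spec is an assumption about the job-spec layout.

```python
from typing import Any, Dict, List, Optional

TUNNEL_MODES = ("none", "forward", "reverse")


def check_tunnel(tunnel: Any) -> str:
    """Accept only the three string modes; booleans are rejected."""
    if not isinstance(tunnel, str) or tunnel not in TUNNEL_MODES:
        raise ValueError(f"tunnel must be one of {TUNNEL_MODES}, got {tunnel!r}")
    return tunnel


def endpoint_name_from(arguments: List[str]) -> Optional[str]:
    """Return the value following -n or --name, or None if absent."""
    for flag in ("-n", "--name"):
        if flag in arguments:
            idx = arguments.index(flag)
            if idx + 1 < len(arguments):
                return arguments[idx + 1]
    return None


# Hypothetical job spec; only the "arguments" key is documented above.
job_spec: Dict[str, Any] = {
    "executable": "radical-orbit-endpoint-wrapper.sh",
    "arguments": ["--name", "compute_ep"],
}
print(check_tunnel("forward"), endpoint_name_from(job_spec["arguments"]))
```

The explicit `isinstance` check is what keeps `True` and `False` from being accepted as legacy boolean flags.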
- tunnel_status(endpoint_name: str) Dict[str, Any][source]¶
Return the current tunnel status for a named endpoint.
This endpoint is session-less (no session required).
- Args:
endpoint_name: The logical name of the child endpoint service.
- Returns:
dict with fields:
endpoint_name: echoed back.
status: one of "pending", "active", "failed", "done", or "no_tunnel".
port: assigned tunnel port (int) once active, else null.
pid: SSH process PID once spawned, else null.
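Since the status starts as `"pending"` and later moves to one of the other values, a client will typically poll until it changes. The sketch below does this against an injected status function, so it runs without a live endpoint. `wait_for_tunnel`, its timeout handling, and the sample replies are hypothetical.

```python
import time
from typing import Callable, Dict


def wait_for_tunnel(get_status: Callable[[str], Dict], endpoint_name: str,
                    timeout: float = 60.0, interval: float = 1.0) -> Dict:
    """Poll until the tunnel leaves the 'pending' state or the timeout expires."""
    deadline = time.monotonic() + timeout
    while True:
        status = get_status(endpoint_name)
        if status["status"] != "pending":
            return status  # 'active', 'failed', 'done' or 'no_tunnel'
        if time.monotonic() >= deadline:
            raise TimeoutError(f"tunnel for {endpoint_name} still pending")
        time.sleep(interval)


# Canned replies standing in for successive tunnel_status() calls.
_replies = iter([
    {"endpoint_name": "compute_ep", "status": "pending", "port": None, "pid": None},
    {"endpoint_name": "compute_ep", "status": "active", "port": 40123, "pid": 4242},
])
result = wait_for_tunnel(lambda name: next(_replies), "compute_ep", interval=0.01)
print(result["status"], result["port"])
```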
- class radical.orbit.plugin_psij.PluginPSIJ(app: FastAPI, instance_name: str = 'psij')[source]¶
Bases: Plugin
PSIJ plugin for ORBIT.
This plugin provides an interface to submit and manage jobs via the psij-python library.
- plugin_name = 'psij'¶
- session_class¶
alias of
PSIJSession
- client_class¶
alias of
PSIJClient
- ui_config: Dict | UIConfig | None = {'description': 'Submit and monitor HPC batch jobs via PsiJ.', 'forms': [{'fields': [{'column': 0, 'css_class': 'p-exec', 'default': 'radical-orbit-endpoint-wrapper.sh', 'label': 'Executable', 'name': 'exec', 'type': 'text'}, {'column': 0, 'css_class': 'p-args', 'label': 'Arguments (space-separated)', 'name': 'args', 'placeholder': 'auto-filled with --url and --name', 'type': 'text'}, {'column': 0, 'css_class': 'p-executor', 'label': 'Executor', 'name': 'executor', 'options': ['local', 'slurm', 'pbs', 'lsf'], 'type': 'select'}, {'column': 1, 'css_class': 'p-queue', 'label': 'Queue / Partition', 'name': 'queue', 'placeholder': 'optional', 'required': False, 'type': 'text'}, {'column': 1, 'css_class': 'p-account', 'label': 'Account / Project', 'name': 'account', 'placeholder': 'optional', 'required': False, 'type': 'text'}, {'column': 1, 'css_class': 'p-duration', 'label': 'Duration (seconds)', 'name': 'duration', 'placeholder': 'e.g. 600', 'required': False, 'type': 'text'}, {'column': 1, 'css_class': 'p-node-count', 'label': 'Number of Nodes', 'name': 'node_count', 'placeholder': 'e.g. 1', 'required': False, 'type': 'number'}, {'column': 1, 'css_class': 'p-custom-attr', 'label': '🔧 Custom Attributes', 'name': 'custom', 'required': False, 'type': 'custom_attributes'}], 'id': 'submit', 'layout': 'grid2', 'submit': {'label': '🚀 Submit Job', 'style': 'success'}, 'title': '📝 Submit Job'}], 'icon': '🚀', 'monitors': [{'css_class': 'psij-output', 'empty_text': 'No jobs submitted yet.', 'id': 'jobs', 'title': '📊 Job Monitor', 'type': 'task_list'}], 'notifications': {'id_field': 'job_id', 'state_field': 'state', 'topic': 'job_status'}, 'title': 'PsiJ Jobs'}¶
- classmethod is_enabled(app: FastAPI) bool[source]¶
PsiJ loads on endpoint nodes (login or compute) — not on the bridge.
- async submit_tunneled(request: Request) dict[source]¶
Submit a job that starts a new Endpoint service on a compute node.
The job must pass -n/--name <endpoint_name> in its arguments so the child endpoint service can register under the correct name.
Tunnel direction is selected by the tunnel field:
'none': no SSH tunnel; child connects directly to the bridge.
'forward': child opens its own outbound ssh -L back to this login node (compute → login). The plugin injects --tunnel forward and --tunnel-via <login> into the child’s argv. The child writes the rendezvous file itself; the parent watcher only observes job state.
'reverse': parent (this plugin) opens ssh -R to the compute node once the job reaches RUNNING and writes the rendezvous file with the remote port allocated by the compute-side sshd. The plugin injects only --tunnel reverse so the child waits for the rendezvous file.
Request body JSON fields:
job_spec (dict): PsiJ job specification.
executor (str): PsiJ executor name (default: "local").
tunnel (str): One of 'none', 'forward', 'reverse' (default: 'none'). Boolean values are not accepted.
- Returns:
JSON with job_id, native_id, and endpoint_name.
- Raises:
400 if tunnel is not one of the three string values.
422 if -n/--name is missing from job_spec.arguments.
409 if a tunnel watcher for the same endpoint name is already active.
- async tunnel_status(request: Request) dict[source]¶
Return the current tunnel status for a named endpoint.
Path param: endpoint_name
Returns a JSON object with fields:
endpoint_name: echoed back.
status: one of "pending", "active", "failed", "done", or "no_tunnel".
port: allocated tunnel port (int) once the child endpoint has published it, else null.
pid: SSH process PID on the compute node (read from the pid rendezvous file) once active, else null.
4.2.6. PluginRhapsody¶
Rhapsody Plugin for ORBIT.
Exposes the RHAPSODY Session/Task API so that remote clients can submit and monitor compute / AI tasks on endpoint nodes.
- class radical.orbit.plugin_rhapsody.RhapsodySession(sid: str, backend_names: list[str] | None = None, allow_pickled_tasks: bool = True, notify_batch_window: float = 0.25, notify_batch_size: int = 1024)[source]¶
Bases: PluginSession
Rhapsody session (service-side).
Wraps a rhapsody.Session instance, forwarding task submission, monitoring, cancellation and statistics queries.
- property prof: Profiler¶
- async submit_tasks(task_dicts: list[dict], pre_expanded: bool = False) list[dict][source]¶
Submit a list of tasks.
Each dict is converted to a ComputeTask or AITask via BaseTask.from_dict(). Function fields encoded as cloudpickle blobs or import-path strings are deserialized first.
Uses a pipeline: deserialization of chunk N+1 runs concurrently with backend submission of chunk N, so the two dominant costs overlap.
- Returns:
list[dict]: Minimal ack dicts {uid, state}.
- async wait_tasks(uids: list[str], timeout: float | None = None) list[dict][source]¶
Return current task states (non-blocking snapshot).
This method no longer blocks until tasks complete. Clients should rely on SSE task_status notifications for real-time completion events, and call this endpoint only to fetch the current state snapshot.
- Args:
uids (list[str]): Task UIDs to query.
timeout (float | None): Ignored (kept for API compat).
- Returns:
list[dict]: Current task state dicts.
- class radical.orbit.plugin_rhapsody.RhapsodyClient(*args, **kwargs)[source]¶
Bases: PluginClient
Client-side interface for the Rhapsody plugin.
- register_session(backends: list[str] | None = None, init_timeout: float = 120, notify_batch_window: float | None = None, notify_batch_size: int | None = None)[source]¶
Register a session, optionally specifying backend names.
The endpoint initializes the session asynchronously. This method blocks until a session_status SSE notification confirms that the session is ready (or until init_timeout seconds have elapsed).
Falls back to polling when no BridgeClient is available.
- Args:
backends: List of backend names (e.g. ['dragon_v3']). Defaults to ['dragon_v3'] on the server side.
init_timeout: Seconds to wait for session init (default 120).
notify_batch_window: Seconds to accumulate notifications before flushing (endpoint-side).
notify_batch_size: Max notifications per flush (endpoint-side).
- submit_tasks(task_dicts: list[dict]) list[dict][source]¶
Submit tasks to the endpoint.
Large batches are automatically split so each payload stays within the WebSocket frame limit. Batches are submitted concurrently via a thread pool so that network round-trips overlap (pipelining).
UIDs are assigned client-side (if absent) so the caller can start waiting for SSE notifications immediately.
- Args:
task_dicts: List of task specification dicts.
- Returns:
list[dict]: Submitted task info (uid, state).
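The combination of client-side UIDs, batch splitting, and concurrent submission can be sketched as follows. This is not the `RhapsodyClient` code: `submit_in_batches` and `fake_send` are hypothetical, the UID format is invented, and batches are split by task count here, whereas the text above describes splitting by payload size against the WebSocket frame limit.

```python
import uuid
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List


def submit_in_batches(tasks: List[Dict], send: Callable[[List[Dict]], List[Dict]],
                      batch_size: int = 2, workers: int = 4) -> List[Dict]:
    """Assign missing UIDs, split into batches, and send batches concurrently."""
    for task in tasks:
        # Hypothetical UID format; existing UIDs are left untouched.
        task.setdefault("uid", f"task.{uuid.uuid4().hex[:8]}")
    batches = [tasks[i:i + batch_size] for i in range(0, len(tasks), batch_size)]
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # map() preserves batch order even though the calls overlap.
        results = list(pool.map(send, batches))
    return [ack for batch in results for ack in batch]


def fake_send(batch: List[Dict]) -> List[Dict]:
    """Stand-in for one HTTP round-trip; returns minimal acks."""
    return [{"uid": t["uid"], "state": "SUBMITTED"} for t in batch]


acks = submit_in_batches([{"executable": "/bin/echo"} for _ in range(5)], fake_send)
print(len(acks))
```

Assigning UIDs before the first request is what lets a caller start listening for notifications without waiting for the acks.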
- wait_tasks(uids: list[str], timeout: float | None = None) list[dict][source]¶
Wait for tasks to reach terminal state via SSE notifications.
Purely client-side: the persistent _on_task_done callback (registered at session init) accumulates completions into self._completed. This method checks the accumulator and blocks only until every requested UID appears there.
Falls back to periodic polling when no BridgeClient is available (e.g. direct construction in tests).
- Args:
uids: Task UIDs to wait for.
timeout: Seconds to wait (None = forever).
- Returns:
list[dict]: Completed task dicts.
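The accumulator pattern described for `wait_tasks` can be sketched with a condition variable: a callback records completions and waiters block until all requested UIDs are present. `CompletionTracker` is a hypothetical stand-in, and raising `TimeoutError` on expiry is an assumption of this sketch rather than documented client behaviour.

```python
import threading
from typing import Dict, List, Optional


class CompletionTracker:
    """Hypothetical accumulator: a callback records completions, wait() blocks."""

    def __init__(self) -> None:
        self._completed: Dict[str, dict] = {}
        self._cond = threading.Condition()

    def on_task_done(self, task: dict) -> None:
        """Callback invoked for each terminal task notification."""
        with self._cond:
            self._completed[task["uid"]] = task
            self._cond.notify_all()

    def wait(self, uids: List[str], timeout: Optional[float] = None) -> List[dict]:
        """Block until every uid is recorded; raise TimeoutError otherwise."""
        with self._cond:
            done = self._cond.wait_for(
                lambda: all(u in self._completed for u in uids), timeout)
            if not done:
                raise TimeoutError("tasks did not complete in time")
            return [self._completed[u] for u in uids]


tracker = CompletionTracker()
# Simulate a notification arriving from another thread shortly afterwards.
threading.Timer(0.05, tracker.on_task_done, [{"uid": "t1", "state": "DONE"}]).start()
print(tracker.wait(["t1"], timeout=2.0))
```

Because completions are stored as they arrive, a task that finishes before `wait` is called is still found immediately.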
- class radical.orbit.plugin_rhapsody.PluginRhapsody(app: FastAPI, instance_name: str = 'rhapsody')[source]¶
Bases: Plugin
Rhapsody plugin for ORBIT.
Exposes the RHAPSODY Session / Task API via REST endpoints:
POST /rhapsody/register_session – create session
POST /rhapsody/submit/{sid} – submit tasks
POST /rhapsody/wait/{sid} – query task states
GET /rhapsody/list_tasks/{sid} – list all tasks
GET /rhapsody/task/{sid}/{uid} – get single task
POST /rhapsody/cancel/{sid}/{uid} – cancel single task
POST /rhapsody/cancel_all/{sid} – cancel all tasks
Notification topics: session_status, task_status, task_status_batch.
- plugin_name = 'rhapsody'¶
- session_class¶
alias of
RhapsodySession
- client_class¶
alias of
RhapsodyClient
- ui_config: Dict | UIConfig | None = {'description': 'Submit compute tasks, wait for results, view stdout/stderr.', 'forms': [{'fields': [{'css_class': 'rh-exec', 'default': '/bin/echo', 'label': 'Executable', 'name': 'exec', 'type': 'text'}, {'css_class': 'rh-args', 'default': 'hello from rhapsody', 'label': 'Arguments (space-separated)', 'name': 'args', 'type': 'text'}, {'css_class': 'rh-backends', 'label': 'Backend', 'name': 'backends', 'options': ['dragon_v3', 'concurrent'], 'type': 'select'}, {'css_class': 'rh-timeout', 'default': '', 'label': 'Timeout (s)', 'name': 'timeout', 'type': 'number'}, {'css_class': 'rh-ranks', 'default': '', 'label': 'MPI Ranks', 'name': 'ranks', 'type': 'number'}, {'css_class': 'rh-type', 'label': 'Task Type', 'name': 'type', 'options': ['', 'mpi'], 'type': 'select'}, {'css_class': 'rh-cwd', 'default': '', 'label': 'Working Dir', 'name': 'cwd', 'type': 'text'}], 'id': 'submit', 'layout': 'single', 'submit': {'label': '▶ Submit Task', 'style': 'success'}, 'title': '📝 Submit Task'}], 'icon': '🎼', 'monitors': [{'css_class': 'rh-output', 'empty_text': 'No tasks submitted yet.', 'id': 'tasks', 'title': '📊 Task Monitor', 'type': 'task_list'}], 'notifications': {'id_field': 'uid', 'state_field': 'state', 'topic': 'task_status'}, 'title': 'Rhapsody Tasks'}¶
- classmethod is_enabled(app: FastAPI) bool[source]¶
Rhapsody loads on compute nodes (inside an allocation) and on standalone hosts (no batch system at all). Both can host Dragon workers; bridges and login nodes deliberately don’t load Rhapsody.
- async register_session(request: Request) dict[source]¶
Register a new Rhapsody session.
Accepts an optional JSON body with {"backends": ["name", ...]}.
Session initialization happens asynchronously in the background. The SID is returned immediately. The client should wait for a session_status SSE notification (status: "ready") before submitting tasks, or handle HTTP 409 on early requests.
4.3. Queue Info Backend¶
4.3.1. QueueInfo¶
QueueInfo abstract base + shared helpers + factory.
Backend implementations live in queue_info_slurm.py and queue_info_pbs.py.
- class radical.orbit.queue_info.QueueInfo[source]¶
Bases: ABC
Abstract base class for batch system queue information backends.
Subclasses implement _collect_info, _collect_jobs, _collect_allocations to gather data from a specific batch system. Results are cached with a configurable TTL.
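The caching behaviour, including the `force` bypass, can be sketched with a small time-stamped store. `TTLCache` and `collect_info` are hypothetical; how `QueueInfo` actually stores entries and configures the TTL is not shown in this reference.

```python
import time
from typing import Any, Callable, Dict, Tuple


class TTLCache:
    """Hypothetical cache: values expire ttl seconds after they were stored."""

    def __init__(self, ttl: float, clock: Callable[[], float] = time.monotonic):
        self._ttl = ttl
        self._clock = clock
        self._store: Dict[str, Tuple[float, Any]] = {}

    def get(self, key: str, collect: Callable[[], Any], force: bool = False) -> Any:
        """Return the cached value, calling collect() on a miss, expiry, or force."""
        now = self._clock()
        entry = self._store.get(key)
        if not force and entry is not None and now - entry[0] < self._ttl:
            return entry[1]
        value = collect()
        self._store[key] = (now, value)
        return value


calls = []


def collect_info() -> dict:
    calls.append(1)  # count how often the expensive query runs
    return {"queues": {"debug": {}}}


cache = TTLCache(ttl=30.0)
cache.get("info", collect_info)
cache.get("info", collect_info)              # served from cache
cache.get("info", collect_info, force=True)  # bypasses cache
print(len(calls))
```

Injecting the clock keeps the expiry logic testable without sleeping.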
- backend_name = 'none'¶
- start_prefetch()[source]¶
Start background threads to prefetch queue info and allocations in parallel so both caches are warm as quickly as possible.
- get_info(user=None, force=False)[source]¶
Return queue/partition info. force=True bypasses cache.
- Args:
user (str): User to filter partitions for. When None (default), the current user is used. Pass user='*' to return all partitions (admin view).
force (bool): Bypass cache if True.
- Returns:
dict: {"queues": {<partition_name>: {…}, …}}
- list_jobs(queue, user=None, force=False)[source]¶
List jobs in a queue.
- Args:
queue (str): Partition name to list jobs for.
user (str): User to filter jobs for. When None (default), the current user is used. Pass user='*' to return all jobs.
force (bool): Bypass cache if True.
- Returns:
dict: {"jobs": [<job_dict>, …]}
- radical.orbit.queue_info.make_queue_info(batch=None, conf_path=None) QueueInfo[source]¶
Factory: return a QueueInfo subclass matching the active scheduler.
- Args:
batch: Optional pre-detected BatchSystem instance. If None, calls batch_system.detect_batch_system().
conf_path: Optional path to the scheduler’s configuration file (forwarded to the backend; only SLURM uses it today).
- Returns:
QueueInfo: a QueueInfoSlurm, QueueInfoPBSPro, or QueueInfoNone instance depending on what the local system supports.