5. Plugin: globus¶
The globus plugin stages files via Globus Online (the Transfer API). Globus moves data
collection-to-collection out of band, so the plugin is an orchestrator:
it submits transfers between two Globus collections and monitors task state.
Bytes never flow through the client, endpoint, or bridge — which distinguishes it
from the byte-streaming staging plugin.
The plugin is endpoint-side (it is not loaded on the bridge) and is only enabled when globus-sdk is importable.
5.1. Architecture¶
GlobusSessionHolds one
globus_sdk.TransferClientbound to the supplied credential, tracks submitted tasks, and runs a background poller (~10 s) that emitstransfer_statusnotifications on state changes.globus-sdkis synchronous, so each Transfer call is offloaded withasyncio.to_threadto keep the endpoint event loop responsive.GlobusClientApplication-side helper mirroring the session methods.
PluginGlobusEndpoint plugin that registers the REST routes and per-client sessions.
5.2. Authentication¶
A Globus Transfer token is supplied at register_session time, as one of:
access_token— wrapped in anAccessTokenAuthorizer. Access tokens expire (~48 h) and are not renewed; the client re-registers with a fresh token when one lapses.refresh_token+client_id— wrapped in aRefreshTokenAuthorizer, which transparently renews access tokens, so long-running transfers survive expiry.
The credential lives in the endpoint process memory (inside the TransferClient)
for the lifetime of the session and is never written to disk.
Acquire a token via Globus Auth (for example with the Globus CLI or your own helper script), then pass it to register_session.
.. note:
For Globus Connect Server **mapped collections**, the token must already
carry the per-collection ``data_access`` dependent scope. Otherwise the
plugin surfaces a clear ``401`` (``ConsentRequired``) telling the caller to
re-acquire a token with that scope, rather than an opaque error.
5.3. Collections¶
Collections are identified by UUID and passed explicitly on the wire. The
literal string "local" (or an omitted collection) resolves to the endpoint’s
configured local collection, discovered at plugin start-up in this order:
RADICAL_ORBIT_GLOBUS_COLLECTIONenvironment variable;Globus Connect Personal (
globus_sdk.LocalGlobusConnectPersonal);the config file
~/.radical/orbit/globus.jsonwith a{"local_collection": "<uuid>"}key;otherwise
None(an explicit UUID must then be supplied).
A per-session local_collection passed to register_session overrides the
auto-detected default.
Note
Facility GCS/DTN collections are generally not locally discoverable — only Globus Connect Personal exposes its UUID. On such hosts, set the environment variable or the config file.
5.4. Client API¶
from radical.orbit import BridgeClient
bc = BridgeClient()
ec = bc.get_endpoint_client(bc.list_endpoints()[0])
globus = ec.get_plugin('globus')
# access token, or refresh_token=… + client_id=…
globus.register_session(access_token='…', local_collection='…')
sub = globus.submit_transfer(
source='<src-uuid>', destination='local',
items=[{'source': '/data/in/', 'destination': '/~/out/',
'recursive': True}],
label='my transfer', sync_level='checksum')
globus.task_wait(sub['task_id'], timeout=60)
print(globus.get_task(sub['task_id'])['status'])
Methods:
register_session(access_token=None, refresh_token=None, client_id=None, local_collection=None)Open a session with a Globus credential.
submit_transfer(source, destination, items, label=None, sync_level=None)Submit a transfer.
itemsis a list of{"source", "destination", "recursive"}dicts. Returns{task_id, submission_id, status}.
get_task(task_id) / task_wait(task_id, timeout=60, polling_interval=10) /
cancel_task(task_id) / list_tasks(limit=100)
Task monitoring and control.
ls(collection, path=None) / mkdir(collection, path) /
rename(collection, oldpath, newpath) / delete(collection, paths, recursive=False, label=None)
Filesystem operations on a collection (
deletesubmits a Globus delete task).
endpoint_search(filter_text=None, limit=25)/get_endpoint(endpoint_id)Collection discovery and metadata.
5.5. Notifications¶
Topic transfer_status is emitted on task state change:
{task_id, status, label, bytes_transferred, files_transferred, nice_status}
where status is the Globus task status
(ACTIVE / SUCCEEDED / FAILED).
5.6. Example¶
See examples/example_globus.py for a runnable end-to-end transfer (it
defaults to the public Globus Tutorial Collections).