.. _plugin_globus: ################### Plugin: ``globus`` ################### The ``globus`` plugin stages files via `Globus Online `_ (the Transfer API). Globus moves data **collection-to-collection** out of band, so the plugin is an *orchestrator*: it submits transfers between two Globus collections and monitors task state. Bytes never flow through the client, endpoint, or bridge — which distinguishes it from the byte-streaming :ref:`staging ` plugin. The plugin is **endpoint-side** (it is not loaded on the bridge) and is only enabled when `globus-sdk `_ is importable. Architecture ============ ``GlobusSession`` Holds one ``globus_sdk.TransferClient`` bound to the supplied credential, tracks submitted tasks, and runs a background poller (~10 s) that emits ``transfer_status`` notifications on state changes. ``globus-sdk`` is synchronous, so each Transfer call is offloaded with ``asyncio.to_thread`` to keep the endpoint event loop responsive. ``GlobusClient`` Application-side helper mirroring the session methods. ``PluginGlobus`` Endpoint plugin that registers the REST routes and per-client sessions. Authentication ============== A Globus Transfer token is supplied at ``register_session`` time, as **one of**: * ``access_token`` — wrapped in an ``AccessTokenAuthorizer``. Access tokens expire (~48 h) and are **not** renewed; the client re-registers with a fresh token when one lapses. * ``refresh_token`` + ``client_id`` — wrapped in a ``RefreshTokenAuthorizer``, which transparently renews access tokens, so long-running transfers survive expiry. The credential lives in the endpoint process memory (inside the ``TransferClient``) for the lifetime of the session and is **never** written to disk. Acquire a token via Globus Auth (for example with the Globus CLI or your own helper script), then pass it to ``register_session``. .. note:: For Globus Connect Server **mapped collections**, the token must already carry the per-collection ``data_access`` dependent scope. Otherwise the plugin surfaces a clear ``401`` (``ConsentRequired``) telling the caller to re-acquire a token with that scope, rather than an opaque error. Collections =========== Collections are identified by **UUID** and passed explicitly on the wire. The literal string ``"local"`` (or an omitted collection) resolves to the endpoint's configured *local collection*, discovered at plugin start-up in this order: #. ``RADICAL_ORBIT_GLOBUS_COLLECTION`` environment variable; #. Globus Connect Personal (``globus_sdk.LocalGlobusConnectPersonal``); #. the config file ``~/.radical/orbit/globus.json`` with a ``{"local_collection": ""}`` key; #. otherwise ``None`` (an explicit UUID must then be supplied). A per-session ``local_collection`` passed to ``register_session`` overrides the auto-detected default. .. note:: Facility GCS/DTN collections are generally **not** locally discoverable — only Globus Connect Personal exposes its UUID. On such hosts, set the environment variable or the config file. Client API ========== .. code-block:: python from radical.orbit import BridgeClient bc = BridgeClient() ec = bc.get_endpoint_client(bc.list_endpoints()[0]) globus = ec.get_plugin('globus') # access token, or refresh_token=… + client_id=… globus.register_session(access_token='…', local_collection='…') sub = globus.submit_transfer( source='', destination='local', items=[{'source': '/data/in/', 'destination': '/~/out/', 'recursive': True}], label='my transfer', sync_level='checksum') globus.task_wait(sub['task_id'], timeout=60) print(globus.get_task(sub['task_id'])['status']) Methods: ``register_session(access_token=None, refresh_token=None, client_id=None, local_collection=None)`` Open a session with a Globus credential. ``submit_transfer(source, destination, items, label=None, sync_level=None)`` Submit a transfer. ``items`` is a list of ``{"source", "destination", "recursive"}`` dicts. Returns ``{task_id, submission_id, status}``. ``get_task(task_id)`` / ``task_wait(task_id, timeout=60, polling_interval=10)`` / ``cancel_task(task_id)`` / ``list_tasks(limit=100)`` Task monitoring and control. ``ls(collection, path=None)`` / ``mkdir(collection, path)`` / ``rename(collection, oldpath, newpath)`` / ``delete(collection, paths, recursive=False, label=None)`` Filesystem operations on a collection (``delete`` submits a Globus delete task). ``endpoint_search(filter_text=None, limit=25)`` / ``get_endpoint(endpoint_id)`` Collection discovery and metadata. Notifications ============= Topic ``transfer_status`` is emitted on task state change:: {task_id, status, label, bytes_transferred, files_transferred, nice_status} where ``status`` is the Globus task status (``ACTIVE`` / ``SUCCEEDED`` / ``FAILED``). Example ======= See ``examples/example_globus.py`` for a runnable end-to-end transfer (it defaults to the public Globus Tutorial Collections).