Source: src/cwsandbox/_session.py:37
class Session(defaults: SandboxDefaults | Mapping[str, Any] | None = None, report_to: list[str] | None = None)
Manages sandbox lifecycle and provides function execution. Use a session when:
  • Creating multiple sandboxes with shared configuration
  • Executing Python functions in sandboxes
  • Relying on automatic cleanup of orphaned sandboxes
Metrics are automatically tracked when exec() completes on any sandbox associated with this session. Use log_metrics(step=N) to log metrics at specific training steps.

Properties

sandbox_count

@property
def sandbox_count(self) -> int
Number of sandboxes currently tracked by this session.

Methods

log_metrics

log_metrics(step: int | None = None, *, reset: bool = True) -> bool
Log accumulated sandbox metrics to wandb. Call this during training to correlate sandbox usage with training steps. Metrics are tracked automatically when exec() completes, so you only need to call log_metrics() for step correlation. Metrics are also logged automatically on session close.
Parameters
  • step (int | None): Training step to associate with metrics. If provided, metrics are logged at this step number in wandb.
  • reset (bool): If True (default), reset accumulated metrics after a successful log. Metrics are preserved if log() fails (no active wandb run). Set to False to keep accumulating regardless.
Returns
  • bool: True if metrics were logged; False if no reporter is configured or there is no active wandb run.
Examples
with Session(defaults, report_to=["wandb"]) as session:
    for step in range(100):
        sb = session.sandbox(...)
        result = sb.exec(...).result()  # Metrics tracked automatically
        session.log_metrics(step=step)  # Log at each step
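
The reset rule for log_metrics() can be illustrated with a small stand-alone model. This is only a sketch under stated assumptions: MetricsBuffer, its sink callback, and the metric name cwsandbox/example_count are hypothetical names invented for illustration, not part of cwsandbox, and the real session may track metrics differently. It shows that accumulated values are cleared only after a successful log with reset=True, and are kept otherwise.

```python
from typing import Any, Callable, Optional

# Hypothetical sink signature: receives (metrics, step), returns True on success.
Sink = Callable[[dict[str, Any], Optional[int]], bool]


class MetricsBuffer:
    """Toy accumulator modelling the documented reset rule (not the cwsandbox code)."""

    def __init__(self, sink: Optional[Sink] = None) -> None:
        self._sink = sink  # None models "no reporter configured"
        self._counts: dict[str, int] = {}

    def record(self, name: str, value: int = 1) -> None:
        """Accumulate a value under a metric name."""
        self._counts[name] = self._counts.get(name, 0) + value

    def snapshot(self) -> dict[str, int]:
        """Return a copy of the currently accumulated metrics."""
        return dict(self._counts)

    def log(self, step: Optional[int] = None, *, reset: bool = True) -> bool:
        """Send metrics to the sink; clear them only after a successful log."""
        if self._sink is None:
            return False
        logged = self._sink(self.snapshot(), step)
        if logged and reset:
            self._counts.clear()
        return logged
```

With a sink that always fails, log() returns False and snapshot() still holds the recorded values, so a later call can log them.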

get_metrics

get_metrics() -> dict[str, Any]
Get current accumulated metrics.
Returns
  • dict[str, Any]: Dictionary with cwsandbox/* prefixed metric names and values, or an empty dict if no reporter is configured.

close

close() -> OperationRef[None]
Stop all managed sandboxes and return an OperationRef immediately.
Returns
  • OperationRef[None]: Use .result() to block until all sandboxes have stopped.
Raises
  • SandboxError: If one or more running sandboxes failed to stop.
Examples
session.close().result()  # Block until all sandboxes stopped

sandbox

sandbox(*, command: str | None = None, args: list[str] | None = None, container_image: str | None = None, tags: list[str] | None = None, profile_ids: list[str] | None = None, runner_ids: list[str] | None = None, resources: dict[str, Any] | None = None, mounted_files: list[dict[str, Any]] | None = None, s3_mount: dict[str, Any] | None = None, ports: list[dict[str, Any]] | None = None, network: NetworkOptions | dict[str, Any] | None = None, max_timeout_seconds: int | None = None, environment_variables: dict[str, str] | None = None, annotations: dict[str, str] | None = None, secrets: Sequence[Secret | dict[str, Any]] | None = None) -> Sandbox
Create an unstarted sandbox with session defaults. Returns immediately without any network calls. The sandbox auto-starts on first operation (exec, read_file, write_file, wait), or can be started explicitly with start().result().
Parameters
  • command (str | None): Command to run in sandbox
  • args (list[str] | None): Arguments for the command
  • container_image (str | None): Container image to use
  • tags (list[str] | None): Tags for the sandbox (merged with session defaults)
  • profile_ids (list[str] | None): Optional list of profile IDs
  • runner_ids (list[str] | None): Optional list of runner IDs
  • resources (dict[str, Any] | None): Resource requests (CPU, memory, GPU)
  • mounted_files (list[dict[str, Any]] | None): Files to mount into the sandbox
  • s3_mount (dict[str, Any] | None): S3 bucket mount configuration
  • ports (list[dict[str, Any]] | None): Port mappings for the sandbox
  • network (NetworkOptions | dict[str, Any] | None): Network configuration (NetworkOptions dataclass)
  • max_timeout_seconds (int | None): Maximum timeout for sandbox operations
  • environment_variables (dict[str, str] | None): Environment variables to inject into the sandbox. Merges with and overrides matching keys from the session defaults. Use for non-sensitive config only.
  • annotations (dict[str, str] | None): Kubernetes pod annotations for the sandbox. Merges with and overrides matching keys from the session defaults. Use for non-sensitive metadata only.
  • secrets (Sequence[Secret | dict[str, Any]] | None): Secrets to inject as environment variables. Merged with session defaults (defaults first, then this list).
Returns
  • Sandbox: An unstarted Sandbox registered with the session.
Raises
  • SandboxError: If the session has been closed.
Examples
with Session(defaults) as session:
    # Auto-start: sandbox starts on first exec()
    sb = session.sandbox(command="sleep", args=["infinity"])
    result = sb.exec(["echo", "hello"]).result()

    # Explicit start for control over timing
    sb2 = session.sandbox(command="sleep", args=["infinity"])
    sb2.start().result()
    sb2.wait()
    result = sb2.exec(["echo", "hello"]).result()
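
The merge rules listed for tags, environment_variables, annotations, and secrets can be sketched with plain dictionaries. This is a minimal illustration, not the library's code: merge_sandbox_options is a hypothetical helper, the option values are placeholders, and the ordering of merged tags (defaults first) is an assumption, since the parameter description only says tags are merged.

```python
from typing import Any, Mapping


def merge_sandbox_options(
    defaults: Mapping[str, Any], call: Mapping[str, Any]
) -> dict[str, Any]:
    """Combine session defaults with per-call values (hypothetical helper)."""
    return {
        # Dict-valued options: per-call keys override matching default keys.
        "environment_variables": {
            **defaults.get("environment_variables", {}),
            **call.get("environment_variables", {}),
        },
        "annotations": {
            **defaults.get("annotations", {}),
            **call.get("annotations", {}),
        },
        # Secrets: defaults first, then the per-call list.
        "secrets": [*defaults.get("secrets", []), *call.get("secrets", [])],
        # Tags: merged; "defaults first" is an assumption of this sketch.
        "tags": [*defaults.get("tags", []), *call.get("tags", [])],
    }


# Placeholder values for illustration only.
session_defaults = {
    "environment_variables": {"LOG_LEVEL": "info", "REGION": "us"},
    "tags": ["my-app"],
    "secrets": ["secret-a"],
}
per_call = {
    "environment_variables": {"LOG_LEVEL": "debug"},
    "tags": ["run-1"],
    "secrets": ["secret-b"],
}
merged = merge_sandbox_options(session_defaults, per_call)
```

In this model LOG_LEVEL takes the per-call value while REGION is inherited from the defaults.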

list

list(*, tags: list[str] | None = None, status: str | None = None, profile_ids: list[str] | None = None, runner_ids: list[str] | None = None, include_stopped: bool = False, adopt: bool = False) -> OperationRef[list[Sandbox]]
List sandboxes, optionally adopting them into this session. Automatically includes the session’s default tags in the filter. This makes it easy to find sandboxes created by this session or a previous run with the same defaults. By default, only active (non-terminal) sandboxes are returned. Set include_stopped=True to widen the search to include terminal sandboxes (completed, failed, terminated). A terminal status filter (e.g. status="completed") also widens the search automatically.
Parameters
  • tags (list[str] | None): Additional tags to filter by (merged with session’s default tags)
  • status (str | None): Filter by status
  • profile_ids (list[str] | None): Filter by profile IDs (defaults to session’s profile_ids if set)
  • runner_ids (list[str] | None): Filter by runner IDs (defaults to session’s runner_ids if set)
  • include_stopped (bool): If True, include terminal sandboxes (completed, failed, terminated). Defaults to False.
  • adopt (bool): If True, register discovered sandboxes with this session so they are stopped when the session closes
Returns
  • OperationRef[list[Sandbox]]: Use .result() to block for results, or await directly in async contexts.
Examples
# Session defaults include a tag for this application/run
defaults = SandboxDefaults(tags=("my-app", "run-abc123"))

with Session(defaults) as session:
    # Sync usage - automatically filters by ["my-app", "run-abc123"]
    orphans = session.list(adopt=True).result()

    # Can add additional filters
    running = session.list(status="running").result()

    # Include stopped sandboxes for cleanup or audit
    all_sandboxes = session.list(include_stopped=True).result()

# Async usage
async with Session(defaults) as session:
    orphans = await session.list(adopt=True)
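
The filtering rules for list() can be modelled locally to make the "widening" behaviour concrete. This sketch is an assumption-laden illustration, not how the service evaluates queries: the record dictionaries, the select helper, and the rule that a sandbox must carry every requested tag are all hypothetical. Only the three terminal status names come from the description above.

```python
from typing import Any, Iterable, Optional, Sequence

# Terminal statuses named in the list() description.
TERMINAL_STATUSES = frozenset({"completed", "failed", "terminated"})


def searches_terminal(status: Optional[str], include_stopped: bool) -> bool:
    """True when the query should also cover terminal sandboxes."""
    return include_stopped or status in TERMINAL_STATUSES


def select(
    records: Iterable[dict[str, Any]],
    *,
    default_tags: Sequence[str] = (),
    tags: Optional[Sequence[str]] = None,
    status: Optional[str] = None,
    include_stopped: bool = False,
) -> list[str]:
    """Return IDs of records matching the merged tag filter and status rules."""
    wanted_tags = set(default_tags) | set(tags or ())
    widen = searches_terminal(status, include_stopped)
    matches = []
    for rec in records:
        if not wanted_tags <= set(rec["tags"]):
            continue  # assumption: every requested tag must be present
        if status is not None and rec["status"] != status:
            continue
        if rec["status"] in TERMINAL_STATUSES and not widen:
            continue  # terminal sandboxes are hidden unless the search is widened
        matches.append(rec["id"])
    return matches


# Hypothetical records for illustration.
records = [
    {"id": "sb-1", "tags": ["my-app", "run-abc123"], "status": "running"},
    {"id": "sb-2", "tags": ["my-app", "run-abc123"], "status": "completed"},
    {"id": "sb-3", "tags": ["other-app"], "status": "running"},
]
session_tags = ("my-app", "run-abc123")
```

Calling select(records, default_tags=session_tags) yields only sb-1, while adding include_stopped=True or status="completed" brings sb-2 into scope.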

from_id

from_id(sandbox_id: str, *, adopt: bool = True) -> OperationRef[Sandbox]
Attach to an existing sandbox, optionally adopting it into this session.
Parameters
  • sandbox_id (str): The ID of the existing sandbox
  • adopt (bool): If True (default), register the sandbox with this session
Returns
  • OperationRef[Sandbox]: Use .result() to block for the Sandbox instance, or await directly in async contexts.
Examples
with Session(defaults) as session:
    # Sync usage - reconnect to a sandbox
    sb = session.from_id("sandbox-abc123").result()
    result = sb.exec(["echo", "hello"]).result()
# sb is stopped when session exits

# Async usage
async with Session(defaults) as session:
    sb = await session.from_id("sandbox-abc123")
    result = await sb.exec(["echo", "hello"])

adopt

adopt(sandbox: Sandbox) -> None
Adopt an existing Sandbox instance into this session for cleanup tracking. Use this when you have a Sandbox from Sandbox.list() or Sandbox.from_id() that you want to be automatically stopped when the session closes.
Parameters
  • sandbox (Sandbox): A Sandbox instance to track
Raises
  • SandboxError: If the session is closed
  • ValueError: If the sandbox has no sandbox_id
Examples
with Session(defaults) as session:
    # Get sandboxes via class method
    sandboxes = Sandbox.list(tags=["my-job"]).result()

    # Adopt them into the session
    for sb in sandboxes:
        session.adopt(sb)

    # Now they'll be stopped when session closes
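
The cleanup-tracking contract of adopt() can be pictured with a toy registry. Everything here is a stand-in written for illustration: FakeSandbox, CleanupRegistry, and the locally defined SandboxError are hypothetical and do not reflect the real classes. The order of the two checks and the choice to track each sandbox ID once are assumptions of this sketch.

```python
from typing import Optional


class SandboxError(Exception):
    """Local stand-in for the library's SandboxError."""


class FakeSandbox:
    """Hypothetical sandbox handle with only the fields this sketch needs."""

    def __init__(self, sandbox_id: Optional[str]) -> None:
        self.sandbox_id = sandbox_id
        self.stopped = False

    def stop(self) -> None:
        self.stopped = True


class CleanupRegistry:
    """Toy model of a session's cleanup tracking (not the real Session)."""

    def __init__(self) -> None:
        self._tracked: dict[str, FakeSandbox] = {}
        self._closed = False

    @property
    def sandbox_count(self) -> int:
        return len(self._tracked)

    def adopt(self, sandbox: FakeSandbox) -> None:
        if self._closed:
            raise SandboxError("session is closed")
        if sandbox.sandbox_id is None:
            raise ValueError("sandbox has no sandbox_id")
        # Keyed by ID, so adopting the same sandbox twice tracks it once.
        self._tracked[sandbox.sandbox_id] = sandbox

    def close(self) -> None:
        """Stop every tracked sandbox, then refuse further adoption."""
        for sandbox in self._tracked.values():
            sandbox.stop()
        self._tracked.clear()
        self._closed = True
```

After close(), every adopted sandbox in this model reports stopped, and a further adopt() raises the stand-in SandboxError.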

function

function(*, container_image: str | None = None, serialization: Serialization = Serialization.JSON, temp_dir: str | None = None, profile_ids: list[str] | None = None, runner_ids: list[str] | None = None, resources: dict[str, Any] | None = None, mounted_files: Sequence[dict[str, Any]] | None = None, s3_mount: dict[str, Any] | None = None, ports: Sequence[dict[str, Any]] | None = None, network: NetworkOptions | dict[str, Any] | None = None, max_timeout_seconds: int | None = None, environment_variables: dict[str, str] | None = None, annotations: dict[str, str] | None = None) -> Callable[[Callable[P, R]], RemoteFunction[P, R]]
Decorator to execute a Python function in a sandbox. Each function call creates an ephemeral sandbox, executes the function, and returns the result. The sandbox is automatically cleaned up. The decorated function must be synchronous. Async functions are not supported.
Parameters
  • container_image (str | None): Override session’s default image for this function
  • serialization (Serialization): How to serialize arguments and return values. Defaults to JSON for safety. Use PICKLE for complex types, but only in trusted environments.
  • temp_dir (str | None): Override temp directory for payload/result files in sandbox. Defaults to session default. Created if missing.
  • profile_ids (list[str] | None): Optional list of profile IDs
  • runner_ids (list[str] | None): Optional list of runner IDs
  • resources (dict[str, Any] | None): Resource requests (CPU, memory, GPU)
  • mounted_files (Sequence[dict[str, Any]] | None): Files to mount into the sandbox
  • s3_mount (dict[str, Any] | None): S3 bucket mount configuration
  • ports (Sequence[dict[str, Any]] | None): Port mappings for the sandbox
  • network (NetworkOptions | dict[str, Any] | None): Network configuration (NetworkOptions dataclass)
  • max_timeout_seconds (int | None): Maximum timeout for sandbox operations
  • environment_variables (dict[str, str] | None): Environment variables to inject into the sandbox. Merges with and overrides matching keys from the session defaults. Use for non-sensitive config only.
  • annotations (dict[str, str] | None): Kubernetes pod annotations for the sandbox. Merges with and overrides matching keys from the session defaults. Use for non-sensitive metadata only.
Returns
  • Callable[[Callable[P, R]], RemoteFunction[P, R]]: A decorator that wraps a function as a RemoteFunction
Examples
with Session(defaults) as session:
    @session.function()
    def compute(x: int, y: int) -> int:
        return x + y

    @session.function(serialization=Serialization.PICKLE)
    def process_complex(data: MyClass) -> MyClass:
        return data.transform()

    # Call .remote() to execute in sandbox
    ref = compute.remote(2, 3)  # Returns OperationRef immediately
    result = ref.result()       # Block for result
    print(result)  # 5

    # In an async context (inside an async function), await the ref instead:
    #     result = await compute.remote(2, 3)

    # Execute locally for testing
    result = compute.local(2, 3)

    # Map over multiple inputs in parallel
    refs = compute.map([(1, 2), (3, 4), (5, 6)])
    results = [ref.result() for ref in refs]
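
The trade-off behind the serialization parameter can be seen with Python's standard json and pickle modules. This sketch assumes the JSON and PICKLE modes behave like those modules, which the page does not state explicitly. The helper names are hypothetical. It shows that JSON covers plain data (with tuples returned as lists) and rejects richer types such as sets, while pickle round-trips them. Unpickling untrusted bytes can execute arbitrary code, which is why pickle is only appropriate in trusted environments.

```python
import json
import pickle
from typing import Any


def roundtrip_json(value: Any) -> Any:
    """Encode to JSON text and decode it again."""
    return json.loads(json.dumps(value))


def roundtrip_pickle(value: Any) -> Any:
    """Encode to pickle bytes and decode them again (trusted data only)."""
    return pickle.loads(pickle.dumps(value))


def json_supports(value: Any) -> bool:
    """Report whether the json module can encode the value."""
    try:
        json.dumps(value)
    except TypeError:
        return False
    return True


# Plain data survives JSON, but the tuple comes back as a list.
plain = roundtrip_json({"pair": (2, 3)})

# A set is not JSON-encodable, yet pickle restores it unchanged.
set_is_json_safe = json_supports({1, 2, 3})
restored = roundtrip_pickle({1, 2, 3})
```

If a decorated function returns a tuple under JSON-style encoding, callers in this model should expect a list on the receiving side.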
Last modified on April 21, 2026