Documentation Index
Fetch the complete documentation index at: https://docs.coreweave.com/llms.txt
Use this file to discover all available pages before exploring further.
Source: src/cwsandbox/_sandbox.py:292
class Sandbox(*, command: str | None = None, args: list[str] | None = None, defaults: SandboxDefaults | None = None, container_image: str | None = None, tags: list[str] | None = None, base_url: str | None = None, request_timeout_seconds: float | None = None, max_lifetime_seconds: float | None = None, profile_ids: list[str] | None = None, runner_ids: list[str] | None = None, resources: dict[str, Any] | None = None, mounted_files: list[dict[str, Any]] | None = None, s3_mount: dict[str, Any] | None = None, ports: list[dict[str, Any]] | None = None, network: NetworkOptions | dict[str, Any] | None = None, max_timeout_seconds: int | None = None, environment_variables: dict[str, str] | None = None, annotations: dict[str, str] | None = None, secrets: Sequence[Secret | dict[str, Any]] | None = None)
CWSandbox client with sync/async hybrid API.
All methods return immediately and can be used in both sync and async contexts.
Operations are executed in a background event loop managed by _LoopManager.
Properties
sandbox_id
@property
def sandbox_id(self) -> str | None
The unique sandbox ID, or None if not yet started.
returncode
@property
def returncode(self) -> int | None
Exit code if sandbox has completed, None if still running.
runner_id
@property
def runner_id(self) -> str | None
Runner where sandbox is running, or None if not started.
profile_id
@property
def profile_id(self) -> str | None
Profile where sandbox is running, or None if not started.
status
@property
def status(self) -> SandboxStatus | None
Last known status of the sandbox.
status_updated_at
@property
def status_updated_at(self) -> datetime | None
Timestamp when status was last confirmed.
started_at
@property
def started_at(self) -> datetime | None
Timestamp when the sandbox was started.
runner_group_id
@property
def runner_group_id(self) -> str | None
Runner group ID where the sandbox is running.
service_address
@property
def service_address(self) -> str | None
External address for accessing sandbox services.
exposed_ports
@property
def exposed_ports(self) -> tuple[tuple[int, str], ...] | None
Exposed ports for the sandbox.
applied_ingress_mode
@property
def applied_ingress_mode(self) -> str | None
The ingress mode applied by the backend (set after start).
applied_egress_mode
@property
def applied_egress_mode(self) -> str | None
The egress mode applied by the backend (set after start).
exec_stats
@property
def exec_stats(self) -> dict[str, int]
Execution statistics for this sandbox.
Methods
run
run(*args: str = (), container_image: str | None = None, defaults: SandboxDefaults | None = None, request_timeout_seconds: float | None = None, max_lifetime_seconds: float | None = None, tags: list[str] | None = None, profile_ids: list[str] | None = None, runner_ids: list[str] | None = None, resources: dict[str, Any] | None = None, mounted_files: list[dict[str, Any]] | None = None, s3_mount: dict[str, Any] | None = None, ports: list[dict[str, Any]] | None = None, network: NetworkOptions | dict[str, Any] | None = None, max_timeout_seconds: int | None = None, environment_variables: dict[str, str] | None = None, annotations: dict[str, str] | None = None, secrets: Sequence[Secret | dict[str, Any]] | None = None) -> Sandbox
Create and start a sandbox, returning immediately once the backend accepts the start request.
Does NOT wait for RUNNING status. Use .wait() to block until ready.
If positional args are provided, the first is the command and the rest
are its arguments. If no args are provided, uses defaults (tail -f /dev/null).
Parameters
*args (str): Optional command and arguments (e.g., "echo", "hello", "world"). If omitted, uses default command from SandboxDefaults.
container_image (str | None): Container image to use
defaults (SandboxDefaults | None): Optional SandboxDefaults to apply
request_timeout_seconds (float | None): Timeout for API requests (client-side)
max_lifetime_seconds (float | None): Max sandbox lifetime (server-side)
tags (list[str] | None): Optional tags for the sandbox
profile_ids (list[str] | None): Optional list of profile IDs
runner_ids (list[str] | None): Optional list of runner IDs
resources (dict[str, Any] | None): Resource requests (CPU, memory, GPU)
mounted_files (list[dict[str, Any]] | None): Files to mount into the sandbox
s3_mount (dict[str, Any] | None): S3 bucket mount configuration
ports (list[dict[str, Any]] | None): Port mappings for the sandbox
network (NetworkOptions | dict[str, Any] | None): Network configuration (NetworkOptions dataclass or an equivalent dict)
max_timeout_seconds (int | None): Maximum timeout for sandbox operations
environment_variables (dict[str, str] | None): Environment variables to inject into the sandbox. Merges with and overrides matching keys from the session defaults. Use for non-sensitive config only.
annotations (dict[str, str] | None): Kubernetes pod annotations for the sandbox. Merges with and overrides matching keys from the session defaults. Use for non-sensitive metadata only.
secrets (Sequence[Secret | dict[str, Any]] | None): Secrets to inject as environment variables. Merged with defaults (defaults first, then this list).
Returns
Sandbox: A Sandbox instance (start request sent, but may still be starting)
Examples
# Using defaults (tail -f /dev/null)
sb = Sandbox.run()
# Fire and forget style
sb = Sandbox.run("echo", "hello")
# sb.sandbox_id is set, but sandbox may still be starting
# Wait for ready if needed
sb = Sandbox.run("sleep", "infinity").wait()
result = sb.exec(["echo", "hello"]).result()
# Or use context manager for automatic cleanup
with Sandbox.run("sleep", "infinity") as sb:
result = sb.exec(["echo", "hello"]).result()
session
session(defaults: SandboxDefaults | Mapping[str, Any] | None = None) -> Session
Create a session for managing multiple sandboxes.
Sessions provide:
- Shared configuration via defaults
- Automatic cleanup of orphaned sandboxes
- Function execution via @session.function() decorator
Parameters
defaults (SandboxDefaults | Mapping[str, Any] | None): Optional defaults to apply to sandboxes created via session
Returns
Session: A Session instance
Examples
session = Sandbox.session(defaults)
sb = session.create(command="sleep", args=["infinity"])
@session.function()
def compute(x, y):
return x + y
await session.close()
list
list(*, tags: list[str] | None = None, status: str | None = None, profile_ids: list[str] | None = None, runner_ids: list[str] | None = None, include_stopped: bool = False, base_url: str | None = None, timeout_seconds: float | None = None) -> OperationRef[list[Sandbox]]
List existing sandboxes with optional filters.
Returns OperationRef that resolves to Sandbox instances usable for
operations like exec(), stop(), get_status(), read_file(), write_file().
By default, only active (non-terminal) sandboxes are returned.
Set include_stopped=True to widen the search to include terminal
sandboxes (completed, failed, terminated).
A terminal status filter (e.g. status="completed") also widens
the search automatically.
Parameters
tags (list[str] | None): Filter by tags (sandboxes must have ALL specified tags)
status (str | None): Filter by status ("running", "completed", "failed", etc.)
profile_ids (list[str] | None): Filter by profile IDs
runner_ids (list[str] | None): Filter by runner IDs
include_stopped (bool): If True, include terminal sandboxes (completed, failed, terminated). Defaults to False.
base_url (str | None): Override API URL (default: CWSANDBOX_BASE_URL env or default)
timeout_seconds (float | None): Request timeout (default: 300s)
Returns
OperationRef[list[Sandbox]]: Use .result() to block for results, or await directly in async contexts.
Examples
# Sync usage - active sandboxes only (default)
sandboxes = Sandbox.list(tags=["my-batch-job"]).result()
for sb in sandboxes:
print(f"{sb.sandbox_id}: {sb.status}")
sb.stop().result()
# Include stopped sandboxes
all_sandboxes = Sandbox.list(
tags=["my-batch-job"], include_stopped=True
).result()
# Async usage
sandboxes = await Sandbox.list(status="running")
for sb in sandboxes:
result = await sb.exec(["echo", "hello"])
from_id
from_id(sandbox_id: str, *, base_url: str | None = None, timeout_seconds: float | None = None) -> OperationRef[Sandbox]
Attach to an existing sandbox by ID.
Creates a Sandbox instance connected to an existing sandbox,
allowing operations like exec(), stop(), get_status(), etc.
Parameters
sandbox_id (str): The ID of the existing sandbox
base_url (str | None): Override API URL (default: CWSANDBOX_BASE_URL env or default)
timeout_seconds (float | None): Request timeout (default: 300s)
Returns
OperationRef[Sandbox]: Use .result() to block for the Sandbox instance, or await directly in async contexts.
Raises
SandboxNotFoundError: If sandbox doesn’t exist
Examples
# Sync usage
sb = Sandbox.from_id("sandbox-abc123").result()
result = sb.exec(["python", "-c", "print('hello')"]).result()
sb.stop().result()
# Async usage
sb = await Sandbox.from_id("sandbox-abc123")
result = await sb.exec(["python", "-c", "print('hello')"])
delete
delete(sandbox_id: str, *, base_url: str | None = None, timeout_seconds: float | None = None, missing_ok: bool = False) -> OperationRef[None]
Delete a sandbox by ID without creating a Sandbox instance.
This is a convenience method for cleanup scenarios where you
don’t need to perform other operations on the sandbox.
Parameters
sandbox_id (str): The sandbox ID to delete
base_url (str | None): Override API URL (default: CWSANDBOX_BASE_URL env or default)
timeout_seconds (float | None): Request timeout (default: 300s)
missing_ok (bool): If True, suppress SandboxNotFoundError when sandbox doesn’t exist.
Returns
OperationRef[None]: Use .result() to block until complete. Raises SandboxNotFoundError if not found (unless missing_ok=True), SandboxError if deletion failed.
Raises
SandboxNotFoundError: If sandbox doesn’t exist and missing_ok=False
SandboxError: If deletion failed for other reasons
Examples
# Sync usage
Sandbox.delete("sandbox-abc123").result()
# Ignore if already deleted
Sandbox.delete("sandbox-abc123", missing_ok=True).result()
# Async usage
await Sandbox.delete("sandbox-abc123")
get_status
get_status() -> SandboxStatus
Get the current status of the sandbox.
For terminal sandboxes (COMPLETED/FAILED/TERMINATED), returns the cached
status without an API call. For active sandboxes, fetches from backend.
Returns
SandboxStatus: SandboxStatus enum value
Raises
SandboxNotRunningError: If sandbox has not been started
Examples
sb = Sandbox.run("sleep", "10")
status = sb.get_status()
print(f"Sandbox is {status}") # SandboxStatus.PENDING or RUNNING
start
start() -> OperationRef[None]
Send StartSandbox to backend, return OperationRef immediately.
Does NOT wait for RUNNING status. Use wait() to block until ready.
Call .result() to block until the start request is accepted.
Returns
OperationRef[None]: Use .result() to block until the backend accepts the start request.
Examples
sandbox = Sandbox(command="sleep", args=["infinity"])
sandbox.start().result()
print(f"Started sandbox: {sandbox.sandbox_id}")
sandbox.wait() # Block until RUNNING
wait
wait(timeout: float | None = None) -> Sandbox
Block until sandbox reaches RUNNING or a terminal state.
Returns when sandbox is RUNNING or has already completed (COMPLETED/UNSPECIFIED).
Parameters
timeout (float | None): Maximum seconds to wait. None means use the default timeout.
Returns
Sandbox: Self for method chaining. Check .status to determine final state.
Raises
SandboxFailedError: If sandbox fails to start
SandboxTerminatedError: If sandbox was terminated externally
SandboxTimeoutError: If timeout expires
Examples
sb = Sandbox.run("sleep", "infinity").wait()
result = sb.exec(["echo", "ready"]).result()
wait_until_complete
wait_until_complete(timeout: float | None = None, *, raise_on_termination: bool = True) -> OperationRef[Sandbox]
Wait until sandbox reaches terminal state (COMPLETED/FAILED/TERMINATED).
Returns an OperationRef that resolves when the sandbox reaches a terminal state.
After resolving, returncode will be available.
Parameters
timeout (float | None): Maximum seconds to wait. None means use the default timeout.
raise_on_termination (bool): If True (default), raises SandboxTerminatedError if sandbox was terminated externally.
Returns
OperationRef[Sandbox]: Use .result() to block, or await directly in async contexts.
Raises
SandboxTimeoutError: If timeout expires
SandboxTerminatedError: If sandbox was terminated (and raise_on_termination=True)
SandboxFailedError: If sandbox failed
Examples
sb = Sandbox.run("python", "-c", "print('done')")
sb.wait_until_complete().result()
print(f"Exit code: {sb.returncode}")
stop
stop(*, snapshot_on_stop: bool = False, graceful_shutdown_seconds: float = DEFAULT_GRACEFUL_SHUTDOWN_SECONDS, missing_ok: bool = False) -> OperationRef[None]
Stop sandbox, return OperationRef immediately.
The sandbox is deregistered from its session regardless of whether
the stop was successful, since the sandbox is no longer usable.
Parameters
snapshot_on_stop (bool): If True, capture sandbox state before shutdown.
graceful_shutdown_seconds (float): Time to wait for graceful shutdown.
missing_ok (bool): If True, suppress SandboxNotFoundError when sandbox doesn’t exist.
Returns
OperationRef[None]: Use .result() to block until complete. Raises SandboxError on failure, SandboxNotFoundError if not found (unless missing_ok=True).
Examples
sb.stop().result() # Block until stopped
# Ignore if already deleted
sb.stop(missing_ok=True).result()
exec
exec(command: Sequence[str], *, cwd: str | None = None, check: bool = False, timeout_seconds: float | None = None, stdin: bool = False) -> Process
Execute command, return Process immediately.
Note: If sandbox is not yet RUNNING, this method waits for it first.
The timeout_seconds parameter only applies to command execution, not to
the initial wait for RUNNING status.
Parameters
command (Sequence[str]): Command and arguments to execute
cwd (str | None): Working directory for command execution. Must be an absolute path. When specified, the command is wrapped with a shell cd.
check (bool): If True, raise SandboxExecutionError on non-zero returncode
timeout_seconds (float | None): Timeout for command execution (after sandbox is RUNNING). Does not include time waiting for sandbox to reach RUNNING status.
stdin (bool): If True, enable stdin streaming. Process.stdin will be a StreamWriter that can send input to the command. If False (default), stdin is closed immediately and Process.stdin is None.
Returns
Process: Process handle with streaming stdout/stderr. Call .result() to block for the final ProcessResult, or iterate over .stdout/.stderr for real-time output. When stdin=True, Process.stdin is a StreamWriter.
Raises
ValueError: If command is empty or cwd is invalid (empty or relative path)
Examples
# Get result directly
process = sb.exec(["echo", "hello"])
result = process.result()
print(result.stdout)
# With working directory
result = sb.exec(["ls", "-la"], cwd="/app").result()
# Stream output in real-time
process = sb.exec(["python", "script.py"])
for line in process.stdout:
print(line)
result = process.result()
# With stdin streaming
process = sb.exec(["cat"], stdin=True)
process.stdin.write(b"hello world").result()
process.stdin.close().result()
result = process.result()
# Async usage
result = await sb.exec(["echo", "hello"])
shell
shell(command: Sequence[str] | None = None, *, width: int | None = None, height: int | None = None) -> TerminalSession
Start an interactive TTY session in the sandbox.
Returns a TerminalSession optimized for interactive terminal use:
raw byte output (no decode/re-encode), no output buffering, and
fire-and-forget stdin.
Parameters
command (Sequence[str] | None): Shell command to execute. Defaults to ["/bin/bash"]. Accepts a sequence like ["/bin/sh"] or ["/usr/bin/python3"].
width (int | None): Initial terminal width in columns.
height (int | None): Initial terminal height in rows.
Returns
TerminalSession: TerminalSession handle with .output (StreamReader[bytes]), .stdin (StreamWriter), and .resize(w, h).
Raises
ValueError: If command is explicitly empty.
Example
session = sandbox.shell(width=80, height=24)
session.stdin.writeline("echo hello").result()
for chunk in session.output:
sys.stdout.buffer.write(chunk)
exit_code = session.wait()
read_file
read_file(filepath: str, *, timeout_seconds: float | None = None) -> OperationRef[bytes]
Read file from sandbox, return OperationRef immediately.
Parameters
filepath (str): Path to file in sandbox
timeout_seconds (float | None): Timeout for the operation
Returns
OperationRef[bytes]: Use .result() to block and retrieve contents.
Examples
data = sb.read_file("/output/result.txt").result()
write_file
write_file(filepath: str, contents: bytes, *, timeout_seconds: float | None = None) -> OperationRef[None]
Write file to sandbox, return OperationRef immediately.
Parameters
filepath (str): Path to file in sandbox
contents (bytes): File contents as bytes
timeout_seconds (float | None): Timeout for the operation
Returns
OperationRef[None]: Use .result() to block until complete.
Examples
sb.write_file("/input/data.txt", b"content").result()
stream_logs
stream_logs(*, follow: bool = False, tail_lines: int | None = None, since_time: datetime | None = None, timestamps: bool = False, timeout_seconds: float | None = None) -> StreamReader[str]
Stream logs from the sandbox’s main process.
Streams stdout/stderr from the sandbox’s main command, the
entrypoint passed to Sandbox.run() (or the default
tail -f /dev/null). Output from commands started via exec()
is not included; use Process.stdout/Process.stderr for
those.
Note: Sandboxes created with the default command (tail -f /dev/null)
do not produce any log output. To see logs here, pass a command
that writes to stdout/stderr when calling Sandbox.run().
Returns a StreamReader that yields log lines as strings. The method
returns immediately; iterating over the StreamReader blocks until
data arrives.
Can also retrieve historical logs from stopped sandboxes when
follow=False.
Parameters
follow (bool): If True, continuously stream new logs (like tail -f). If False, stream existing logs and stop. Only running sandboxes support follow=True.
tail_lines (int | None): Number of most recent lines to retrieve. If None, returns all available lines.
since_time (datetime | None): Only return logs after this timestamp.
timestamps (bool): If True, prefix each line with an ISO 8601 timestamp from the server.
timeout_seconds (float | None): Client-side deadline for the gRPC call. Defaults to request_timeout_seconds when follow=False, and None (no timeout) when follow=True.
Returns
StreamReader[str]: StreamReader yielding log lines as strings. Iterate synchronously with `for line in reader` or asynchronously with `async for line in reader`.
Raises
SandboxNotRunningError: If follow=True and the sandbox has been stopped.
SandboxError: If the log stream encounters an error.
Examples
# One-shot: get recent logs
for line in sandbox.stream_logs(tail_lines=100):
print(line, end="")
# Follow mode: stream continuously
for line in sandbox.stream_logs(follow=True):
print(line, end="")
# Retrieve logs from a stopped sandbox
sb = Sandbox.from_id("sbx-abc123").result()
for line in sb.stream_logs(tail_lines=50):
print(line, end="")
# Async usage
async for line in sandbox.stream_logs(follow=True):
print(line, end="")