Skip to main content

Pod Lifecycle with Lium()

This example shows the core pod workflow in Python: choose a GPU, create a pod, wait for SSH readiness, run a command, upload and download a small file, and stop the pod.

Use this pattern when you want a script, notebook, CI job, or agent to manage pod lifecycle directly instead of shelling out to the CLI.

#!/usr/bin/env python3
"""Create a Lium pod, run a command, transfer a file, and clean up."""

from pathlib import Path

from lium.sdk import Lium

# GPU model and count used to filter the available executors.
GPU_TYPE = "A100"
GPU_COUNT = 1
# Name given to the pod created by this script.
POD_NAME = "sdk-lifecycle-demo"

# File written on the local machine and then uploaded to the pod.
LOCAL_NOTE = Path("lium-sdk-note.txt")
# Local destination for the copy downloaded back from the pod.
DOWNLOADED_NOTE = Path("lium-sdk-note.remote.txt")
# Path of the uploaded file on the pod.
REMOTE_NOTE = "/root/lium-sdk-note.txt"


def require_success(result: dict, label: str) -> None:
    """Raise a useful local error when a remote command fails.

    Args:
        result: Command result mapping. Must contain a ``"success"`` key; the
            optional ``"stderr"``, ``"stdout"`` and ``"exit_code"`` entries
            are used only to build the error message.
        label: Short human-readable name of the command, used as the prefix
            of the error message.

    Raises:
        RuntimeError: If ``result["success"]`` is falsy. The message carries
            stderr when it is non-blank, otherwise stdout, otherwise the
            exit code.
    """
    if result["success"]:
        return

    # A key that is present but set to None bypasses the .get() default, so
    # normalise with `or ""` before stripping to avoid an AttributeError.
    stderr = (result.get("stderr") or "").strip()
    stdout = (result.get("stdout") or "").strip()
    details = stderr or stdout or f"exit code {result.get('exit_code')}"
    raise RuntimeError(f"{label} failed: {details}")


# Pod lifecycle script: pick an executor, create a pod, wait for it to be
# ready, run commands and file transfers against it, then stop it.
lium = Lium()
# Tracked separately so the cleanup path can still find the pod when the
# script fails after up() but before wait_ready() has returned a pod.
created_pod_id = None
ready_pod = None

try:
    # 1. Find a matching GPU executor.
    executors = lium.ls(gpu_type=GPU_TYPE, gpu_count=GPU_COUNT)
    if not executors:
        raise RuntimeError(f"No available {GPU_COUNT}x {GPU_TYPE} executors")

    # Pick the lowest listed hourly price for this small demo.
    executor = min(executors, key=lambda item: item.price_per_hour)
    print(
        f"Using {executor.machine_name} at ${executor.price_per_hour:.2f}/hr "
        f"({executor.huid})"
    )

    # 2. Create the pod.
    pod = lium.up(
        executor_id=executor.id,
        name=POD_NAME,
        ports=1,
    )
    created_pod_id = pod["id"]
    print(f"Created pod {pod.get('name', POD_NAME)} ({created_pod_id})")

    # 3. Wait until the pod is running and has SSH metadata.
    ready_pod = lium.wait_ready(pod, timeout=600)
    if ready_pod is None:
        raise RuntimeError("Pod did not become ready before the timeout")

    print(f"Ready: {ready_pod.name} ({ready_pod.huid})")
    print(lium.ssh(ready_pod))

    # 4. Inspect active pods, like `lium ps`.
    print("Active pods:")
    for active in lium.ps():
        print(f"- {active.name}: {active.status} ({active.huid})")

    # 5. Run a command over SSH, like `lium exec`.
    gpu_info = lium.exec(ready_pod, command="nvidia-smi")
    require_success(gpu_info, "nvidia-smi")
    print(gpu_info["stdout"])

    # 6. Upload and download a file, like `lium scp`.
    LOCAL_NOTE.write_text("hello from the local machine\n", encoding="utf-8")
    lium.upload(ready_pod, local=str(LOCAL_NOTE), remote=REMOTE_NOTE)

    # Read the file back on the pod to confirm the upload landed.
    cat_note = lium.exec(ready_pod, command=f"cat {REMOTE_NOTE}")
    require_success(cat_note, "cat uploaded note")
    print(cat_note["stdout"].strip())

    lium.download(ready_pod, remote=REMOTE_NOTE, local=str(DOWNLOADED_NOTE))
    print(f"Downloaded: {DOWNLOADED_NOTE.read_text(encoding='utf-8').strip()}")

finally:
    # Always stop temporary pods so they do not keep accruing charges.
    pod_to_stop = ready_pod
    if pod_to_stop is None and created_pod_id:
        # wait_ready() never handed back a pod; look it up by id instead.
        pod_to_stop = next((p for p in lium.ps() if p.id == created_pod_id), None)

    if pod_to_stop is not None:
        lium.down(pod_to_stop)
        print(f"Stopped pod {pod_to_stop.name} ({pod_to_stop.huid})")
    elif created_pod_id:
        # The pod was created but could not be found for cleanup. Say so
        # explicitly rather than exiting silently with a pod possibly running.
        print(
            f"Warning: pod {created_pod_id} was created but is not listed by "
            "ps(); check for it and stop it manually"
        )

Notes

  • Call wait_ready() before exec, upload, download, rsync, or ssh; those operations need SSH connection metadata.
  • Use try / finally around temporary pods so failures do not leave a pod running.
  • Use stream_exec() instead of exec() for long-running jobs where you want incremental output.
  • Use rsync() for directory syncs only when your container image has rsync installed. The training workflow shows how to check for it and install it when missing.