Pod Lifecycle with Lium()
This example shows the core pod workflow in Python: choose a GPU, create a pod, wait for SSH readiness, run a command, move a small file, and stop the pod.
Use this pattern when you want a script, notebook, CI job, or agent to manage pod lifecycle directly instead of shelling out to the CLI.
#!/usr/bin/env python3
"""Create a Lium pod, run a command, transfer a file, and clean up."""
from pathlib import Path
from lium.sdk import Lium
# GPU model and count passed to lium.ls() when searching for executors.
GPU_TYPE = "A100"
GPU_COUNT = 1
# Name requested for the pod created by lium.up().
POD_NAME = "sdk-lifecycle-demo"
# Local file that is written and then uploaded to the pod.
LOCAL_NOTE = Path("lium-sdk-note.txt")
# Local destination for the copy downloaded back from the pod.
DOWNLOADED_NOTE = Path("lium-sdk-note.remote.txt")
# Path of the note on the pod (upload target and download source).
REMOTE_NOTE = "/root/lium-sdk-note.txt"
def require_success(result: dict, label: str) -> None:
    """Raise a useful local error when a remote command fails.

    Args:
        result: Mapping describing a finished remote command. It must
            contain a ``"success"`` key; ``"stderr"``, ``"stdout"`` and
            ``"exit_code"`` are read when present.
        label: Short name of the command, used as the error-message prefix.

    Raises:
        RuntimeError: If ``result["success"]`` is falsy. The message carries
            the stripped stderr, falling back to the stripped stdout, and
            finally to the exit code.
    """
    if result["success"]:
        return
    # ``or ""`` also covers keys that are present but set to None, which
    # ``dict.get(key, "")`` alone would hand straight to ``.strip()``.
    stderr = (result.get("stderr") or "").strip()
    stdout = (result.get("stdout") or "").strip()
    details = stderr or stdout or f"exit code {result.get('exit_code')}"
    raise RuntimeError(f"{label} failed: {details}")
# Client used for every pod operation below.
lium = Lium()
# Cleanup state for the finally block: the id is recorded as soon as the pod
# has been created, and the ready pod object once wait_ready() has returned
# one, so the pod can be stopped regardless of which step failed.
created_pod_id = None
ready_pod = None

try:
    # 1. Find a matching GPU executor.
    executors = lium.ls(gpu_type=GPU_TYPE, gpu_count=GPU_COUNT)
    if not executors:
        raise RuntimeError(f"No available {GPU_COUNT}x {GPU_TYPE} executors")

    # Pick the lowest listed hourly price for this small demo.
    executor = min(executors, key=lambda item: item.price_per_hour)
    print(
        f"Using {executor.machine_name} at ${executor.price_per_hour:.2f}/hr "
        f"({executor.huid})"
    )

    # 2. Create the pod.
    pod = lium.up(
        executor_id=executor.id,
        name=POD_NAME,
        ports=1,
    )
    # Remember the id immediately so cleanup works even if readiness fails.
    created_pod_id = pod["id"]
    print(f"Created pod {pod.get('name', POD_NAME)} ({created_pod_id})")

    # 3. Wait until the pod is running and has SSH metadata.
    ready_pod = lium.wait_ready(pod, timeout=600)
    if ready_pod is None:
        # A None result is treated as "not ready within the timeout".
        raise RuntimeError("Pod did not become ready before the timeout")
    print(f"Ready: {ready_pod.name} ({ready_pod.huid})")
    print(lium.ssh(ready_pod))

    # 4. Inspect active pods, like `lium ps`.
    print("Active pods:")
    for active in lium.ps():
        print(f"- {active.name}: {active.status} ({active.huid})")

    # 5. Run a command over SSH, like `lium exec`.
    gpu_info = lium.exec(ready_pod, command="nvidia-smi")
    require_success(gpu_info, "nvidia-smi")
    print(gpu_info["stdout"])

    # 6. Upload and download a file, like `lium scp`.
    LOCAL_NOTE.write_text("hello from the local machine\n", encoding="utf-8")
    lium.upload(ready_pod, local=str(LOCAL_NOTE), remote=REMOTE_NOTE)
    # Read the file back on the pod to confirm the upload landed.
    cat_note = lium.exec(ready_pod, command=f"cat {REMOTE_NOTE}")
    require_success(cat_note, "cat uploaded note")
    print(cat_note["stdout"].strip())
    lium.download(ready_pod, remote=REMOTE_NOTE, local=str(DOWNLOADED_NOTE))
    print(f"Downloaded: {DOWNLOADED_NOTE.read_text(encoding='utf-8').strip()}")
finally:
    # Always stop temporary pods so they do not keep accruing charges.
    pod_to_stop = ready_pod
    if pod_to_stop is None and created_pod_id:
        # The pod was created but never became ready: look it up by id among
        # the active pods so it can still be stopped.
        pod_to_stop = next((p for p in lium.ps() if p.id == created_pod_id), None)
    if pod_to_stop is not None:
        lium.down(pod_to_stop)
        print(f"Stopped pod {pod_to_stop.name} ({pod_to_stop.huid})")
Notes
- Call `wait_ready()` before `exec`, `upload`, `download`, `rsync`, or `ssh`; those operations need SSH connection metadata.
- Use `try`/`finally` around temporary pods so failures do not leave a pod running.
- Use `stream_exec()` instead of `exec()` for long-running jobs where you want incremental output.
- Use `rsync()` for directory syncs only when your container image has `rsync` installed. The training workflow shows how to check for it and install it when missing.