Train a Model on a Pod
This example runs a local training project on a Lium GPU pod. It selects a GPU, starts a pod with a PyTorch template, syncs code and data, installs dependencies, streams the training output, downloads the best checkpoint, and stops the pod.
The script assumes this local project layout:
.
|-- requirements.txt
|-- src/
| `-- train.py
`-- data/
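src/train.py should accept --data-dir and --output-dir and write its best checkpoint as best_model.pt inside the output directory. The orchestration script passes --output-dir checkpoints, so the file ends up at checkpoints/best_model.pt.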
Run the orchestration script from the project root:
#!/usr/bin/env python3
"""Run a local training project on a Lium GPU pod."""
from pathlib import Path
from lium.sdk import Lium
# --- Executor selection criteria (read by select_executor) ---
GPU_TYPE = "A100"
GPU_COUNT = 1
# Upper bound compared against each executor's price_per_hour.
MAX_PRICE_PER_HOUR = 1.50
# NOTE(review): stored as a float, so a version such as 12.10 would be
# indistinguishable from 12.1 -- confirm the type the SDK expects.
MIN_CUDA_VERSION = 12.4
# Name passed to lium.up() for the pod this script starts.
POD_NAME = "sdk-training-demo"
# --- Local paths, resolved from the current working directory ---
# The script must therefore be run from the project root.
PROJECT_ROOT = Path.cwd()
LOCAL_REQUIREMENTS = PROJECT_ROOT / "requirements.txt"
LOCAL_SRC = PROJECT_ROOT / "src"
LOCAL_DATA = PROJECT_ROOT / "data"
# Destination of the checkpoint downloaded from the pod.
LOCAL_CHECKPOINT = PROJECT_ROOT / "checkpoints" / "best_model.pt"
# --- Remote paths on the pod, all rooted at REMOTE_WORKSPACE ---
REMOTE_WORKSPACE = "/workspace"
REMOTE_REQUIREMENTS = f"{REMOTE_WORKSPACE}/requirements.txt"
REMOTE_SRC = f"{REMOTE_WORKSPACE}/src"
REMOTE_DATA = f"{REMOTE_WORKSPACE}/data"
REMOTE_CHECKPOINTS = f"{REMOTE_WORKSPACE}/checkpoints"
# File whose existence is checked after training and which is then downloaded.
REMOTE_BEST_MODEL = f"{REMOTE_CHECKPOINTS}/best_model.pt"
def require_local_inputs() -> None:
    """Fail fast when a required local input path is absent.

    Checks the requirements file, the source directory, and the data
    directory, in that order.

    Raises:
        RuntimeError: If at least one of those paths does not exist. The
            message lists every missing path, comma-separated.
    """
    absent = []
    for candidate in (LOCAL_REQUIREMENTS, LOCAL_SRC, LOCAL_DATA):
        if candidate.exists():
            continue
        absent.append(str(candidate))
    if not absent:
        return
    listing = ", ".join(absent)
    raise RuntimeError(f"Missing local training inputs: {listing}")
def require_success(result: dict, label: str) -> None:
    """Raise if a command result dictionary does not report success.

    Args:
        result: Result dictionary of a remote command. The keys read here
            are ``success``, ``stderr``, ``stdout`` and ``exit_code``.
        label: Short description of the step, used as the prefix of the
            error message.

    Raises:
        RuntimeError: If ``result["success"]` is falsy or missing. The
            message carries the stripped stderr, else the stripped stdout,
            else the exit code.
    """
    # A result without a ``success`` key is treated as a failure rather
    # than surfacing an unrelated KeyError.
    if result.get("success"):
        return
    # ``or ""`` covers both a missing key and an explicit None value, so a
    # failed command never turns into an AttributeError on ``.strip()``
    # that would hide the real failure.
    stderr = (result.get("stderr") or "").strip()
    stdout = (result.get("stdout") or "").strip()
    details = stderr or stdout or f"exit code {result.get('exit_code')}"
    raise RuntimeError(f"{label} failed: {details}")
def ensure_rsync(lium: Lium, pod) -> None:
    """Make sure ``rsync`` is available on the pod, installing it if needed.

    Runs ``which rsync`` on the pod; when that command does not succeed,
    installs rsync with ``apt-get``.

    Args:
        lium: SDK client used to execute commands on the pod.
        pod: Pod to run the commands on.

    Raises:
        RuntimeError: If the installation command fails.
    """
    probe = lium.exec(pod, command="which rsync")
    if not probe["success"]:
        outcome = lium.exec(
            pod,
            command="apt-get update -qq && apt-get install -y rsync -qq",
        )
        require_success(outcome, "install rsync")
def select_executor(lium: Lium):
    """Return the cheapest executor that satisfies the configured limits.

    Lists executors matching ``GPU_TYPE``, ``GPU_COUNT`` and
    ``MIN_CUDA_VERSION``, then keeps those whose ``price_per_hour`` does
    not exceed ``MAX_PRICE_PER_HOUR``.

    Args:
        lium: SDK client used to list executors.

    Returns:
        The remaining executor with the lowest ``price_per_hour``.

    Raises:
        RuntimeError: If no executor is left after the price filter.
    """
    candidates = lium.ls(
        gpu_type=GPU_TYPE,
        gpu_count=GPU_COUNT,
        min_cuda_version=MIN_CUDA_VERSION,
    )
    affordable = []
    for candidate in candidates:
        if candidate.price_per_hour <= MAX_PRICE_PER_HOUR:
            affordable.append(candidate)
    if affordable:
        return min(affordable, key=lambda candidate: candidate.price_per_hour)
    raise RuntimeError(
        f"No {GPU_COUNT}x {GPU_TYPE} executor under "
        f"${MAX_PRICE_PER_HOUR:.2f}/hr with CUDA >= {MIN_CUDA_VERSION}"
    )
def select_pytorch_template(lium: Lium):
    """Return the first PyTorch template whose status is ``VERIFY_SUCCESS``.

    The status comparison is case-insensitive.

    Args:
        lium: SDK client used to list templates filtered by ``"pytorch"``.

    Returns:
        The first matching template, in the order the SDK returned them.

    Raises:
        RuntimeError: If no listed template has the verified status.
    """
    approved = []
    for candidate in lium.templates(filter="pytorch"):
        if candidate.status.upper() != "VERIFY_SUCCESS":
            continue
        approved.append(candidate)
    if approved:
        return approved[0]
    raise RuntimeError("No verified PyTorch template is available")
# Orchestration: validate inputs, start a pod, sync the project, train,
# fetch the checkpoint, and always attempt to stop the pod afterwards.

# Check the local inputs first so nothing is started when they are missing.
require_local_inputs()
lium = Lium()
# Tracked separately so the cleanup below can still locate a pod that was
# created but never became ready.
created_pod_id = None
ready_pod = None
try:
    executor = select_executor(lium)
    template = select_pytorch_template(lium)
    print(
        f"Using {executor.machine_name} at ${executor.price_per_hour:.2f}/hr "
        f"with template {template.name}"
    )
    pod = lium.up(
        executor_id=executor.id,
        name=POD_NAME,
        template_id=template.id,
    )
    created_pod_id = pod["id"]
    ready_pod = lium.wait_ready(pod, timeout=600)
    if ready_pod is None:
        raise RuntimeError("Pod did not become ready before the timeout")

    # Create every remote directory the later steps write into.
    mkdir = lium.exec(
        ready_pod,
        command=f"mkdir -p {REMOTE_SRC} {REMOTE_DATA} {REMOTE_CHECKPOINTS}",
    )
    require_success(mkdir, "create remote workspace")

    # Single file via upload(); directories via rsync once it is available.
    lium.upload(
        ready_pod,
        local=str(LOCAL_REQUIREMENTS),
        remote=REMOTE_REQUIREMENTS,
    )
    ensure_rsync(lium, ready_pod)
    lium.rsync(ready_pod, local=f"{LOCAL_SRC}/", remote=f"{REMOTE_SRC}/")
    lium.rsync(ready_pod, local=f"{LOCAL_DATA}/", remote=f"{REMOTE_DATA}/")

    install = lium.exec(
        ready_pod,
        command=f"cd {REMOTE_WORKSPACE} && python -m pip install -r requirements.txt",
    )
    require_success(install, "install dependencies")

    train_command = (
        f"cd {REMOTE_WORKSPACE} && "
        "PYTHONUNBUFFERED=1 python src/train.py "
        "--data-dir data "
        "--output-dir checkpoints"
    )
    # Print the training output chunk by chunk as it arrives.
    for chunk in lium.stream_exec(ready_pod, command=train_command):
        print(chunk["data"], end="")

    # The streamed run yields no result dictionary here, so the presence of
    # the checkpoint file is what gates the download.
    checkpoint = lium.exec(ready_pod, command=f"test -f {REMOTE_BEST_MODEL}")
    require_success(checkpoint, "check training artifact")

    LOCAL_CHECKPOINT.parent.mkdir(parents=True, exist_ok=True)
    lium.download(
        ready_pod,
        remote=REMOTE_BEST_MODEL,
        local=str(LOCAL_CHECKPOINT),
    )
    print(f"Downloaded checkpoint to {LOCAL_CHECKPOINT}")
finally:
    # Runs on success and on failure so the pod is not left running.
    pod_to_stop = ready_pod
    if pod_to_stop is None and created_pod_id:
        # The pod was created but never became ready: look it up by id.
        pod_to_stop = next((p for p in lium.ps() if p.id == created_pod_id), None)
    if pod_to_stop is not None:
        lium.down(pod_to_stop)
        print(f"Stopped pod {pod_to_stop.name} ({pod_to_stop.huid})")
    elif created_pod_id:
        # Do not exit silently: the pod may still exist even though it was
        # not returned by ps(), and would then need to be stopped by hand.
        print(
            f"Warning: pod {created_pod_id} was created but could not be "
            "found for cleanup; verify that it is not still running."
        )
Adapting the Workflow
- Change GPU_TYPE, GPU_COUNT, MAX_PRICE_PER_HOUR, and MIN_CUDA_VERSION to match the hardware your job needs.
- Update train_command if your training script uses different flags or writes artifacts to a different path.
- Use lium.exec() instead of stream_exec() when you want a final stdout, stderr, and exit_code result dictionary instead of live output.
- ensure_rsync() mirrors the CLI behavior: it checks whether the pod has rsync, then installs it with apt-get when missing.
- For multiple output files, write a small archive on the pod with tar and download that single archive with download().
Minimal train.py
If you do not already have a training script, use this minimal PyTorch example to verify the workflow. It trains a tiny linear model, uses CUDA when available, streams progress, and writes checkpoints/best_model.pt. With the PyTorch template above, requirements.txt can be empty.
src/train.py
#!/usr/bin/env python3
import argparse
from pathlib import Path
import torch
# Minimal training script: fits a 1-D linear model to y = 2x + 0.3 and
# writes best_model.pt into the directory given by --output-dir.
parser = argparse.ArgumentParser()
parser.add_argument("--data-dir", required=True)
parser.add_argument("--output-dir", required=True)
args = parser.parse_args()

# Use the GPU when one is visible, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
data_dir = Path(args.data_dir)
output_dir = Path(args.output_dir)

# The data directory is only listed, not read: the file names are echoed
# and recorded in the checkpoint.
records = sorted(path.name for path in data_dir.iterdir())
print(f"input files: {records}", flush=True)
print(f"torch: {torch.__version__}", flush=True)
print(f"device: {device}", flush=True)

# Fixed seed so the initial weights are the same on every run.
torch.manual_seed(7)
x = torch.linspace(-1, 1, 256, device=device).unsqueeze(1)
y = 2.0 * x + 0.3
model = torch.nn.Linear(1, 1).to(device)
optimizer = torch.optim.SGD(model.parameters(), lr=0.2)
loss_fn = torch.nn.MSELoss()

for step in range(1, 31):
    optimizer.zero_grad()
    loss = loss_fn(model(x), y)
    loss.backward()
    optimizer.step()
    if step % 10 == 0:
        print(f"step {step:02d} loss={loss.item():.6f}", flush=True)

output_dir.mkdir(parents=True, exist_ok=True)
torch.save(
    {
        # Store CPU copies of the weights so the checkpoint can be loaded
        # on a machine without CUDA without passing map_location.
        "state_dict": {
            name: tensor.detach().cpu()
            for name, tensor in model.state_dict().items()
        },
        # Loss computed in the last iteration, before its optimizer step.
        "final_loss": loss.item(),
        "device": str(device),
        "input_files": records,
    },
    output_dir / "best_model.pt",
)
print("checkpoint written", flush=True)