---
title: Train a Model on a Pod
sidebar_label: Training Workflow
---

> ## Documentation Index
> Fetch the complete documentation index at: https://docs.lium.io/llms.txt
> Use this file to discover all available pages before exploring further.

# Train a Model on a Pod

This example runs a local training project on a Lium GPU pod. It selects a GPU, starts a pod with a PyTorch template, syncs code and data, installs dependencies, streams the training output, downloads the best checkpoint, and stops the pod.

The script assumes this local project layout:

```text
.
|-- requirements.txt
|-- src/
|   `-- train.py
`-- data/
```

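`src/train.py` should accept `--data-dir` and `--output-dir` and write its best checkpoint as `best_model.pt` inside the output directory. The orchestration script passes `--output-dir checkpoints`, so the file ends up at `checkpoints/best_model.pt` in the remote workspace.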

Run the orchestration script from the project root:

```python
#!/usr/bin/env python3
"""Run a local training project on a Lium GPU pod."""

from pathlib import Path

from lium.sdk import Lium

# Executor selection criteria, consumed by select_executor().
GPU_TYPE = "A100"
GPU_COUNT = 1
# Upper bound compared against executor.price_per_hour (reported as $/hr).
MAX_PRICE_PER_HOUR = 1.50
# NOTE(review): stored as a float, so a version such as 12.10 would read as
# 12.1 — confirm the type lium.ls() expects for min_cuda_version.
MIN_CUDA_VERSION = 12.4
# Name given to the pod when it is started with lium.up().
POD_NAME = "sdk-training-demo"

# Local paths are resolved against the current working directory, which is
# why the script has to be run from the project root.
PROJECT_ROOT = Path.cwd()
LOCAL_REQUIREMENTS = PROJECT_ROOT / "requirements.txt"
LOCAL_SRC = PROJECT_ROOT / "src"
LOCAL_DATA = PROJECT_ROOT / "data"
# Destination of the checkpoint downloaded from the pod.
LOCAL_CHECKPOINT = PROJECT_ROOT / "checkpoints" / "best_model.pt"

# Paths on the pod; they mirror the local layout under REMOTE_WORKSPACE.
REMOTE_WORKSPACE = "/workspace"
REMOTE_REQUIREMENTS = f"{REMOTE_WORKSPACE}/requirements.txt"
REMOTE_SRC = f"{REMOTE_WORKSPACE}/src"
REMOTE_DATA = f"{REMOTE_WORKSPACE}/data"
REMOTE_CHECKPOINTS = f"{REMOTE_WORKSPACE}/checkpoints"
# File whose presence is checked after training and which is then downloaded.
REMOTE_BEST_MODEL = f"{REMOTE_CHECKPOINTS}/best_model.pt"


def require_local_inputs() -> None:
    """Verify that every local input the workflow uploads is present.

    Checks ``LOCAL_REQUIREMENTS``, ``LOCAL_SRC`` and ``LOCAL_DATA``.

    Raises:
        RuntimeError: If at least one path does not exist. The message lists
            every missing path, not just the first.
    """
    required_paths = (LOCAL_REQUIREMENTS, LOCAL_SRC, LOCAL_DATA)
    absent = [str(entry) for entry in required_paths if not entry.exists()]

    if not absent:
        return

    raise RuntimeError(f"Missing local training inputs: {', '.join(absent)}")


def require_success(result: dict, label: str) -> None:
    """Raise if a command result dictionary does not report success.

    Args:
        result: Result dictionary of a remote command. The keys read are
            ``"success"``, ``"stderr"``, ``"stdout"`` and ``"exit_code"``.
        label: Short description of the step, used as the error prefix.

    Raises:
        RuntimeError: If ``result`` has no truthy ``"success"`` value. The
            message carries stderr if non-empty, otherwise stdout, otherwise
            the exit code.
    """
    # A missing "success" key is treated as a failure rather than a KeyError,
    # so the caller always sees the labelled RuntimeError.
    if result.get("success"):
        return

    # Output fields may be absent or explicitly None; normalize both cases to
    # an empty string so .strip() cannot mask the real failure.
    stderr = (result.get("stderr") or "").strip()
    stdout = (result.get("stdout") or "").strip()
    details = stderr or stdout or f"exit code {result.get('exit_code')}"
    raise RuntimeError(f"{label} failed: {details}")


def ensure_rsync(lium: Lium, pod) -> None:
    """Make sure ``rsync`` is available on the pod, installing it if needed.

    Args:
        lium: SDK client used to run commands on the pod.
        pod: Pod on which the commands are run.

    Raises:
        RuntimeError: If ``rsync`` is missing and the ``apt-get`` install
            does not report success.
    """
    probe = lium.exec(pod, command="which rsync")
    if not probe["success"]:
        # rsync is not on PATH: install it quietly through apt-get.
        setup = lium.exec(
            pod,
            command="apt-get update -qq && apt-get install -y rsync -qq",
        )
        require_success(setup, "install rsync")


def select_executor(lium: Lium):
    """Pick the cheapest executor that satisfies the configured constraints.

    Executors are listed with ``GPU_TYPE``, ``GPU_COUNT`` and
    ``MIN_CUDA_VERSION``; those priced above ``MAX_PRICE_PER_HOUR`` are
    discarded.

    Args:
        lium: SDK client used to list executors.

    Returns:
        The remaining executor with the lowest ``price_per_hour``; on a tie
        the one listed first wins.

    Raises:
        RuntimeError: If no executor is within the price limit.
    """
    listed = lium.ls(
        gpu_type=GPU_TYPE,
        gpu_count=GPU_COUNT,
        min_cuda_version=MIN_CUDA_VERSION,
    )

    cheapest = None
    for candidate in listed:
        if candidate.price_per_hour <= MAX_PRICE_PER_HOUR and (
            cheapest is None
            # Strict comparison keeps the earliest executor on equal prices.
            or candidate.price_per_hour < cheapest.price_per_hour
        ):
            cheapest = candidate

    if cheapest is not None:
        return cheapest

    raise RuntimeError(
        f"No {GPU_COUNT}x {GPU_TYPE} executor under "
        f"${MAX_PRICE_PER_HOUR:.2f}/hr with CUDA >= {MIN_CUDA_VERSION}"
    )


def select_pytorch_template(lium: Lium):
    """Return the first PyTorch template whose status is ``VERIFY_SUCCESS``.

    Args:
        lium: SDK client used to list templates with ``filter="pytorch"``.

    Returns:
        The first listed template whose ``status`` equals
        ``"VERIFY_SUCCESS"``, compared case-insensitively.

    Raises:
        RuntimeError: If no listed template has that status.
    """
    approved = [
        candidate
        for candidate in lium.templates(filter="pytorch")
        if candidate.status.upper() == "VERIFY_SUCCESS"
    ]
    if approved:
        return approved[0]

    raise RuntimeError("No verified PyTorch template is available")


# Validate the local project layout before any remote resource is created.
require_local_inputs()

lium = Lium()
# Cleanup state for the finally block: the id is recorded as soon as the pod
# is created, the pod object only once it has become ready.
created_pod_id = None
ready_pod = None

try:
    # 1. Choose the hardware and the image.
    executor = select_executor(lium)
    template = select_pytorch_template(lium)

    print(
        f"Using {executor.machine_name} at ${executor.price_per_hour:.2f}/hr "
        f"with template {template.name}"
    )

    # 2. Start the pod and remember its id immediately, so that a failure
    #    while waiting for readiness can still be cleaned up.
    pod = lium.up(
        executor_id=executor.id,
        name=POD_NAME,
        template_id=template.id,
    )
    created_pod_id = pod["id"]

    # wait_ready() returning None is treated as a timeout.
    ready_pod = lium.wait_ready(pod, timeout=600)
    if ready_pod is None:
        raise RuntimeError("Pod did not become ready before the timeout")

    # 3. Prepare the remote workspace and copy the project over.
    mkdir = lium.exec(
        ready_pod,
        command=f"mkdir -p {REMOTE_SRC} {REMOTE_DATA} {REMOTE_CHECKPOINTS}",
    )
    require_success(mkdir, "create remote workspace")

    lium.upload(
        ready_pod,
        local=str(LOCAL_REQUIREMENTS),
        remote=REMOTE_REQUIREMENTS,
    )
    # rsync must exist on the pod before the directory syncs below.
    ensure_rsync(lium, ready_pod)
    # NOTE(review): the trailing slashes presumably follow rsync semantics
    # (copy directory contents, not the directory itself) — confirm.
    lium.rsync(ready_pod, local=f"{LOCAL_SRC}/", remote=f"{REMOTE_SRC}/")
    lium.rsync(ready_pod, local=f"{LOCAL_DATA}/", remote=f"{REMOTE_DATA}/")

    # 4. Install dependencies on the pod.
    install = lium.exec(
        ready_pod,
        command=f"cd {REMOTE_WORKSPACE} && python -m pip install -r requirements.txt",
    )
    require_success(install, "install dependencies")

    # 5. Run training from the workspace so the relative "data" and
    #    "checkpoints" paths resolve there. PYTHONUNBUFFERED=1 disables
    #    Python's output buffering in the remote process.
    train_command = (
        f"cd {REMOTE_WORKSPACE} && "
        "PYTHONUNBUFFERED=1 python src/train.py "
        "--data-dir data "
        "--output-dir checkpoints"
    )
    # Print each chunk as it arrives; end="" avoids adding newlines on top of
    # whatever the chunk data already contains.
    for chunk in lium.stream_exec(ready_pod, command=train_command):
        print(chunk["data"], end="")

    # The streaming loop does not inspect an exit status, so the presence of
    # the checkpoint file is what confirms that training produced its output.
    checkpoint = lium.exec(ready_pod, command=f"test -f {REMOTE_BEST_MODEL}")
    require_success(checkpoint, "check training artifact")

    # 6. Fetch the checkpoint, creating the local directory if necessary.
    LOCAL_CHECKPOINT.parent.mkdir(parents=True, exist_ok=True)
    lium.download(
        ready_pod,
        remote=REMOTE_BEST_MODEL,
        local=str(LOCAL_CHECKPOINT),
    )
    print(f"Downloaded checkpoint to {LOCAL_CHECKPOINT}")

finally:
    # Always stop the pod, whether the workflow succeeded or failed.
    pod_to_stop = ready_pod
    # If the pod was created but never became ready, look it up by id among
    # the pods returned by lium.ps().
    if pod_to_stop is None and created_pod_id:
        pod_to_stop = next((p for p in lium.ps() if p.id == created_pod_id), None)

    if pod_to_stop is not None:
        lium.down(pod_to_stop)
        print(f"Stopped pod {pod_to_stop.name} ({pod_to_stop.huid})")
```

## Adapting the Workflow

- Change `GPU_TYPE`, `GPU_COUNT`, `MAX_PRICE_PER_HOUR`, and `MIN_CUDA_VERSION` to match the hardware your job needs.
- Update `train_command` if your training script uses different flags or writes artifacts to a different path.
- Use `lium.exec()` instead of `lium.stream_exec()` when you want a single result dictionary with `stdout`, `stderr`, and `exit_code` after the command finishes, rather than live output.
- `ensure_rsync()` mirrors the CLI behavior: it checks whether the pod has `rsync`, then installs it with `apt-get` when missing.
- For multiple output files, write a small archive on the pod with `tar` and download that single archive with `download()`.

## Minimal `train.py`

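If you do not already have a training script, use this minimal PyTorch example to verify the workflow. It trains a tiny linear model, uses CUDA when available, prints progress as it runs, and writes `best_model.pt` to the output directory (`checkpoints/best_model.pt` in this workflow). With the PyTorch template above, `requirements.txt` can be empty, but the file must still exist because the orchestration script checks for it before starting a pod.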

```python title="src/train.py"
#!/usr/bin/env python3
"""Minimal PyTorch job: fit y = 2x + 0.3 and save a checkpoint."""
import argparse
from pathlib import Path

import torch


# Both directories are mandatory command-line flags.
cli = argparse.ArgumentParser()
for flag in ("--data-dir", "--output-dir"):
    cli.add_argument(flag, required=True)
args = cli.parse_args()

# Use the GPU when one is visible, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
data_dir, output_dir = Path(args.data_dir), Path(args.output_dir)

# The input files are only listed (by name), not read.
records = sorted(entry.name for entry in data_dir.iterdir())
for line in (
    f"input files: {records}",
    f"torch: {torch.__version__}",
    f"device: {device}",
):
    print(line, flush=True)

# Seed before the model is built so its initial weights are reproducible.
torch.manual_seed(7)
inputs = torch.linspace(-1, 1, 256, device=device).unsqueeze(1)
targets = 2.0 * inputs + 0.3

net = torch.nn.Linear(1, 1).to(device)
sgd = torch.optim.SGD(net.parameters(), lr=0.2)
criterion = torch.nn.MSELoss()

for step in range(1, 31):
    sgd.zero_grad()
    loss = criterion(net(inputs), targets)
    loss.backward()
    sgd.step()
    # Report progress on every tenth step only.
    if step % 10:
        continue
    print(f"step {step:02d} loss={loss.item():.6f}", flush=True)

output_dir.mkdir(parents=True, exist_ok=True)
# "final_loss" is the loss computed in the last iteration, i.e. before that
# iteration's optimizer step was applied.
snapshot = {
    "state_dict": net.state_dict(),
    "final_loss": loss.item(),
    "device": str(device),
    "input_files": records,
}
torch.save(snapshot, output_dir / "best_model.pt")
print("checkpoint written", flush=True)
```
