Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ pip install -e .
```

## Usage
Refer to [examples](examples/).
Refer to [examples](examples/):
- `examples/afti16_example.py` (linear AFTI-16 aircraft model)
- `examples/unicycle_nmpc_example.py` (nonlinear unicycle with time-varying linearization)

Create a model, configure with `Modelsetup()`, and solve with `Model.solve()`.

Expand Down Expand Up @@ -65,6 +67,8 @@ where:

![](assets/example_afti16.png)

![](assets/example_unicycle.png)

## Testing
```bash
pytest tests/test_pimpc.py
Expand Down
Binary file added assets/example_unicycle.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
158 changes: 158 additions & 0 deletions examples/benchmark_cpu_gpu.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
import argparse
import time

import numpy as np

from pimpc import Model


def build_afti16(ts=0.05):
As = np.array(
[
[-0.0151, -60.5651, 0.0, -32.174],
[-0.0001, -1.3411, 0.9929, 0.0],
[0.00018, 43.2541, -0.86939, 0.0],
[0.0, 0.0, 1.0, 0.0],
]
)
Bs = np.array(
[
[-2.516, -13.136],
[-0.1689, -0.2514],
[-17.251, -1.5766],
[0.0, 0.0],
]
)

nx, nu = 4, 2
try:
from scipy.linalg import expm
except ImportError as exc:
raise RuntimeError("SciPy is required for the AFTI-16 benchmark.") from exc

M = expm(np.block([[As, Bs], [np.zeros((nu, nx)), np.zeros((nu, nu))]]) * ts)
A = M[:nx, :nx]
B = M[:nx, nx:]
C = np.array(
[
[0.0, 1.0, 0.0, 0.0],
[0.0, 0.0, 0.0, 1.0],
]
)
return A, B, C


def build_random_system(nx=8, nu=3, ny=4, seed=0):
rng = np.random.default_rng(seed)
A = rng.standard_normal((nx, nx)) * 0.1
radius = np.max(np.abs(np.linalg.eigvals(A)))
if radius > 0:
A = A * min(0.95 / radius, 1.0)
B = rng.standard_normal((nx, nu)) * 0.1
C = rng.standard_normal((ny, nx))
return A, B, C


def setup_model(A, B, C, device, Np):
nx, nu = B.shape
ny = C.shape[0]
model = Model()
model.setup(
A=A,
B=B,
C=C,
Np=Np,
Wy=10.0 * np.eye(ny),
Wu=0.1 * np.eye(nu),
Wdu=0.1 * np.eye(nu),
rho=1.0,
tol=1e-6,
maxiter=200,
precond=True,
accel=True,
device=device,
)
return model


def run_benchmark(model, A, B, repeats, warmup):
nx, nu = B.shape
ny = model.ny
x0 = np.zeros(nx)
u0 = np.zeros(nu)
yref = np.zeros(ny)
uref = np.zeros(nu)
w = np.zeros(nx)

for _ in range(warmup):
results = model.solve(x0, u0, yref, uref, w, verbose=False)
u0 = results.u[:, 0]
x0 = A @ x0 + B @ u0

times_ms = []
for _ in range(repeats):
t0 = time.perf_counter()
results = model.solve(x0, u0, yref, uref, w, verbose=False)
t1 = time.perf_counter()
times_ms.append((t1 - t0) * 1000.0)
u0 = results.u[:, 0]
x0 = A @ x0 + B @ u0

return np.array(times_ms, dtype=float)


def summarize(times_ms):
return {
"mean": float(np.mean(times_ms)),
"median": float(np.median(times_ms)),
"std": float(np.std(times_ms)),
"min": float(np.min(times_ms)),
"max": float(np.max(times_ms)),
}


def main():
parser = argparse.ArgumentParser(description="Benchmark PiMPC CPU vs GPU solver.")
parser.add_argument("--system", choices=["afti16", "random"], default="afti16")
parser.add_argument("--repeats", type=int, default=50)
parser.add_argument("--warmup", type=int, default=5)
parser.add_argument("--Np", type=int, default=5)
args = parser.parse_args()

if args.system == "afti16":
A, B, C = build_afti16()
else:
A, B, C = build_random_system()

cpu_model = setup_model(A, B, C, device="cpu", Np=args.Np)
cpu_times = run_benchmark(cpu_model, A, B, repeats=args.repeats, warmup=args.warmup)
cpu_stats = summarize(cpu_times)

print("CPU benchmark")
print(f" mean: {cpu_stats['mean']:.3f} ms")
print(f" median: {cpu_stats['median']:.3f} ms")
print(f" std: {cpu_stats['std']:.3f} ms")
print(f" min: {cpu_stats['min']:.3f} ms")
print(f" max: {cpu_stats['max']:.3f} ms")

gpu_model = setup_model(A, B, C, device="gpu", Np=args.Np)
if gpu_model.device != "gpu":
print("GPU benchmark skipped (GPU not available or CuPy missing).")
return

gpu_times = run_benchmark(gpu_model, A, B, repeats=args.repeats, warmup=args.warmup)
gpu_stats = summarize(gpu_times)

print("GPU benchmark")
print(f" mean: {gpu_stats['mean']:.3f} ms")
print(f" median: {gpu_stats['median']:.3f} ms")
print(f" std: {gpu_stats['std']:.3f} ms")
print(f" min: {gpu_stats['min']:.3f} ms")
print(f" max: {gpu_stats['max']:.3f} ms")

speedup = cpu_stats["mean"] / gpu_stats["mean"]
print(f"Speedup (mean CPU / mean GPU): {speedup:.2f}x")


if __name__ == "__main__":
main()
142 changes: 142 additions & 0 deletions examples/unicycle_nmpc_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
import os
import sys

import numpy as np

ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
if ROOT not in sys.path:
sys.path.insert(0, ROOT)

from pimpc import Model


def unicycle_dynamics(x, u):
px, py, theta = x
v, omega = u
return np.array([v * np.cos(theta), v * np.sin(theta), omega])


def linearize_discrete_unicycle(x, u, dt):
theta = x[2]
v = u[0]

A = np.eye(3)
A[0, 2] = -dt * v * np.sin(theta)
A[1, 2] = dt * v * np.cos(theta)

B = np.zeros((3, 2))
B[0, 0] = dt * np.cos(theta)
B[1, 0] = dt * np.sin(theta)
B[2, 1] = dt

x_next = x + dt * unicycle_dynamics(x, u)
c = x_next - A @ x - B @ u
return A, B, c


def build_ltv_matrices(x0, u_nom, dt):
nx = x0.shape[0]
nu, Np = u_nom.shape
x_nom = np.zeros((nx, Np + 1))
x_nom[:, 0] = x0

for k in range(Np):
x_nom[:, k + 1] = x_nom[:, k] + dt * unicycle_dynamics(x_nom[:, k], u_nom[:, k])

A_seq = np.zeros((Np, nx, nx))
B_seq = np.zeros((Np, nx, nu))
c_seq = np.zeros((nx, Np))
for k in range(Np):
A_k, B_k, c_k = linearize_discrete_unicycle(x_nom[:, k], u_nom[:, k], dt)
A_seq[k] = A_k
B_seq[k] = B_k
c_seq[:, k] = c_k

return A_seq, B_seq, c_seq


def main():
dt = 0.1
Np = 25
nsim = 60

nx, nu = 3, 2
x0 = np.array([0.0, 0.0, 0.0])
u_prev = np.zeros(nu)
u_nom = np.zeros((nu, Np))

yref = np.array([2.0, 2.0, 0.0])
uref = np.zeros(nu)

model = Model()

x_hist = np.zeros((nx, nsim + 1))
u_hist = np.zeros((nu, nsim))
x_hist[:, 0] = x0
x_current = x0.copy()

print("=" * 50)
print(" PiMPC - Unicycle NMPC (LTV Linearization)")
print("=" * 50)

for k in range(nsim):
A_seq, B_seq, c_seq = build_ltv_matrices(x_current, u_nom, dt)

model.setup(
A=A_seq,
B=B_seq,
C=np.eye(nx),
Np=Np,
Wy=np.diag([20.0, 20.0, 0.0]),
Wu=0.1 * np.eye(nu),
Wdu=0.1 * np.eye(nu),
umin=np.array([0.0, -1.0]),
umax=np.array([1.5, 1.0]),
rho=1.0,
tol=1e-5,
maxiter=200,
precond=False,
accel=True,
device="cpu",
)

results = model.solve(x_current, u_prev, yref, uref, c_seq, verbose=False)
u_apply = results.u[:, 0]

x_next = x_current + dt * unicycle_dynamics(x_current, u_apply)
x_hist[:, k + 1] = x_next
u_hist[:, k] = u_apply

x_current = x_next
u_prev = u_apply

u_nom = np.hstack([results.u[:, 1:], results.u[:, -1:]])

try:
import matplotlib.pyplot as plt
except ImportError as exc:
raise RuntimeError("matplotlib is required to plot results.") from exc

t = np.arange(nsim + 1) * dt
fig, axes = plt.subplots(3, 1, figsize=(8, 8), sharex=False)

axes[0].plot(x_hist[0, :], x_hist[1, :], color="blue", label="Trajectory")
axes[0].scatter([yref[0]], [yref[1]], color="red", label="Goal")
axes[0].set_ylabel("y (m)")
axes[0].set_xlabel("x (m)")
axes[0].legend(loc="best")
axes[0].set_title("Unicycle Path")

axes[1].plot(t[:-1], u_hist[0, :], color="green")
axes[1].set_ylabel("v (m/s)")

axes[2].plot(t[:-1], u_hist[1, :], color="purple")
axes[2].set_ylabel("omega (rad/s)")
axes[2].set_xlabel("Time (s)")

plt.tight_layout()
plt.show()


if __name__ == "__main__":
main()
28 changes: 22 additions & 6 deletions pimpc/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def __init__(self, dtype=np.float64):

# State
self.is_setup = False
self.is_time_varying = False
self.warm_vars = None

def setup(
Expand Down Expand Up @@ -72,11 +73,25 @@ def setup(
):
A = np.asarray(A, dtype=self.dtype)
B = np.asarray(B, dtype=self.dtype)
if A.ndim != 2 or B.ndim != 2:
raise ValueError("A and B must be 2D arrays.")
nx, nu = B.shape
if A.shape != (nx, nx):
raise ValueError("A must be square and compatible with B.")
if A.ndim not in (2, 3) or B.ndim not in (2, 3):
raise ValueError("A and B must be 2D or 3D arrays.")
if A.ndim != B.ndim:
raise ValueError("A and B must have the same number of dimensions.")
if A.ndim == 2:
nx, nu = B.shape
if A.shape != (nx, nx):
raise ValueError("A must be square and compatible with B.")
else:
nsteps, nx, nx2 = A.shape
if nx != nx2:
raise ValueError("A must be square for each time step.")
if B.shape[0] != nsteps or B.shape[1] != nx:
raise ValueError("B must have shape (Np, nx, nu) to match A.")
nu = B.shape[2]

Np = int(Np)
if A.ndim == 3 and A.shape[0] != Np:
raise ValueError("A and B must have Np time steps.")

if C is None:
C = np.eye(nx, dtype=self.dtype)
Expand Down Expand Up @@ -133,7 +148,8 @@ def _vec_or_fill(val, size, fill, name):
self.nx = nx
self.nu = nu
self.ny = ny
self.Np = int(Np)
self.Np = Np
self.is_time_varying = A.ndim == 3

self.Wy = Wy
self.Wu = Wu
Expand Down
Loading