Online Algorithms Quickstart
This notebook walks through streaming changepoint detection with BOCPD, CUSUM, and Page-Hinkley on a synthetic service-latency stream.
import numpy as np
import cpd
try:
import matplotlib.pyplot as plt
HAS_MATPLOTLIB = True
except ImportError:
HAS_MATPLOTLIB = False
print("matplotlib is optional. Install with: python -m pip install matplotlib")
# Deterministic service latency stream: baseline, shift, drift, and transient spikes
# The fixed seed makes the stream reproducible; the three noise draws below
# must stay in this order for the values to remain the same from run to run.
rng = np.random.default_rng(2027)
n1, n2, n3 = 220, 160, 140

# Segment 1: flat baseline centred on 95.
seg1 = rng.normal(0.0, 1.8, size=n1) + 95.0
# Segment 2: abrupt level shift to 112.
seg2 = rng.normal(0.0, 2.0, size=n2) + 112.0
# Segment 3: linear ramp from 112 up to 128 with noise on top.
seg3_base = np.linspace(112.0, 128.0, num=n3)
seg3 = rng.normal(0.0, 1.8, size=n3) + seg3_base
latency = np.concatenate((seg1, seg2, seg3)).astype(np.float64)

# Inject single-sample +14 spikes; they are not listed as change points.
spike_positions = [75, 260, 310, 430]
latency[spike_positions] += 14.0

# Ground truth: where the level shift starts and where the drift starts.
true_change_points = [n1, n1 + n2]
latency.shape, true_change_points
def summarize_steps(name, steps, change_index):
    """Summarize the alerts raised by one detector over a stream.

    Args:
        name: Label stored under the ``"detector"`` key of the result.
        steps: Iterable of per-sample step results; each item must expose
            an ``alert`` attribute (truthy when an alert fired) and a numeric
            ``p_change`` attribute. It is consumed in a single pass, so a
            one-shot iterable works as well as a list.
        change_index: Position separating "pre" from "post" alerts; an alert
            at exactly ``change_index`` counts as "post".

    Returns:
        A dict with keys ``detector``, ``first_alert`` (position of the first
        alert anywhere in the stream, or ``None`` if there was none),
        ``pre_alerts``, ``post_alerts`` and ``mean_p_change`` (rounded to four
        decimals; NaN when ``steps`` is empty).
    """
    alert_positions = []
    p_changes = []
    for position, step in enumerate(steps):
        p_changes.append(step.p_change)
        if step.alert:
            alert_positions.append(position)

    pre_alerts = sum(1 for position in alert_positions if position < change_index)

    # np.mean([]) also yields NaN but emits a "Mean of empty slice"
    # RuntimeWarning; return the same value without the warning.
    mean_p_change = float(np.mean(p_changes)) if p_changes else float("nan")

    return {
        "detector": name,
        "first_alert": alert_positions[0] if alert_positions else None,
        "pre_alerts": pre_alerts,
        "post_alerts": len(alert_positions) - pre_alerts,
        "mean_p_change": round(mean_p_change, 4),
    }
# BOCPD
# Alert settings are kept in their own dict so they are easy to compare
# with the other detectors in this notebook.
bocpd_alert_policy = dict(threshold=0.35, cooldown=6, min_run_length=10)
bocpd = cpd.Bocpd(
    model="gaussian_nig",
    # NOTE(review): presumably a constant per-step change probability of
    # 1/200 -- confirm against the cpd documentation.
    hazard=1.0 / 200.0,
    max_run_length=512,
    alert_policy=bocpd_alert_policy,
)

# Feed the whole stream in one call, then score the alerts against the
# first true change point.
bocpd_steps = bocpd.update_many(latency)
first_change = true_change_points[0]
bocpd_summary = summarize_steps("bocpd", bocpd_steps, first_change)
bocpd_summary
# CUSUM
# Reference level: empirical mean of the first n1 samples, i.e. the segment
# before the first true change point (it includes the spike injected at 75).
cusum_target = float(np.mean(latency[:n1]))
cusum_alert_policy = dict(threshold=0.95, cooldown=6, min_run_length=10)
cusum = cpd.Cusum(
    drift=0.05,
    threshold=8.0,
    target_mean=cusum_target,
    alert_policy=cusum_alert_policy,
)

# Run over the full stream and score against the first true change point.
cusum_steps = cusum.update_many(latency)
first_change = true_change_points[0]
cusum_summary = summarize_steps("cusum", cusum_steps, first_change)
cusum_summary
# Page-Hinkley
# Starting mean: empirical mean of the first n1 samples, the same baseline
# window that ends at the first true change point.
page_hinkley_start = float(np.mean(latency[:n1]))
page_hinkley_alert_policy = dict(threshold=0.95, cooldown=6, min_run_length=10)
page_hinkley = cpd.PageHinkley(
    delta=0.02,
    threshold=8.0,
    initial_mean=page_hinkley_start,
    alert_policy=page_hinkley_alert_policy,
)

# Run over the full stream and score against the first true change point.
page_hinkley_steps = page_hinkley.update_many(latency)
first_change = true_change_points[0]
page_hinkley_summary = summarize_steps("page_hinkley", page_hinkley_steps, first_change)
page_hinkley_summary
# Detector-by-detector summary
# One line per detector; the name is left-aligned in a 13-character column
# so the remaining fields line up.
summaries = [bocpd_summary, cusum_summary, page_hinkley_summary]
for row in summaries:
    line = (
        f"{row['detector']:<13} first_alert={row['first_alert']} "
        f"pre_alerts={row['pre_alerts']} post_alerts={row['post_alerts']} "
        f"mean_p_change={row['mean_p_change']}"
    )
    print(line)
# Plot stream and alert markers
if not HAS_MATPLOTLIB:
    print("Skipping plot (matplotlib not installed).")
else:
    fig, ax = plt.subplots(figsize=(12, 4))
    ax.plot(latency, color="black", linewidth=1.0, alpha=0.85, label="latency")

    # Dashed green verticals mark the ground-truth change points.
    for cp in true_change_points:
        ax.axvline(cp, color="tab:green", linestyle="--", linewidth=1.5, alpha=0.8)

    detector_steps = {
        "bocpd": bocpd_steps,
        "cusum": cusum_steps,
        "page_hinkley": page_hinkley_steps,
    }
    marker_colors = {"bocpd": "tab:blue", "cusum": "tab:orange", "page_hinkley": "tab:red"}

    # One scatter series per detector, drawn at the latency value of each
    # sample on which that detector raised an alert.
    for name, steps in detector_steps.items():
        alert_idx = [i for i, step in enumerate(steps) if step.alert]
        alert_values = latency[alert_idx]
        ax.scatter(alert_idx, alert_values, s=28, alpha=0.85, color=marker_colors[name], label=name)

    ax.set(
        title="Online detector alerts on service-latency stream",
        xlabel="time step",
        ylabel="latency (ms)",
    )
    ax.legend(loc="upper left")
    plt.show()
# Optional reliability check: checkpoint save/load for BOCPD
def _new_bocpd():
    """Return a fresh BOCPD detector with the settings used for this check."""
    return cpd.Bocpd(
        model="gaussian_nig",
        hazard=1.0 / 200.0,
        max_run_length=512,
        alert_policy={"threshold": 0.35, "cooldown": 6, "min_run_length": 10},
    )


# Split the stream: the first detector sees the head, the restored one the tail.
split_idx = 300
stream_head = latency[:split_idx]
stream_tail = latency[split_idx:]

# Run the head through one detector and capture its state as JSON.
checkpoint_detector = _new_bocpd()
_ = checkpoint_detector.update_many(stream_head)
state = checkpoint_detector.save_state(format="json")

# Load that state into a second, identically configured detector and
# continue with the remainder of the stream.
restored = _new_bocpd()
restored.load_state(state, format="json")
restored_tail = restored.update_many(stream_tail)

# Show how many tail steps were produced and the `t` of the first one.
# NOTE(review): `t` is presumably the step's time index and should continue
# from split_idx if the state was restored -- confirm against the cpd docs.
first_restored_t = restored_tail[0].t if restored_tail else None
len(restored_tail), first_restored_t