{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "intro",
   "metadata": {},
   "source": [
    "# Online Algorithms Quickstart\n",
    "\n",
    "This notebook walks through streaming changepoint detection with BOCPD, CUSUM, and Page-Hinkley on a synthetic service-latency stream."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "imports",
   "metadata": {},
   "outputs": [],
   "source": [
    "import numpy as np\n",
    "import cpd\n",
    "\n",
    "try:\n",
    "    import matplotlib.pyplot as plt\n",
    "    HAS_MATPLOTLIB = True\n",
    "except ImportError:\n",
    "    HAS_MATPLOTLIB = False\n",
    "    print(\"matplotlib is optional. Install with: python -m pip install matplotlib\")\n"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "stream-header",
   "metadata": {},
   "source": [
    "## Build the synthetic stream\n",
    "\n",
    "Three segments are concatenated: a flat baseline around 95 ms, a step up to 112 ms, and a linear drift from 112 ms to 128 ms. Four single-sample spikes of +14 ms are then injected so the detectors also see transient outliers. The true change points are the two segment boundaries."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "stream",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Deterministic service latency stream: baseline, shift, drift, and transient spikes\n",
    "SEED = 2027\n",
    "SPIKE_INDICES = (75, 260, 310, 430)  # isolated one-sample outliers\n",
    "SPIKE_MAGNITUDE = 14.0\n",
    "\n",
    "rng = np.random.default_rng(SEED)\n",
    "n1, n2, n3 = 220, 160, 140\n",
    "seg1 = 95.0 + rng.normal(0.0, 1.8, size=n1)\n",
    "seg2 = 112.0 + rng.normal(0.0, 2.0, size=n2)\n",
    "seg3_base = np.linspace(112.0, 128.0, n3)\n",
    "seg3 = seg3_base + rng.normal(0.0, 1.8, size=n3)\n",
    "latency = np.concatenate([seg1, seg2, seg3]).astype(np.float64)\n",
    "# The array is rebuilt from the seed above on every run, so this in-place\n",
    "# update is safe to re-execute.\n",
    "latency[list(SPIKE_INDICES)] += SPIKE_MAGNITUDE\n",
    "true_change_points = [n1, n1 + n2]\n",
    "assert latency.shape == (n1 + n2 + n3,)\n",
    "latency.shape, true_change_points\n"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "helpers-header",
   "metadata": {},
   "source": [
    "## Helpers\n",
    "\n",
    "`summarize_steps` condenses a detector's per-step output into alert counts. `make_bocpd` builds the BOCPD configuration that is reused by the checkpoint check at the end of the notebook."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "helpers",
   "metadata": {},
   "outputs": [],
   "source": [
    "def summarize_steps(name, steps, change_index):\n",
    "    \"\"\"Summarize the alerts one detector raised over a stream.\n",
    "\n",
    "    Args:\n",
    "        name: Label stored under the \"detector\" key of the result.\n",
    "        steps: Iterable of per-observation results; each item must expose\n",
    "            `.alert` and `.p_change`.\n",
    "        change_index: Position that separates \"pre\" alerts (index below it)\n",
    "            from \"post\" alerts (index at or above it).\n",
    "\n",
    "    Returns:\n",
    "        dict with keys \"detector\", \"first_alert\" (index of the first alert,\n",
    "        or None if there was none), \"pre_alerts\", \"post_alerts\", and\n",
    "        \"mean_p_change\" (rounded to 4 decimals; NaN when `steps` is empty).\n",
    "    \"\"\"\n",
    "    # Materialize once: the data is scanned more than once below, which would\n",
    "    # silently yield nothing on the second pass of a one-shot iterator.\n",
    "    steps = list(steps)\n",
    "    alert_positions = [i for i, step in enumerate(steps) if step.alert]\n",
    "    pre_alerts = sum(1 for i in alert_positions if i < change_index)\n",
    "    if steps:\n",
    "        mean_p_change = float(np.mean([step.p_change for step in steps]))\n",
    "    else:\n",
    "        # np.mean([]) would also give NaN, but with a RuntimeWarning.\n",
    "        mean_p_change = float(\"nan\")\n",
    "    return {\n",
    "        \"detector\": name,\n",
    "        \"first_alert\": alert_positions[0] if alert_positions else None,\n",
    "        \"pre_alerts\": pre_alerts,\n",
    "        \"post_alerts\": len(alert_positions) - pre_alerts,\n",
    "        \"mean_p_change\": round(mean_p_change, 4),\n",
    "    }\n",
    "\n",
    "\n",
    "def make_bocpd():\n",
    "    \"\"\"Return a fresh BOCPD detector with the settings used in this notebook.\"\"\"\n",
    "    return cpd.Bocpd(\n",
    "        model=\"gaussian_nig\",\n",
    "        hazard=1.0 / 200.0,\n",
    "        max_run_length=512,\n",
    "        alert_policy={\"threshold\": 0.35, \"cooldown\": 6, \"min_run_length\": 10},\n",
    "    )\n"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "detectors-header",
   "metadata": {},
   "source": [
    "## Run the detectors\n",
    "\n",
    "Each detector consumes the full stream with `update_many`, and its alerts are summarized relative to the first true change point."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "run-bocpd",
   "metadata": {},
   "outputs": [],
   "source": [
    "# BOCPD\n",
    "bocpd = make_bocpd()\n",
    "bocpd_steps = bocpd.update_many(latency)\n",
    "bocpd_summary = summarize_steps(\"bocpd\", bocpd_steps, true_change_points[0])\n",
    "bocpd_summary\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "run-cusum",
   "metadata": {},
   "outputs": [],
   "source": [
    "# CUSUM\n",
    "# The reference mean is taken from the first (pre-change) segment of the stream.\n",
    "cusum = cpd.Cusum(\n",
    "    drift=0.05,\n",
    "    threshold=8.0,\n",
    "    target_mean=float(np.mean(latency[:n1])),\n",
    "    alert_policy={\"threshold\": 0.95, \"cooldown\": 6, \"min_run_length\": 10},\n",
    ")\n",
    "cusum_steps = cusum.update_many(latency)\n",
    "cusum_summary = summarize_steps(\"cusum\", cusum_steps, true_change_points[0])\n",
    "cusum_summary\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "run-page-hinkley",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Page-Hinkley\n",
    "# The initial mean is taken from the first (pre-change) segment of the stream.\n",
    "page_hinkley = cpd.PageHinkley(\n",
    "    delta=0.02,\n",
    "    threshold=8.0,\n",
    "    initial_mean=float(np.mean(latency[:n1])),\n",
    "    alert_policy={\"threshold\": 0.95, \"cooldown\": 6, \"min_run_length\": 10},\n",
    ")\n",
    "page_hinkley_steps = page_hinkley.update_many(latency)\n",
    "page_hinkley_summary = summarize_steps(\"page_hinkley\", page_hinkley_steps, true_change_points[0])\n",
    "page_hinkley_summary\n"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "compare-header",
   "metadata": {},
   "source": [
    "## Compare detectors"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "summary-table",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Detector-by-detector summary\n",
    "summaries = [bocpd_summary, cusum_summary, page_hinkley_summary]\n",
    "for row in summaries:\n",
    "    print(\n",
    "        f\"{row['detector']:<13} first_alert={row['first_alert']} \"\n",
    "        f\"pre_alerts={row['pre_alerts']} post_alerts={row['post_alerts']} \"\n",
    "        f\"mean_p_change={row['mean_p_change']}\"\n",
    "    )\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "alert-plot",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Plot stream and alert markers\n",
    "if HAS_MATPLOTLIB:\n",
    "    fig, ax = plt.subplots(figsize=(12, 4))\n",
    "    ax.plot(latency, color=\"black\", linewidth=1.0, alpha=0.85, label=\"latency\")\n",
    "    for k, cp in enumerate(true_change_points):\n",
    "        ax.axvline(\n",
    "            cp,\n",
    "            color=\"tab:green\",\n",
    "            linestyle=\"--\",\n",
    "            linewidth=1.5,\n",
    "            alpha=0.8,\n",
    "            # Label only the first line so the legend gets a single entry.\n",
    "            label=\"true change point\" if k == 0 else None,\n",
    "        )\n",
    "    detector_steps = {\n",
    "        \"bocpd\": bocpd_steps,\n",
    "        \"cusum\": cusum_steps,\n",
    "        \"page_hinkley\": page_hinkley_steps,\n",
    "    }\n",
    "    marker_colors = {\"bocpd\": \"tab:blue\", \"cusum\": \"tab:orange\", \"page_hinkley\": \"tab:red\"}\n",
    "    for name, steps in detector_steps.items():\n",
    "        alert_idx = [i for i, step in enumerate(steps) if step.alert]\n",
    "        # zorder=3 keeps the markers above the latency line; scatter\n",
    "        # collections otherwise draw beneath Line2D artists.\n",
    "        ax.scatter(\n",
    "            alert_idx,\n",
    "            latency[alert_idx],\n",
    "            s=28,\n",
    "            alpha=0.85,\n",
    "            color=marker_colors[name],\n",
    "            label=name,\n",
    "            zorder=3,\n",
    "        )\n",
    "    ax.set_title(\"Online detector alerts on service-latency stream\")\n",
    "    ax.set_xlabel(\"time step\")\n",
    "    ax.set_ylabel(\"latency (ms)\")\n",
    "    ax.legend(loc=\"upper left\")\n",
    "    plt.show()\n",
    "else:\n",
    "    print(\"Skipping plot (matplotlib not installed).\")\n"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "checkpoint-header",
   "metadata": {},
   "source": [
    "## Optional: checkpoint round-trip\n",
    "\n",
    "A BOCPD detector is run on the first 300 samples, saved, restored into a fresh instance, and run on the remainder. The restored tail is then compared with the uninterrupted BOCPD run from above."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "checkpoint-roundtrip",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Optional reliability check: checkpoint save/load for BOCPD\n",
    "split_idx = 300\n",
    "checkpoint_detector = make_bocpd()\n",
    "head_steps = checkpoint_detector.update_many(latency[:split_idx])\n",
    "state = checkpoint_detector.save_state(format=\"json\")\n",
    "\n",
    "restored = make_bocpd()\n",
    "restored.load_state(state, format=\"json\")\n",
    "restored_tail = restored.update_many(latency[split_idx:])\n",
    "\n",
    "# Compare with the uninterrupted run: the largest p_change difference shows\n",
    "# how closely the restored detector tracks the original one.\n",
    "reference_tail = list(bocpd_steps)[split_idx:]\n",
    "max_p_change_gap = max(\n",
    "    (abs(r.p_change - u.p_change) for r, u in zip(restored_tail, reference_tail)),\n",
    "    default=None,\n",
    ")\n",
    "{\n",
    "    \"tail_steps\": len(restored_tail),\n",
    "    \"first_t\": restored_tail[0].t if restored_tail else None,\n",
    "    \"max_p_change_gap\": max_p_change_gap,\n",
    "}\n"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "name": "python",
   "version": "3.9"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}