Online Algorithms#
Online (streaming) algorithms process one observation at a time and emit a per-step result indicating the probability of a change point. changepoint-doctor provides three online detectors.
BOCPD (Bayesian Online Changepoint Detection)#
BOCPD [9] maintains a posterior distribution over run lengths (time since the last change point). At each step it computes the probability of a change given the observation and the predictive distribution of the current regime.
When to use: You need probabilistic change likelihood in streaming data; conjugate Bayesian inference gives calibrated uncertainty estimates.
import cpd
bocpd = cpd.Bocpd(
model="gaussian_nig",
hazard=1.0 / 200.0,
max_run_length=512,
alert_policy={"threshold": 0.35, "cooldown": 5, "min_run_length": 10},
)
for x_t in stream:
step = bocpd.update(x_t)
if step.alert:
print(f"Change at t={step.t}, p={step.p_change:.3f}")
Parameter |
Type |
Default |
Description |
|---|---|---|---|
|
|
|
Observation model (conjugate prior family) |
|
|
|
Prior change rate (1/expected_run_length), or a dict for custom hazard |
|
|
|
Truncation limit for the run-length distribution |
|
|
|
Alert thresholds and cooldown configuration |
|
|
|
How to handle late-arriving data |
Hazard configuration#
The hazard parameter controls the prior belief about how often changes occur:
Float: Constant hazard rate, e.g.
hazard=1/200means “expect a change every ~200 steps”Dict: Custom hazard configuration for non-constant hazard functions
Alert policy#
The alert policy determines when step.alert fires:
alert_policy = {
"threshold": 0.35, # p_change must exceed this
"cooldown": 5, # minimum steps between alerts
"min_run_length": 10, # suppress alerts during initial transient
}
CUSUM (Cumulative Sum)#
CUSUM [10] tracks cumulative deviations from a target mean. When the cumulative sum exceeds a threshold, an alert is triggered.
When to use: Lightweight mean-shift monitoring with well-understood statistical properties.
import cpd
cusum = cpd.Cusum(
drift=0.0,
threshold=8.0,
target_mean=0.0,
alert_policy={"threshold": 0.5, "cooldown": 5},
)
steps = cusum.update_many(data)
alerts = [s for s in steps if s.alert]
Parameter |
Type |
Default |
Description |
|---|---|---|---|
|
|
|
Allowance parameter (slack before accumulation) |
|
|
|
Decision threshold for the cumulative sum |
|
|
|
Expected mean under no-change hypothesis |
|
|
|
Alert thresholds and cooldown |
|
|
|
Late-data handling policy |
Page-Hinkley#
The Page-Hinkley test is a sequential analysis technique for drift/change monitoring, closely related to CUSUM but with a different accumulation formula.
When to use: Low-overhead drift monitoring; slightly different sensitivity profile than CUSUM.
import cpd
ph = cpd.PageHinkley(
delta=0.01,
threshold=8.0,
initial_mean=0.0,
alert_policy={"threshold": 0.5, "cooldown": 5},
)
for x_t in stream:
step = ph.update(x_t)
if step.alert:
print(f"Drift detected at t={step.t}")
Parameter |
Type |
Default |
Description |
|---|---|---|---|
|
|
|
Magnitude tolerance parameter |
|
|
|
Decision threshold |
|
|
|
Initial mean estimate |
|
|
|
Alert thresholds and cooldown |
|
|
|
Late-data handling policy |
Common patterns#
update() vs update_many() performance#
update_many() processes a batch of observations and uses a size-aware GIL strategy:
Workloads with < 16 scalar work items keep the GIL (lower overhead for tiny batches)
Workloads with >= 16 scalar work items release the GIL for throughput
Batch size |
|
|
Speedup |
|---|---|---|---|
1 |
0.004 |
0.010 |
0.36x |
16 |
0.036 |
0.031 |
1.15x |
64 |
0.131 |
0.089 |
1.47x |
4096 |
7.822 |
4.462 |
1.75x |
Guidance: Use update() for single-observation real-time processing. Use update_many() for batch replay or when processing buffers of observations.
Checkpoint and restore#
All online detectors support stateful checkpoint/restore for fault tolerance:
# Save state
state_bytes = bocpd.save_state(format="bytes")
# ... later, after restart ...
new_bocpd = cpd.Bocpd(model="gaussian_nig", hazard=1.0 / 200.0)
new_bocpd.load_state(state_bytes, format="bytes")
Supported formats:
"bytes": compact binary format (default)"dict": Python dict for inspectionFile path: save/load directly to disk via the
pathparameter
Late data policy#
Configure how detectors handle observations arriving after a gap:
bocpd = cpd.Bocpd(
model="gaussian_nig",
late_data_policy="ignore", # or a dict with custom config
)
Reset#
All online detectors support reset() to clear internal state and start fresh:
bocpd.reset() # Clears run-length distribution, ready for new stream