Notification — Telemetry Rules Engine¶
dataland-notification is the museum's reactive layer. It drains the
museum:telemetry Redis stream produced by Museum, evaluates a set
of rules against each visitor's live biometric + location data, and turns the
ones that fire into two visitor-facing channels — an in-chat message written
through the Agent, and a OneSignal push to the visitor's phone. A
second, off-path pipeline turns silently-detected visitor complaints into ops
alerts on Discord / Slack.
Recent changes
- DAT-287: content/condition pushes are gated until the visitor reaches Gallery A; the welcome and checkout are exempt.
- DAT-289 / DAT-290: Gallery-B session-flow pushes (save-to-phone, return-to-B feedback).
- DAT-293: room-transition and session-flow pushes bypass the 240 s per-ticket cooldown.
- DAT-282: visit_ended / checkout push on Visitors:ActiveTicketIDs departure.
- DAT-213: silent visitor-complaint detection → swappable Discord/Slack ops notifier (comma-separated multi-provider fan-out), POST /v1/ops/complaint.
- DAT-296: mobile-init welcome push, ticket-deduped against the RDC visit_started welcome, POST /v1/ops/welcome.
Two roles, one image¶
The stack is two containers built from the same image (dataland/notification).
Both run python main.py; the API container adds the --api flag.
| | Worker | API |
|---|---|---|
| Container | dataland-notification-worker | dataland-notification-api |
| Command | python main.py (default) | python main.py --api |
| HTTP | none (no HTTP) | 8080 |
| Compose profile | always on | api profile |
| Role | drain museum:telemetry, evaluate rules, deliver | ops surface: /events, /v1/ops/*, /state, /rules, /metrics, /health |
graph LR
RDC[("RDC redis<br/>(external)")] --> MUS[museum bridge]
MUS -->|XADD| TS[("dataland-redis<br/>museum:telemetry")]
TS -.XREADGROUP.-> NW[notification-worker]
NW -->|rule fires| ENG{engine}
ENG -->|chat| AG[agent /v1/service/chat/museum]
ENG -->|push| OS["OneSignal"]
AG -.persists message.-> CONV[(conversation)]
AGT[agent silent detector] -->|POST /v1/ops/complaint| NA[notification-api]
NA -->|OpsAlert| DS["Discord / Slack"]
NA -.GET state / metrics.-> ADMIN[admin dashboard]
Service entrypoint flags
main.py supports --api (HTTP ops surface), --once (process one batch
then exit, for smoke tests), --explore (run the Redis explorer instead of
the worker), and --strict (crash on a malformed rules.toml at boot
instead of silently reverting to in-code defaults). See
main.py async_main.
How the worker drains the stream¶
The worker (app/consumer.py::RedisStreamConsumer) is a Redis Streams consumer
group reader with two concurrent loops:
- Read loop: XREADGROUP from museum:telemetry (group CONSUMER_GROUP, default notification-service; consumer CONSUMER_NAME), count=CONSUMER_COUNT (10), block=BLOCK_MS (2000 ms). Each entry is decoded (app/processor.py), passed to engine.process(...), then XACK'd.
- Reap loop: every PEL_REAP_INTERVAL_SECONDS (30 s) it scans the Pending Entries List with XPENDING. Entries idle longer than PEL_REAP_IDLE_MS (60 s) are XCLAIMed and re-processed; entries past PEL_REAP_MAX_DELIVERIES (5) are routed to the DLQ. This recovers work from a worker that died mid-batch.
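The reap decision can be sketched as a pure function over the rows XPENDING reports. This is an illustration only, not the service's code: PendingEntry and plan_reap are hypothetical names, and the sketch assumes that only idle entries are considered and that "past" the delivery limit means strictly greater than it.

```python
from dataclasses import dataclass

PEL_REAP_IDLE_MS = 60_000        # defaults quoted in the text above
PEL_REAP_MAX_DELIVERIES = 5


@dataclass(frozen=True)
class PendingEntry:
    """Hypothetical view of one XPENDING row."""
    entry_id: str
    idle_ms: int
    deliveries: int


def plan_reap(pending, idle_ms=PEL_REAP_IDLE_MS, max_deliveries=PEL_REAP_MAX_DELIVERIES):
    """Split pending entries into (to_claim, to_dlq); fresh entries are left alone."""
    to_claim, to_dlq = [], []
    for entry in pending:
        if entry.idle_ms <= idle_ms:
            continue  # assumed to still belong to a live consumer
        if entry.deliveries > max_deliveries:
            to_dlq.append(entry.entry_id)   # poison message: stop retrying
        else:
            to_claim.append(entry.entry_id)  # candidate for XCLAIM and re-processing
    return to_claim, to_dlq


pending = [
    PendingEntry("1-0", idle_ms=5_000, deliveries=1),
    PendingEntry("2-0", idle_ms=90_000, deliveries=2),
    PendingEntry("3-0", idle_ms=120_000, deliveries=6),
]
claim, dlq = plan_reap(pending)
```

With the sample rows, only the second entry is claimed and the third is routed to the DLQ; the first is left for its current owner.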
Unique consumer names are mandatory (DAT-109)
Two workers sharing a CONSUMER_NAME collide on the same PEL slot — entries
get assigned to whichever replica acks first and the other's work is
silently lost. CONSUMER_NAME defaults to <hostname>-<uuid8>
(app/config.py::_default_consumer_name), and the consumer refuses to
start if a live consumer with its name (idle < PEL_REAP_IDLE_MS) is
already registered in the group. The compose default worker-1 is fine for
a single replica; bump it when you --scale.
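A minimal sketch of the naming scheme and the start-up collision check, assuming consumer rows shaped like the output of XINFO CONSUMERS (a name plus an idle time in milliseconds). The function names are hypothetical and this is not the code in app/config.py.

```python
import socket
import uuid


def default_consumer_name() -> str:
    """Build a <hostname>-<uuid8> name, unique per process start."""
    return f"{socket.gethostname()}-{uuid.uuid4().hex[:8]}"


def name_is_taken(name, consumers, live_idle_ms=60_000):
    """True if a consumer with this name was active within live_idle_ms.

    consumers: iterable of dicts with "name" and "idle" (ms) keys.
    """
    return any(c["name"] == name and c["idle"] < live_idle_ms for c in consumers)


registered = [{"name": "worker-1", "idle": 1_200}, {"name": "worker-2", "idle": 120_000}]
```

Here worker-1 would be refused (a live consumer holds the name), while worker-2 has been idle long enough to be treated as dead and its name can be reused.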
Telemetry decoding (DAT-117)¶
The RDC stream carries mixed msgpack + plain-UTF-8 encoding. process_stream_fields
runs every value through decode_payload (msgpack → JSON → UTF-8 → binary
fallback) and then maps publisher field names onto canonical names via
FIELD_MAP — e.g. heart_rate accepts heart_rate | heartbeat | hr | bpm,
room_code accepts room_code | room_id | room | location | current_room | zone.
Fields outside FIELD_MAP and the KNOWN_UNMAPPED_FIELDS allowlist surface a
one-shot warning per field name so a real publisher schema rename doesn't
silently drop biometric data, without flooding the log on every event.
Opt-out field
A telemetry event with notification_enabled = false is recorded into ticket
state but never evaluated against any rule — the visitor has opted out.
The engine¶
app/engine.py::NotificationEngine.process is the heart of the service. For
each decoded telemetry event keyed by ticket_id:
flowchart TD
A[telemetry event] --> B{ticket_id present?}
B -- no --> Z[skip]
B -- yes --> C{notification_enabled == false?}
C -- yes --> Y[apply state, skip rules]
C -- no --> D[load ticket_state]
D --> E{gate open?<br/>room in GA/GB/GC/GD<br/>or entered_gallery}
E --> F[for each rule]
F --> G{gate closed AND<br/>not pre_gallery_exempt?}
G -- yes --> F
G -- no --> H{required_fields present?}
H -- no --> F
H -- yes --> I{should_trigger?}
I -- no --> F
I -- yes --> J{ignore_cooldown<br/>OR cooldown acquired?}
J -- no --> K[rate_limited, count metric]
J -- yes --> L[resolve ticket via agent]
L --> M[chat channel: open conversation]
L --> N[push channel: OneSignal]
M --> O{chat written, no visible push?}
O -- yes --> P[silent refetch push]
L --> Q[record attempt + after_fire]
Q --> F
F --> R[apply_telemetry to state]
The pre-gallery gate (DAT-287)¶
The Dataland experience starts at Gallery A. The visitor's path through the
lobby (LO), onboarding (ON), and corridors should stay quiet — no
content or biometric pushes until they actually reach an exhibit gallery.
GALLERY_ROOM_CODES = frozenset({"GA", "GB", "GC", "GD"}) # (1)!
gate_open = (
telemetry.get("room_code") in GALLERY_ROOM_CODES
or ticket_state.get("entered_gallery") == "True" # (2)!
)
1. DAT-287: the four exhibit galleries. Lobby (LO), onboarding (ON), and corridors are deliberately absent so content and biometric pushes stay quiet until the visitor reaches an actual gallery.
2. The latch. Once the visitor has entered any gallery this flag is "True" in ticket state, so the gate stays open for the rest of the session even if they wander back into a corridor. A new session_id resets it and re-gates.
Once a visitor enters any gallery the entered_gallery flag latches in ticket
state, so the gate stays open for the rest of the session even if they walk
back into a corridor. A new session_id re-gates (the flag resets). Rules marked
pre_gallery_exempt = True — only the welcome and the checkout — fire
regardless of the gate.
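The gate, the latch, and the exemption fit together roughly as follows. This is a sketch under assumptions: apply_latch and rule_may_run are hypothetical helpers, and the order in which the real engine resets the latch relative to evaluating the gate is not specified above, so the sketch simply updates state first.

```python
GALLERY_ROOM_CODES = frozenset({"GA", "GB", "GC", "GD"})


def gate_is_open(telemetry, ticket_state):
    return (telemetry.get("room_code") in GALLERY_ROOM_CODES
            or ticket_state.get("entered_gallery") == "True")


def rule_may_run(pre_gallery_exempt, telemetry, ticket_state):
    """Exempt rules (welcome, checkout) ignore the gate entirely."""
    return pre_gallery_exempt or gate_is_open(telemetry, ticket_state)


def apply_latch(telemetry, ticket_state):
    """Return updated state: a new session resets the latch, a gallery room sets it."""
    state = dict(ticket_state)
    if telemetry.get("session_id") != state.get("session_id"):
        state["session_id"] = telemetry.get("session_id")
        state["entered_gallery"] = "False"
    if telemetry.get("room_code") in GALLERY_ROOM_CODES:
        state["entered_gallery"] = "True"
    return state


events = [
    {"session_id": "s1", "room_code": "LO"},        # lobby: gated
    {"session_id": "s1", "room_code": "GA"},        # first gallery: opens and latches
    {"session_id": "s1", "room_code": "CORRIDOR"},  # latch keeps the gate open
    {"session_id": "s2", "room_code": "LO"},        # new session: re-gated
]
state, history = {}, []
for event in events:
    state = apply_latch(event, state)
    history.append(gate_is_open(event, state))
```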
The shared cooldown + bypass (DAT-225 / DAT-293)¶
A single per-ticket cooldown bucket (notification:cooldown:<ticket>,
app/rate_limiter.py) protects the visitor from notification stacking. After
any rule fires for a ticket, every other telemetry rule for that ticket is
suppressed for telemetry_cooldown_seconds (default 240 s), across all
channels. The acquire is an atomic SET NX EX; if the rule's delivery later
fails, the engine releases the bucket so a transient error doesn't eat the
visitor's quiet window.
Cooldown bypass (DAT-293)
Rules with ignore_cooldown = True skip the bucket entirely. This is reserved
for events the 240 s window would wrongly suppress:
- room_transition: a room change is meaningful and already deduped to once-per-room-per-session. A biometric alert moments earlier shouldn't swallow the "you're entering Gallery C" push.
- session_flow: Gallery-B prompts are room-change driven and once-per-prompt.
- visit_ended: terminal checkout must fire even if an in-visit rule set the cooldown seconds before departure.
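The acquire / release / bypass behaviour can be modelled without Redis. The class below is an in-memory stand-in for the SET NX EX bucket, and try_fire is a hypothetical wrapper, not the engine's API; the clock is injectable so the window can be exercised without sleeping.

```python
import time


class CooldownBucket:
    """In-memory stand-in for SET notification:cooldown:<ticket> NX EX <ttl>."""

    def __init__(self, ttl_seconds=240, clock=time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock
        self._expiry = {}

    def acquire(self, ticket_id):
        now = self._clock()
        expires_at = self._expiry.get(ticket_id)
        if expires_at is not None and expires_at > now:
            return False  # another rule already holds the quiet window
        self._expiry[ticket_id] = now + self._ttl
        return True

    def release(self, ticket_id):
        self._expiry.pop(ticket_id, None)


def try_fire(bucket, ticket_id, ignore_cooldown, deliver):
    """Return 'sent', 'rate_limited', or 'failed' for one rule firing."""
    if not ignore_cooldown and not bucket.acquire(ticket_id):
        return "rate_limited"
    try:
        deliver()
    except Exception:
        if not ignore_cooldown:
            bucket.release(ticket_id)  # a failed delivery must not eat the window
        return "failed"
    return "sent"
```

A bypassing rule never touches the bucket, so it neither consumes nor extends another rule's quiet window.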
Delivery mechanics¶
When a rule with channels fires, the engine resolves the visitor exactly once
per event (AgentTicketResolver.resolve → GET /v1/service/tickets/{ticket_id}/user
on the agent), yielding user_id, external_id, and any existing
conversation_id. Then:
- chat → AgentChatClient.open_conversation POSTs to /v1/service/chat/museum. For static_chat rules (the welcome) the rule text is forwarded as assistant_text so the agent persists it verbatim with no generation and no RAG; otherwise it is a visitor-voiced prompt the museum agent answers normally over SSE. The SSE consumption is capped by AGENT_SSE_READ_TIMEOUT_SECONDS (inter-byte) and a hard AGENT_SSE_WALL_CLOCK_TIMEOUT_SECONDS (30 s, DAT-112) so a stuck stream can't pin a worker.
- push → OneSignalClient.send POSTs to OneSignal, targeting include_aliases.external_id. A fresh UUID per logical send is used as OneSignal's top-level external_id idempotency key (DAT-105), so a retry-after-5xx replays the same key rather than double-delivering.
Silent OneSignal no-ops are failures (DAT-239)
OneSignal returns HTTP 200 even when it routes a push to zero
recipients ({"id": "", "errors": ["All included players are not
subscribed"]}, or errors.invalid_aliases). The client classifies these as
delivery failures so notification_push_failed_total ticks and the
alias-contract regression that hid for weeks (DAT-238) can't recur.
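A sketch of the classification, assuming the two response shapes quoted above are representative. push_delivered is a hypothetical helper, and treating any non-empty errors value as a failure is a simplification rather than the client's documented behaviour.

```python
def push_delivered(status_code: int, body: dict) -> bool:
    """True only when OneSignal accepted the push and routed it to someone."""
    if not 200 <= status_code < 300:
        return False
    if not body.get("id"):
        return False  # HTTP 200 with an empty id: nobody was targeted
    if body.get("errors"):
        return False  # e.g. a list of messages or {"invalid_aliases": ...}
    return True


no_subscribers = {"id": "", "errors": ["All included players are not subscribed"]}
bad_alias = {"id": "abc", "errors": {"invalid_aliases": {"external_id": ["u1"]}}}
```

Counting these cases as failures is what lets notification_push_failed_total move when a push silently reaches nobody.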
Silent refetch push¶
The mobile app has no socket, so a push is its only refetch trigger. When a rule writes
a chat message but sends no visible push (e.g. artwork_engagement, which is
chat-only), the engine fires a silent, data-only push (content_available,
silent: true) so the app refetches the conversation and the new message
appears. Best-effort: the message is already committed, so a failure here only
delays the refetch until the next push.
The rules¶
Rules live in app/rules/; default_rules() wires them in evaluation order.
Each rule declares channels, required_fields (short-circuit before any Redis
hop when missing, DAT-118), and the gate/cooldown flags above. Thresholds,
channels, priorities, and cooldown come from rules.toml (see
Configuration).
| Rule | Channels | Fires when | Gate / cooldown | DAT |
|---|---|---|---|---|
| visit_started | push + chat | new session_id for the ticket | pre-gallery exempt; ticket-deduped | DAT-296 |
| heart_rate_alert | push + chat | heart_rate > threshold_bpm (120), edge-triggered | gated; cooldown | DAT-229, DAT-285 |
| skin_conductance_alert | push + chat | EDA exceeds the visitor's own rolling baseline by delta_threshold_us (fixed threshold_us until warmed up) | gated; cooldown | DAT-230 |
| artwork_engagement | chat | seconds_in_chapter ≥ 90 with an art_name | gated; cooldown; chat-only → silent refetch | n/a |
| room_transition | push + chat | genuinely-new room (not GB, not a corridor) | gated; bypasses cooldown | DAT-228, DAT-293 |
| session_flow | push | Gallery-B entry / re-entry prompts | gated; bypasses cooldown | DAT-287/289/290 |
| experience_tip | push + chat | config-driven match on chapter_id / chapter_name / art_name | gated; cooldown | DAT-259 |
| visit_ended | push | event_type == "visit_ended" | pre-gallery exempt; bypasses cooldown | DAT-282/287 |
visit_started — the welcome (DAT-296)¶
Fires on the first telemetry event carrying a session_id the ticket hasn't seen
before. The chat message is a fixed greeting persisted verbatim
(static_chat = True) — routing a canned greeting through the museum agent would
make the first chat write, and the welcome push that follows it, wait on ~40 s of
RAG. The welcome can also be triggered the instant the mobile sends an empty first
/museum message (agent → POST /v1/ops/welcome); both paths share a
ticket-keyed dedup claim (WelcomeDedupStore, atomic SET NX) so the welcome
fires exactly once per visit. Welcome push: "Welcome to Dataland — Your AI guide
is ready for this visit."
heart_rate_alert / skin_conductance_alert — biometric edge triggers¶
Both are edge-triggered: they fire on the rising edge into alert and stay
quiet while the condition persists, by reading a state flag
(heart_rate_alert_active / skin_conductance_alert_active) that
app/state.py::apply_telemetry sets and clears against the same threshold. The chat message is
visitor-voiced and biometric-context-rich ("My heart rate just spiked to N BPM
while I was viewing X … what is happening with this artwork?"), nudging the agent
toward a grounded answer.
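Edge triggering reduces to "above threshold now, and not already flagged". The sketch below assumes the flag is stored as the string "True", like entered_gallery, and that state is updated after the rule is evaluated; the function names are hypothetical.

```python
def heart_rate_should_trigger(heart_rate, state, threshold_bpm=120):
    """Fire only on the rising edge: above threshold now, not flagged before."""
    return heart_rate > threshold_bpm and state.get("heart_rate_alert_active") != "True"


def apply_heart_rate(heart_rate, state, threshold_bpm=120):
    """Set or clear the flag against the same threshold the rule uses."""
    updated = dict(state)
    updated["heart_rate_alert_active"] = "True" if heart_rate > threshold_bpm else "False"
    return updated


def fired_readings(readings):
    """Replay a series of BPM readings and collect the ones that would alert."""
    state, fired = {}, []
    for bpm in readings:
        if heart_rate_should_trigger(bpm, state):
            fired.append(bpm)
        state = apply_heart_rate(bpm, state)
    return fired


alerts = fired_readings([95, 125, 130, 128, 110, 124])
```

The run of 125, 130, 128 produces a single alert at 125; the dip to 110 clears the flag, so 124 alerts again.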
EDA baseline calibration (DAT-230)
Skin-conductance uses the visitor's own rolling baseline: the mean of
samples in the trailing baseline_window_seconds (300 s) window. It fires
when the current reading exceeds that baseline by delta_threshold_us. Until
min_samples (60) readings accumulate, it falls back to the absolute
threshold_us (1.5 µS) so cold-start visitors are still covered. Vitals are
held to physiological sanity bounds upstream (DAT-285).
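A minimal sketch of the warm-up fallback and rolling baseline. It assumes the comparison is strict and that the current reading is excluded from its own baseline; EdaBaseline is a hypothetical class, and the edge-trigger flag described earlier would sit on top of it.

```python
from collections import deque


class EdaBaseline:
    def __init__(self, window_seconds=300, min_samples=60,
                 threshold_us=1.5, delta_threshold_us=1.5):
        self.window_seconds = window_seconds
        self.min_samples = min_samples
        self.threshold_us = threshold_us
        self.delta_threshold_us = delta_threshold_us
        self._samples = deque()  # (timestamp_seconds, microsiemens)

    def exceeds(self, ts, value_us):
        """Check a reading against the baseline of earlier samples, then record it."""
        while self._samples and self._samples[0][0] < ts - self.window_seconds:
            self._samples.popleft()  # drop samples that fell out of the window
        if len(self._samples) >= self.min_samples:
            baseline = sum(v for _, v in self._samples) / len(self._samples)
            alert = value_us > baseline + self.delta_threshold_us
        else:
            alert = value_us > self.threshold_us  # cold start: absolute threshold
        self._samples.append((ts, value_us))
        return alert


# Small parameters so the warm-up is visible in a handful of readings.
eda = EdaBaseline(window_seconds=10, min_samples=3, threshold_us=1.5, delta_threshold_us=1.0)
results = [eda.exceeds(t, v) for t, v in [(0, 2.0), (1, 2.0), (2, 2.0), (3, 2.5), (4, 3.5)]]
```

The first three readings are judged against the absolute threshold; once three samples exist, 2.5 µS no longer alerts against a 2.0 µS baseline, while 3.5 µS does.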
room_transition vs session_flow (DAT-228 / DAT-287)¶
room_transition owns "you entered a new room" for every gallery except
GB, deduped once-per-room-per-session via RoomsSeenStore
(notification:rooms_seen:<ticket>:<session>). It skips corridors
(is_in_corridor) and skips GB entirely, because all Gallery-B pushes belong
to session_flow, which must fire on re-entry (something the per-room dedup
would suppress). session_flow dedups per prompt key instead:
- First GB entry → "These interactive data can be saved to your phone directly as well!"
- Returning to GB after Gallery C → "What do you think about the experience in Gallery C?"
- Returning to GB after Gallery D → "What do you think about the experience in Gallery D?"
The agent speaks real room/section names rather than bare codes — Data Pavilion (GA), Latent Gallery (GB), Infinity Room (GC), The Sanctuary (GD), Discovery Portal (ON) — see Agent (DAT-281).
experience_tip — config-driven micro-prompts (DAT-259)¶
Experience-specific nudges ("Did you try the chocolate?", "Close your eyes and
breathe in") matched off the live stream's chapter_id / chapter_name /
art_name. Tips are pure config ([[experience_tip.tips]] in rules.toml) so
the museum team owns the copy and trigger targets without a code change — edit
the file then POST /rules/reload. Each tip fires once per (ticket, session)
via ExperienceTipsSeenStore, with an optional min_seconds_in_chapter dwell
gate.
visit_ended — the checkout push (DAT-282 / DAT-287)¶
Fires on the synthetic event_type == "visit_ended" event the museum bridge's
1 Hz active-tickets loop emits when a ticket leaves Visitors:ActiveTicketIDs.
Sends a single "Thanks for visiting Dataland — How was the experience?" push.
Terminal, once-per-visit, pre-gallery exempt, and cooldown-bypassing so it always
lands.
Ops alerting: complaints + the swappable notifier (DAT-213)¶
The notification API also receives visitor-complaint events from the agent's
silent, server-side LLM judge (off the visitor's response path, invisible to the
visitor — see Agent). POST /v1/ops/complaint flows through
ComplaintHandler:
- Severity threshold: events below COMPLAINT_SEVERITY_THRESHOLD (low) are audited and dropped.
- Per-session dedup (ComplaintDedupStore): one ticket per session unless severity escalates to a strictly higher tier (low < medium < high). Dedup is recorded only on a real send, so a transient notifier failure can re-fire.
- PII scrub: emails and phone numbers in the reason / transcript / self-chosen session/ticket ids are scrubbed defensively (the agent strips upstream too). The authenticated complainant's name/email are shown verbatim when present; they are the legitimate contact the ops team needs, and anonymous visitors are first-class (no identity fields required).
- Audit: every flag (fired, deduped, below-threshold, failed) is appended to the museum:complaints:audit Redis stream with the verdict and a snippet, for the weekly false-positive / false-negative review.
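The threshold and escalation-aware dedup steps can be sketched in memory. ComplaintGate is a hypothetical stand-in for the handler plus ComplaintDedupStore; recording is a separate call so that, as described above, only a real send suppresses later events.

```python
SEVERITY_RANK = {"low": 0, "medium": 1, "high": 2}


class ComplaintGate:
    def __init__(self, threshold="low"):
        self._threshold = SEVERITY_RANK[threshold]
        self._sent = {}  # session_id -> highest rank actually delivered

    def decide(self, session_id, severity):
        """Return 'below_threshold', 'deduped', or 'send'."""
        rank = SEVERITY_RANK[severity]
        if rank < self._threshold:
            return "below_threshold"
        if self._sent.get(session_id, -1) >= rank:
            return "deduped"  # same or lower tier already alerted this session
        return "send"

    def record_sent(self, session_id, severity):
        """Call only after the notifier reported a successful delivery."""
        rank = SEVERITY_RANK[severity]
        self._sent[session_id] = max(self._sent.get(session_id, -1), rank)


gate = ComplaintGate(threshold="medium")
```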
Provider-agnostic notifier¶
OpsAlert is a channel-agnostic description; concrete OpsNotifier
implementations render it per provider. Switching providers is a config change,
never a code change. OPS_NOTIFIER_PROVIDER is a comma-separated list:
| Value | Behaviour |
|---|---|
| none (default) / empty | alerting disabled (no-op) |
| discord | Discord incoming-webhook embeds |
| slack | Slack incoming-webhook Block Kit |
| discord,slack | fan out to both via MultiNotifier |
graph TD
CE["museum.visitor.complaint"] --> CH[ComplaintHandler]
CH -->|threshold + dedup + scrub| BUILD[OpsAlert]
BUILD --> N{OPS_NOTIFIER_PROVIDER}
N -->|discord| D[DiscordWebhookNotifier]
N -->|slack| S[SlackWebhookNotifier]
N -->|discord,slack| M[MultiNotifier]
M --> D
M --> S
CH --> AUD[(museum:complaints:audit)]
Two logical channels resolve to per-provider webhook URLs: ops carries
visitor complaints (DAT-213); tech is reserved for sensor malfunctions
(DAT-214). Discord falls back to the legacy generic OPS_ALERT_WEBHOOK_URL /
TECH_ALERT_WEBHOOK_URL when no Discord-specific URL is set. MultiNotifier
runs each sub-notifier independently — one provider being down or unconfigured
never blocks the others — and returns success if at least one delivered.
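The fan-out semantics can be sketched in a few lines. This is not the project's MultiNotifier: the send(alert) -> bool interface, parse_providers, and the two fake notifiers are assumptions made for the example.

```python
def parse_providers(raw):
    """Split an OPS_NOTIFIER_PROVIDER value; 'none' or empty disables alerting."""
    names = [part.strip().lower() for part in (raw or "").split(",")]
    return [name for name in names if name and name != "none"]


class MultiNotifier:
    def __init__(self, notifiers):
        self._notifiers = list(notifiers)

    def send(self, alert) -> bool:
        """Try every sub-notifier; succeed if at least one delivered."""
        delivered = False
        for notifier in self._notifiers:
            try:
                delivered = bool(notifier.send(alert)) or delivered
            except Exception:
                continue  # one provider failing must not block the rest
        return delivered


class FakeUp:  # hypothetical notifier that always delivers
    def send(self, alert):
        return True


class FakeDown:  # hypothetical notifier whose webhook is unreachable
    def send(self, alert):
        raise ConnectionError("webhook unreachable")
```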
Defence-in-depth rendering
Both notifiers escape visitor-derived text so a hostile display name or
message can't inject masked links (Discord markdown) or broadcast pings
(<!channel>, @everyone). Discord embeds set allowed_mentions.parse=[];
transcripts are truncated to embed/Block-Kit limits and capped at
OPS_ALERT_TRANSCRIPT_MAX_TURNS (10).
HTTP API (the --api container)¶
All endpoints except /health and /metrics require a bearer token equal to
NOTIFICATION_OPS_TOKEN (DAT-103). The dependency distinguishes failure modes:
503 when the server token is unset (broken deploy), 401 + WWW-Authenticate:
Bearer when the client's header is missing or wrong. Comparison is constant-time
(secrets.compare_digest).
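The three outcomes can be expressed as a framework-free function. check_bearer is a hypothetical helper, not the service's dependency; it returns a status code and response headers so the failure modes are easy to see.

```python
import secrets


def check_bearer(authorization_header, server_token):
    """Return (status, headers): 503 if the server is misconfigured, 401 on a bad client."""
    if not server_token:
        return 503, {}  # broken deploy: no token configured on the server
    prefix = "Bearer "
    header = authorization_header or ""
    presented = header[len(prefix):] if header.startswith(prefix) else ""
    # Compare as bytes so non-ASCII input cannot raise; the comparison is constant-time.
    if not presented or not secrets.compare_digest(presented.encode(), server_token.encode()):
        return 401, {"WWW-Authenticate": "Bearer"}
    return 200, {}
```

Keeping 503 distinct from 401 lets an operator tell a misconfigured server apart from a client sending the wrong token.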
| Method + path | Auth | Purpose |
|---|---|---|
| GET /health | none | Redis ping liveness |
| GET /metrics | none | Prometheus text exposition |
| GET /rules | bearer | list rules with channels + priority + the cooldown |
| POST /rules/reload | bearer | re-read NOTIFICATION_RULES_CONFIG, swap rules in place (strict; returns source + sha256) |
| POST /rules/validate | bearer | dry-run a candidate rules file ({"path": "..."}) without swapping |
| POST /events | bearer | inject a telemetry event directly into the engine (testing / replay) |
| POST /v1/ops/complaint | bearer | receive a museum.visitor.complaint → ops ticket |
| POST /v1/ops/welcome | bearer | send the mobile-init welcome push for a ticket (deduped) |
| GET /state/{ticket_id} | bearer | dump a ticket's current state hash |
# Liveness + a metrics snapshot
curl -fsS http://localhost:8080/health # (1)!
curl -fsS http://localhost:8080/metrics
# Hot-reload rules.toml after editing (no restart)
curl -fsS -X POST http://localhost:8080/rules/reload \
-H "Authorization: Bearer $NOTIFICATION_OPS_TOKEN" # (2)!
# Inspect one visitor's state
curl -fsS http://localhost:8080/state/TICKET123 \
-H "Authorization: Bearer $NOTIFICATION_OPS_TOKEN" | jq # (3)!
1. /health and /metrics are the only unauthenticated endpoints. -fsS makes curl exit non-zero on an HTTP error status of 400 or above (-f) while staying quiet on success (-s hides the progress bar, -S still prints errors), so this is safe in a health probe.
2. Hot-reloads NOTIFICATION_RULES_CONFIG and swaps the rules in place with no restart. The reload is strict and returns the source + sha256, so a malformed edit is rejected without dropping the running ruleset.
3. Every authenticated endpoint requires this bearer header equal to NOTIFICATION_OPS_TOKEN (DAT-103). A missing/wrong token returns 401; an unset server token returns 503 (broken deploy), not 401.
DLQ + replay¶
An entry that raises during engine.process is moved to the DLQ stream
museum:telemetry:dlq (DLQ_STREAM_KEY, capped at DLQ_MAX_LEN = 10 000) and
then XACK'd off the input. The DLQ payload records original_stream,
original_entry_id, error_type, error_message, error_traceback,
failed_at_ms, and fields_json — all redacted (DAT-115) so bearer/basic
tokens or credential-bearing URLs never persist where an operator can read them.
The PEL reaper sends entries past PEL_REAP_MAX_DELIVERIES to the same DLQ.
Replay re-publishes fields_json back onto the original stream so the engine gets
another shot. It defaults to a dry run — pass --commit to actually write:
# Inside the worker/api container. Dry run first.
python -m app.dlq_replay --since -1h # (1)!
# Replay everything DLQed in the last hour back to the input stream.
python -m app.dlq_replay --since -1h --batch 100 --commit # (2)!
# Replay one specific entry.
python -m app.dlq_replay --entry 1715635200000-0 --commit # (3)!
1. Replay defaults to a dry run: it reports what would be re-published without writing anything. Always run this form first to confirm the scope.
2. --commit is what actually re-publishes fields_json back onto the original stream so the engine gets another shot. --batch caps how many entries are replayed per pass; pace it during a maintenance window.
3. Targets a single DLQ entry by its original stream id; use this to surgically re-drive one poison message after you've fixed the cause.
Replay during a window
Best practice: replay during a maintenance window and watch
notification_dlq_depth drop while notification_processed_total climbs in
lockstep.
Metrics + observability¶
/metrics renders Prometheus text from Redis-backed counters
(notification:counters:*, best-effort — a Redis hiccup never crashes the
engine). Notable series:
| Metric | Meaning |
|---|---|
| notification_processed_total | entries the consumer ack'd |
| notification_dlq_total / notification_dlq_depth | entries routed to DLQ / live XLEN of the DLQ |
| notification_push_sent_total / notification_push_failed_total | OneSignal 2xx (real delivery) / 4xx-after-retry, missing recipient, or silent no-op |
| notification_chat_opened_total / notification_chat_failed_total | agent chat writes |
| notification_rule_triggered_total{rule} / notification_rule_rate_limited_total{rule} | per-rule fire / cooldown-suppression counts |
| notification_telemetry_consumer_lag{group} | per-group XINFO GROUPS lag, the correct backpressure signal (DAT-82) |
| notification_e2e_latency_seconds | histogram, event timestamp → push ack (DAT-257) |
Stream depth vs lag
notification_telemetry_stream_depth (XLEN) pegs near the MAXLEN cap in
steady state and is diagnostic-only; alert on
notification_telemetry_consumer_lag instead.
Every notification attempt is also appended to the notification:attempts Redis
stream (DAT-57) with ticket, rule, channel, status, redacted payload, and timing —
query with XRANGE / XREAD. Structured JSON logs + optional Logfire spans
cover the rest.
Configuration (rules.toml)¶
Thresholds, channels, priorities, and the shared cooldown live in
rules.toml, the image-baked default at /app/config/rules.toml. Override on a
host by pointing NOTIFICATION_RULES_CONFIG at a bind-mounted file (VDS
convention: /etc/dataland/notification-rules.toml). A missing or malformed file
logs an error and the worker falls back to in-code defaults — unless booted with
--strict, which crashes instead so a typo is caught at deploy.
[global]
telemetry_cooldown_seconds = 240 # (1)!
[heart_rate_alert]
threshold_bpm = 120 # (2)!
require_art_name = false
channels = ["push", "chat"]
priority = "normal"
[skin_conductance_alert]
threshold_us = 1.5 # (3)!
baseline_window_seconds = 300 # (4)!
min_samples = 60
delta_threshold_us = 1.5
channels = ["push", "chat"]
[room_transition]
channels = ["push", "chat"] # (5)!
[artwork_engagement]
seconds_in_chapter_threshold = 90
channels = ["chat"]
[experience_tip] # (6)!
channels = ["push", "chat"]
# [[experience_tip.tips]]
# match_chapter_id = "chocolate-room"
# title = "Before you move on"
# body = "Did you try the chocolate at the tasting station?"
# chat = "I'm at the chocolate experience. What should I try here?"
# min_seconds_in_chapter = 10
1. DAT-225: a single shared per-ticket cooldown bucket. Every content push draws from this 240 s window, so a visitor never receives two back-to-back content pushes. The welcome push is exempt, and room-transition, session-flow, and checkout pushes intentionally bypass it (DAT-293).
2. DAT-229: raised from 100. No alert fires at or below this BPM.
3. Absolute fallback threshold, used only until the per-visitor rolling baseline has warmed up.
4. DAT-230: rolling baseline window. Once min_samples readings exist, the rule fires on delta_threshold_us above the visitor's own baseline rather than on the absolute threshold_us.
5. push is the fallback when the app is backgrounded; chat is delivered in-conversation when it is foregrounded.
6. DAT-259: the museum team owns the tip copy. Uncomment a [[experience_tip.tips]] table to add one.
Key environment variables¶
# Redis (telemetry stream + all state)
REDIS_HOST=dataland-redis
REDIS_PORT=6379
REDIS_PASSWORD=***
REDIS_STREAM_KEY=museum:telemetry
CONSUMER_GROUP=notification-service
CONSUMER_NAME=worker-1 # (1)!
CONSUMER_COUNT=10 # (2)!
BLOCK_MS=2000
DLQ_STREAM_KEY=museum:telemetry:dlq
DLQ_MAX_LEN=10000 # (3)!
# Agent (chat writes + ticket resolution)
AGENT_BASE_URL=http://dataland-agent:4141
AGENT_SERVICE_TOKEN=*** # (4)!
AGENT_SSE_WALL_CLOCK_TIMEOUT_SECONDS=30 # (5)!
# Push delivery
ONESIGNAL_APP_ID=***
ONESIGNAL_API_KEY=***
ONESIGNAL_API_URL=https://api.onesignal.com/notifications
# Ops API
NOTIFICATION_OPS_TOKEN=*** # (6)!
NOTIFICATION_RULES_CONFIG=/etc/dataland/notification-rules.toml # (7)!
# Ops alerting (DAT-213)
OPS_NOTIFIER_PROVIDER=none # (8)!
OPS_ALERT_DISCORD_WEBHOOK_URL=*** # (9)!
OPS_ALERT_SLACK_WEBHOOK_URL=***
COMPLAINT_SEVERITY_THRESHOLD=low # (10)!
# Lifecycle / state TTLs
STATE_TTL_SECONDS=86400
ROOMS_SEEN_TTL_SECONDS=21600
WELCOME_DEDUP_TTL_SECONDS=21600
COMPLAINT_DEDUP_TTL_SECONDS=21600
APP_ENV=production # (11)!
1. DAT-109: MUST be unique per replica. Two workers sharing this name collide on the same PEL slot and one's work is silently lost. Defaults to <hostname>-<uuid8>; bump the compose worker-1 when you --scale.
2. XREADGROUP batch size: entries read per loop. Paired with BLOCK_MS (2000 ms) as the blocking timeout when the stream is idle.
3. Cap (MAXLEN) on the DLQ stream. Older DLQ entries are trimmed past 10 000, so drain or replay before the backlog rolls off.
4. Placeholder/empty values (change-me-shared-service-token) are rejected at boot when APP_ENV=production (DAT-111/DAT-291); otherwise every agent call would 401 invisibly.
5. DAT-112: hard wall-clock cap on SSE consumption so a stuck agent stream can't pin a worker. There is also a separate inter-byte read timeout.
6. DAT-103: bearer for /events, /v1/ops/*, /rules*, /state. Unset on the server yields 503; wrong from the client yields 401. Compared constant-time via secrets.compare_digest.
7. Points at the bind-mounted rules override (VDS convention shown). Missing or malformed falls back to in-code defaults unless the worker booted --strict.
8. Comma-separated provider list: discord | slack | discord,slack | none. Default none disables ops alerting (no-op); discord,slack fans out to both via MultiNotifier. Switching providers is config-only, never code.
9. Per-provider webhook. Discord falls back to the legacy generic OPS_ALERT_WEBHOOK_URL when no Discord-specific URL is set.
10. DAT-213: complaints below this severity (low < medium < high) are audited and dropped rather than paged.
11. Gates the production boot guard (DAT-111/DAT-291). In non-production the worker boots even with OneSignal/agent secrets unset, so local dev still works.
Production boot guard (DAT-111 / DAT-291)
When APP_ENV=production, the worker refuses to start if ONESIGNAL_APP_ID,
ONESIGNAL_API_KEY, or AGENT_SERVICE_TOKEN are missing or still at a
placeholder value (dataland, change-me-shared-service-token) — otherwise
every push would silently skip() and every agent call would 401, indefinitely
and invisibly. In non-production the worker boots regardless, so local dev with
OneSignal intentionally unset still works. See app/runtime.py and
Deploy.
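The guard amounts to a check over the environment at start-up. The sketch below is illustrative and is not the code in app/runtime.py: missing_production_secrets is a hypothetical name, and the placeholder set contains only the two values quoted above plus the empty string.

```python
PLACEHOLDERS = frozenset({"", "dataland", "change-me-shared-service-token"})
REQUIRED_IN_PRODUCTION = ("ONESIGNAL_APP_ID", "ONESIGNAL_API_KEY", "AGENT_SERVICE_TOKEN")


def missing_production_secrets(env):
    """Return the names that should block boot; always empty outside production."""
    if env.get("APP_ENV") != "production":
        return []
    return [name for name in REQUIRED_IN_PRODUCTION
            if env.get(name, "").strip() in PLACEHOLDERS]


prod_env = {
    "APP_ENV": "production",
    "ONESIGNAL_APP_ID": "dataland",                       # still a placeholder
    "ONESIGNAL_API_KEY": "real-key",
    "AGENT_SERVICE_TOKEN": "change-me-shared-service-token",
}
problems = missing_production_secrets(prod_env)
```

A caller would pass os.environ and exit with an error when the returned list is non-empty.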
Running locally + with the simulator¶
# Worker logs in real time
docker logs -f dataland-notification-worker
# Process a single batch then exit (smoke test)
python main.py --once # (1)!
# Inspect the input stream directly
docker exec dataland-redis sh -c '
redis-cli -a "$REDIS_PASSWORD" --no-auth-warning XINFO STREAM museum:telemetry' # (2)!
1. --once drains exactly one XREADGROUP batch then exits (the smoke-test path). Other entrypoint flags: --api (HTTP ops surface), --explore (Redis explorer), --strict (crash on a malformed rules.toml instead of reverting to defaults).
2. --no-auth-warning suppresses the stderr notice from passing the password on the command line via -a. XINFO STREAM shows the length, last generated id, and consumer-group count for the telemetry stream (XINFO GROUPS gives per-group detail), which is handy for confirming the bridge is writing.
For end-to-end testing without an Empatica wearable, drive the worker from the
Museum simulator. Per the playback-isolation rule, never replay
museum-simulation Avro into the live dataland-redis — point the worker at a
dedicated sidecar redis instead. See Deploy and
Architecture.