Notification — Telemetry Rules Engine

dataland-notification is the museum's reactive layer. It drains the museum:telemetry Redis stream produced by Museum, evaluates a set of rules against each visitor's live biometric + location data, and delivers the ones that fire over two visitor-facing channels: an in-chat message written through the Agent, and a OneSignal push to the visitor's phone. A second, off-path pipeline turns silently detected visitor complaints into ops alerts on Discord / Slack.

Recent changes

  • DAT-287 — content/condition pushes are gated until the visitor reaches Gallery A; the welcome and checkout are exempt.
  • DAT-289 / DAT-290 — Gallery-B session-flow pushes (save-to-phone, return-to-B feedback).
  • DAT-293 — room-transition and session-flow pushes bypass the 240 s per-ticket cooldown.
  • DAT-282: visit_ended / checkout push on Visitors:ActiveTicketIDs departure.
  • DAT-213 — silent visitor-complaint detection → swappable Discord/Slack ops notifier (comma-separated multi-provider fan-out), POST /v1/ops/complaint.
  • DAT-296 — mobile-init welcome push, ticket-deduped against the RDC visit_started welcome, POST /v1/ops/welcome.

Two roles, one image

The stack is two containers built from the same image (dataland/notification). Both run python main.py; the API container adds the --api flag.

| | Worker | API |
| --- | --- | --- |
| Container | dataland-notification-worker | dataland-notification-api |
| Command | python main.py (default) | python main.py --api |
| HTTP | none | 8080 |
| Compose profile | always on | api profile |
| Role | drain museum:telemetry, evaluate rules, deliver | ops surface: /events, /v1/ops/*, /state, /rules, /metrics, /health |

graph LR
  RDC[("RDC redis<br/>(external)")] --> MUS[museum bridge]
  MUS -->|XADD| TS[("dataland-redis<br/>museum:telemetry")]
  TS -.XREADGROUP.-> NW[notification-worker]
  NW -->|rule fires| ENG{engine}
  ENG -->|chat| AG[agent /v1/service/chat/museum]
  ENG -->|push| OS["OneSignal"]
  AG -.persists message.-> CONV[(conversation)]
  AGT[agent silent detector] -->|POST /v1/ops/complaint| NA[notification-api]
  NA -->|OpsAlert| DS["Discord / Slack"]
  NA -.GET state / metrics.-> ADMIN[admin dashboard]

Service entrypoint flags

main.py supports --api (HTTP ops surface), --once (process one batch then exit, for smoke tests), --explore (run the Redis explorer instead of the worker), and --strict (crash on a malformed rules.toml at boot instead of silently reverting to in-code defaults). See main.py::async_main.

How the worker drains the stream

The worker (app/consumer.py::RedisStreamConsumer) is a Redis Streams consumer group reader with two concurrent loops:

  1. Read loop: XREADGROUP from museum:telemetry (group CONSUMER_GROUP, default notification-service; consumer CONSUMER_NAME), count=CONSUMER_COUNT (10), block=BLOCK_MS (2000 ms). Each entry is decoded (app/processor.py), passed to engine.process(...), then XACK'd.
  2. Reap loop: every PEL_REAP_INTERVAL_SECONDS (30 s) it scans the Pending Entries List with XPENDING. Entries idle longer than PEL_REAP_IDLE_MS (60 s) are XCLAIMed and re-processed; entries past PEL_REAP_MAX_DELIVERIES (5) are routed to the DLQ. This recovers work from a worker that died mid-batch.
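The reap decision in step 2 can be sketched as a pure function over one pending entry. This is an illustration only, not the code in app/consumer.py: the PendingEntry type and classify_pending name are hypothetical, and both the strict comparisons and the choice to check the delivery count before idleness are assumptions.

```python
from dataclasses import dataclass

PEL_REAP_IDLE_MS = 60_000      # documented default: 60 s
PEL_REAP_MAX_DELIVERIES = 5    # documented default


@dataclass(frozen=True)
class PendingEntry:
    """Hypothetical view of one XPENDING row."""
    entry_id: str
    idle_ms: int
    deliveries: int


def classify_pending(entry: PendingEntry) -> str:
    """Return "dlq", "claim", or "leave" for one pending entry."""
    if entry.deliveries > PEL_REAP_MAX_DELIVERIES:
        return "dlq"    # delivered too many times: treat as poison
    if entry.idle_ms > PEL_REAP_IDLE_MS:
        return "claim"  # owner looks dead: XCLAIM and re-process
    return "leave"      # a live consumer is presumably still working on it
```

Keeping the decision free of Redis calls makes the thresholds easy to unit-test separately from the XPENDING / XCLAIM plumbing.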

Unique consumer names are mandatory (DAT-109)

Two workers sharing a CONSUMER_NAME collide on the same PEL slot — entries get assigned to whichever replica acks first and the other's work is silently lost. CONSUMER_NAME defaults to <hostname>-<uuid8> (app/config.py::_default_consumer_name), and the consumer refuses to start if a live consumer with its name (idle < PEL_REAP_IDLE_MS) is already registered in the group. The compose default worker-1 is fine for a single replica; bump it when you --scale.

Telemetry decoding (DAT-117)

The RDC stream carries mixed msgpack + plain-UTF-8 encoding. process_stream_fields runs every value through decode_payload (msgpack → JSON → UTF-8 → binary fallback) and then maps publisher field names onto canonical names via FIELD_MAP — e.g. heart_rate accepts heart_rate | heartbeat | hr | bpm, room_code accepts room_code | room_id | room | location | current_room | zone. Fields outside FIELD_MAP and the KNOWN_UNMAPPED_FIELDS allowlist surface a one-shot warning per field name so a real publisher schema rename doesn't silently drop biometric data, without flooding the log on every event.

Opt-out field

A telemetry event with notification_enabled = false is recorded into ticket state but never evaluated against any rule — the visitor has opted out.

The engine

app/engine.py::NotificationEngine.process is the heart of the service. For each decoded telemetry event keyed by ticket_id:

flowchart TD
  A[telemetry event] --> B{ticket_id present?}
  B -- no --> Z[skip]
  B -- yes --> C{notification_enabled == false?}
  C -- yes --> Y[apply state, skip rules]
  C -- no --> D[load ticket_state]
  D --> E{gate open?<br/>room in GA/GB/GC/GD<br/>or entered_gallery}
  E --> F[for each rule]
  F --> G{gate closed AND<br/>not pre_gallery_exempt?}
  G -- yes --> F
  G -- no --> H{required_fields present?}
  H -- no --> F
  H -- yes --> I{should_trigger?}
  I -- no --> F
  I -- yes --> J{ignore_cooldown<br/>OR cooldown acquired?}
  J -- no --> K[rate_limited, count metric]
  J -- yes --> L[resolve ticket via agent]
  L --> M[chat channel: open conversation]
  L --> N[push channel: OneSignal]
  M --> O{chat written, no visible push?}
  O -- yes --> P[silent refetch push]
  L --> Q[record attempt + after_fire]
  Q --> F
  F --> R[apply_telemetry to state]

The Dataland experience starts at Gallery A. The visitor's path through the lobby (LO), onboarding (ON), and corridors should stay quiet — no content or biometric pushes until they actually reach an exhibit gallery.

GALLERY_ROOM_CODES = frozenset({"GA", "GB", "GC", "GD"})  # (1)!
gate_open = (
    telemetry.get("room_code") in GALLERY_ROOM_CODES
    or ticket_state.get("entered_gallery") == "True"  # (2)!
)
  1. DAT-287 — the four exhibit galleries. Lobby (LO), onboarding (ON), and corridors are deliberately absent so content and biometric pushes stay quiet until the visitor reaches an actual gallery.
  2. The latch. Once the visitor has entered any gallery this flag is "True" in ticket state, so the gate stays open for the rest of the session even if they wander back into a corridor. A new session_id resets it and re-gates.

Once a visitor enters any gallery the entered_gallery flag latches in ticket state, so the gate stays open for the rest of the session even if they walk back into a corridor. A new session_id re-gates (the flag resets). Rules marked pre_gallery_exempt = True — only the welcome and the checkout — fire regardless of the gate.
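Putting the gate and the exemption together, a minimal sketch might read as follows. The function names gate_is_open and rule_may_run are hypothetical; only the room-code set, the entered_gallery latch, and the pre_gallery_exempt flag come from the text above.

```python
GALLERY_ROOM_CODES = frozenset({"GA", "GB", "GC", "GD"})


def gate_is_open(telemetry: dict, ticket_state: dict) -> bool:
    """Open when the visitor is in a gallery now or has latched entered_gallery."""
    return (
        telemetry.get("room_code") in GALLERY_ROOM_CODES
        or ticket_state.get("entered_gallery") == "True"
    )


def rule_may_run(gate_open: bool, pre_gallery_exempt: bool) -> bool:
    """A rule is skipped only when the gate is closed and the rule is not exempt."""
    return gate_open or pre_gallery_exempt


# Lobby, no latch yet: content rules stay quiet, the welcome still fires.
lobby_gate = gate_is_open({"room_code": "LO"}, {})
assert not rule_may_run(lobby_gate, pre_gallery_exempt=False)
assert rule_may_run(lobby_gate, pre_gallery_exempt=True)
```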

The shared cooldown + bypass (DAT-225 / DAT-293)

A single per-ticket cooldown bucket (notification:cooldown:<ticket>, app/rate_limiter.py) protects the visitor from notification stacking. After any rule fires for a ticket, every other telemetry rule for that ticket is suppressed for telemetry_cooldown_seconds (default 240 s), across all channels. The acquire is an atomic SET NX EX; if the rule's delivery later fails, the engine releases the bucket so a transient error doesn't eat the visitor's quiet window.

Cooldown bypass (DAT-293)

Rules with ignore_cooldown = True skip the bucket entirely. This is reserved for events the 240 s window would wrongly suppress:

  • room_transition — a room change is meaningful and already deduped to once-per-room-per-session. A biometric alert moments earlier shouldn't swallow the "you're entering Gallery C" push.
  • session_flow — Gallery-B prompts are room-change driven and once-per-prompt.
  • visit_ended — terminal checkout must fire even if an in-visit rule set the cooldown seconds before departure.
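The acquire / release / bypass behaviour can be modelled without Redis. The class below is an in-memory stand-in for the atomic SET NX EX bucket, written purely for illustration; CooldownSketch and may_fire are hypothetical names, not the API of app/rate_limiter.py.

```python
import time


class CooldownSketch:
    """In-memory stand-in for the per-ticket SET NX EX bucket (illustrative only)."""

    def __init__(self, ttl_seconds: float = 240.0, clock=time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock
        self._expires = {}  # key -> expiry time on the injected clock

    def acquire(self, ticket_id: str) -> bool:
        """Take the bucket if it is free or expired; mirrors SET key 1 NX EX ttl."""
        now = self._clock()
        key = f"notification:cooldown:{ticket_id}"
        if self._expires.get(key, float("-inf")) > now:
            return False  # bucket still held: the rule is rate-limited
        self._expires[key] = now + self._ttl
        return True

    def release(self, ticket_id: str) -> None:
        """Give the window back after a failed delivery."""
        self._expires.pop(f"notification:cooldown:{ticket_id}", None)


def may_fire(cooldown: CooldownSketch, ticket_id: str, ignore_cooldown: bool) -> bool:
    """Bypass rules never touch the bucket; all others must acquire it."""
    return ignore_cooldown or cooldown.acquire(ticket_id)
```

Because the bypass short-circuits before acquire, a room_transition push neither consumes nor extends the visitor's quiet window in this model.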

Delivery mechanics

When a rule with channels fires, the engine resolves the visitor exactly once per event (AgentTicketResolver.resolve → GET /v1/service/tickets/{ticket_id}/user on the agent), yielding user_id, external_id, and any existing conversation_id. Then:

  • chat: AgentChatClient.open_conversation POSTs to /v1/service/chat/museum. For static_chat rules (the welcome) the rule text is forwarded as assistant_text so the agent persists it verbatim with no generation and no RAG; otherwise it is a visitor-voiced prompt the museum agent answers normally over SSE. The SSE consumption is capped by AGENT_SSE_READ_TIMEOUT_SECONDS (inter-byte) and a hard AGENT_SSE_WALL_CLOCK_TIMEOUT_SECONDS (30 s, DAT-112) so a stuck stream can't pin a worker.
  • push: OneSignalClient.send POSTs to OneSignal, targeting include_aliases.external_id. A fresh UUID per logical send is used as OneSignal's top-level external_id idempotency key (DAT-105), so a retry-after-5xx replays the same key rather than double-delivering.

Silent OneSignal no-ops are failures (DAT-239)

OneSignal returns HTTP 200 even when it routes a push to zero recipients ({"id": "", "errors": ["All included players are not subscribed"]}, or errors.invalid_aliases). The client classifies these as delivery failures so notification_push_failed_total ticks and the alias-contract regression that hid for weeks (DAT-238) can't recur.
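A classification along these lines can be sketched as a small predicate on the response. push_delivered is a hypothetical name, and treating any non-empty errors value as a failure (including a partial invalid_aliases report) is an assumption about how strict the real client is.

```python
def push_delivered(status_code: int, body: dict) -> bool:
    """Return True only for a 2xx response that names a notification and reports no errors."""
    if not 200 <= status_code < 300:
        return False
    if not body.get("id"):
        return False  # empty id: the push was routed to zero recipients
    if body.get("errors"):
        return False  # a list of messages, or a dict such as {"invalid_aliases": ...}
    return True
```

With the example body above, push_delivered(200, {"id": "", "errors": ["All included players are not subscribed"]}) is False, so the failure counter would tick despite the HTTP 200.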

Silent refetch push

The mobile app holds no socket, so a push is its only refetch trigger. When a rule writes a chat message but sends no visible push (e.g. artwork_engagement, which is chat-only), the engine fires a silent, data-only push (content_available, silent: true) so the app refetches the conversation and the new message appears. Best-effort: the message is already committed, so a failure here only delays the refetch until the next push.

The rules

Rules live in app/rules/; default_rules() wires them in evaluation order. Each rule declares channels, required_fields (short-circuit before any Redis hop when missing, DAT-118), and the gate/cooldown flags above. Thresholds, channels, priorities, and cooldown come from rules.toml (see Configuration).

| Rule | Channels | Fires when | Gate / cooldown | DAT |
| --- | --- | --- | --- | --- |
| visit_started | push + chat | new session_id for the ticket | pre-gallery exempt; ticket-deduped | DAT-296 |
| heart_rate_alert | push + chat | heart_rate > threshold_bpm (120), edge-triggered | gated; cooldown | DAT-229, DAT-285 |
| skin_conductance_alert | push + chat | EDA exceeds the visitor's own rolling baseline by delta_threshold_us (fixed threshold_us until warmed up) | gated; cooldown | DAT-230 |
| artwork_engagement | chat | seconds_in_chapter ≥ 90 with an art_name | gated; cooldown; chat-only → silent refetch | |
| room_transition | push + chat | genuinely-new room (not GB, not a corridor) | gated; bypasses cooldown | DAT-228, DAT-293 |
| session_flow | push | Gallery-B entry / re-entry prompts | gated; bypasses cooldown | DAT-287/289/290 |
| experience_tip | push + chat | config-driven match on chapter_id / chapter_name / art_name | gated; cooldown | DAT-259 |
| visit_ended | push | event_type == "visit_ended" | pre-gallery exempt; bypasses cooldown | DAT-282/287 |

visit_started — the welcome (DAT-296)

Fires on the first telemetry event carrying a session_id the ticket hasn't seen before. The chat message is a fixed greeting persisted verbatim (static_chat = True) — routing a canned greeting through the museum agent would make the first chat write, and the welcome push that follows it, wait on ~40 s of RAG. The welcome can also be triggered the instant the mobile sends an empty first /museum message (agent → POST /v1/ops/welcome); both paths share a ticket-keyed dedup claim (WelcomeDedupStore, atomic SET NX) so the welcome fires exactly once per visit. Welcome push: "Welcome to Dataland — Your AI guide is ready for this visit."

heart_rate_alert / skin_conductance_alert — biometric edge triggers

Both are edge-triggered: they fire on the rising edge into alert and stay quiet while the condition persists, by reading a state flag (heart_rate_alert_active / skin_conductance_alert_active) that app/state.py::apply_telemetry flips off the same threshold. The chat message is visitor-voiced and biometric-context-rich ("My heart rate just spiked to N BPM while I was viewing X … what is happening with this artwork?"), nudging the agent toward a grounded answer.
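Edge triggering reduces to comparing the current reading with the stored flag. The helper below is a sketch with a hypothetical name; in the service the flag lives in ticket state and is flipped by apply_telemetry, whereas here it is passed in and returned.

```python
def heart_rate_edge(heart_rate: float, alert_active: bool, threshold_bpm: float = 120.0):
    """Return (fire, new_alert_active) for one heart-rate reading."""
    above = heart_rate > threshold_bpm
    fire = above and not alert_active  # rising edge only
    return fire, above


# 130 BPM starts an alert, 135 BPM stays quiet, 100 BPM re-arms the rule.
assert heart_rate_edge(130, False) == (True, True)
assert heart_rate_edge(135, True) == (False, True)
assert heart_rate_edge(100, True) == (False, False)
```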

EDA baseline calibration (DAT-230)

Skin-conductance uses the visitor's own rolling baseline: the mean of samples in the trailing baseline_window_seconds (300 s) window. It fires when the current reading exceeds that baseline by delta_threshold_us. Until min_samples (60) readings accumulate, it falls back to the absolute threshold_us (1.5 µS) so cold-start visitors are still covered. Vitals are held to physiological sanity bounds upstream (DAT-285).
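The calibration logic can be illustrated with a small rolling-window class. This is a sketch under stated assumptions rather than the service's implementation: EdaBaseline is a hypothetical name, the current reading is excluded from its own baseline, and comparisons are strict.

```python
from collections import deque


class EdaBaseline:
    """Rolling per-visitor EDA baseline with an absolute cold-start fallback."""

    def __init__(self, window_seconds=300.0, min_samples=60,
                 threshold_us=1.5, delta_threshold_us=1.5):
        self.window_seconds = window_seconds
        self.min_samples = min_samples
        self.threshold_us = threshold_us
        self.delta_threshold_us = delta_threshold_us
        self._samples = deque()  # (timestamp_seconds, value_us), oldest first

    def should_alert(self, ts: float, value_us: float) -> bool:
        # Evict samples that have fallen out of the trailing window.
        while self._samples and self._samples[0][0] < ts - self.window_seconds:
            self._samples.popleft()
        if len(self._samples) >= self.min_samples:
            baseline = sum(v for _, v in self._samples) / len(self._samples)
            alert = value_us > baseline + self.delta_threshold_us
        else:
            alert = value_us > self.threshold_us  # cold start: absolute threshold
        self._samples.append((ts, value_us))
        return alert
```

One consequence worth noting: once warmed up, a reading of 2.0 µS against a 1.0 µS baseline does not alert (it is only 1.0 µS above baseline), even though it would have tripped the 1.5 µS absolute fallback during cold start.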

room_transition vs session_flow (DAT-228 / DAT-287)

room_transition owns "you entered a new room" for every gallery except GB, deduped once-per-room-per-session via RoomsSeenStore (notification:rooms_seen:<ticket>:<session>). It skips corridors (is_in_corridor) and skips GB entirely, because all Gallery-B pushes belong to session_flow, which must fire on re-entry (something the per-room dedup would suppress). session_flow dedups per prompt key instead:

  • First GB entry → "These interactive data can be saved to your phone directly as well!"
  • Returning to GB after Gallery C → "What do you think about the experience in Gallery C?"
  • Returning to GB after Gallery D → "What do you think about the experience in Gallery D?"

The agent speaks real room/section names rather than bare codes — Data Pavilion (GA), Latent Gallery (GB), Infinity Room (GC), The Sanctuary (GD), Discovery Portal (ON) — see Agent (DAT-281).

experience_tip — config-driven micro-prompts (DAT-259)

Experience-specific nudges ("Did you try the chocolate?", "Close your eyes and breathe in") matched off the live stream's chapter_id / chapter_name / art_name. Tips are pure config ([[experience_tip.tips]] in rules.toml) so the museum team owns the copy and trigger targets without a code change — edit the file then POST /rules/reload. Each tip fires once per (ticket, session) via ExperienceTipsSeenStore, with an optional min_seconds_in_chapter dwell gate.
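As a rough sketch of how one tip could be evaluated against an event: the helper below handles only the match_chapter_id key shown in the sample config, and models ExperienceTipsSeenStore as a plain set. The function name and the exact dedup key are assumptions.

```python
def tip_should_fire(tip: dict, event: dict, seen: set) -> bool:
    """Decide whether a config-defined tip fires for this event, at most once per session."""
    target = tip.get("match_chapter_id")
    if target is None or target != event.get("chapter_id"):
        return False
    # Optional dwell gate: the visitor must have spent long enough in the chapter.
    if event.get("seconds_in_chapter", 0) < tip.get("min_seconds_in_chapter", 0):
        return False
    key = (event["ticket_id"], event["session_id"], target)
    if key in seen:
        return False  # already delivered for this (ticket, session)
    seen.add(key)
    return True
```

Matching on chapter_name or art_name would follow the same shape with a different key on both sides of the comparison.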

visit_ended — the checkout push (DAT-282 / DAT-287)

Fires on the synthetic event_type == "visit_ended" event the museum bridge's 1 Hz active-tickets loop emits when a ticket leaves Visitors:ActiveTicketIDs. Sends a single "Thanks for visiting Dataland — How was the experience?" push. Terminal, once-per-visit, pre-gallery exempt, and cooldown-bypassing so it always lands.

Ops alerting: complaints + the swappable notifier (DAT-213)

The notification API also receives visitor-complaint events from the agent's silent, server-side LLM judge (off the visitor's response path, invisible to the visitor — see Agent). POST /v1/ops/complaint flows through ComplaintHandler:

  1. Severity threshold: events below COMPLAINT_SEVERITY_THRESHOLD (default low) are audited and dropped.
  2. Per-session dedup (ComplaintDedupStore): one ops ticket per session unless severity escalates to a strictly higher tier (low < medium < high). Dedup is recorded only on a real send, so a transient notifier failure can re-fire.
  3. PII scrub: emails and phone numbers in the reason / transcript / self-chosen session/ticket ids are scrubbed defensively (the agent strips upstream too). The authenticated complainant's name/email are shown verbatim when present, because they are the legitimate contact the ops team needs; anonymous visitors are first-class (no identity fields required).
  4. Audit: every flag (fired, deduped, below-threshold, failed) is appended to the museum:complaints:audit Redis stream with the verdict and a snippet, for the weekly false-positive / false-negative review.
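Steps 1 and 2 amount to a comparison on severity ranks. The sketch below uses a hypothetical complaint_verdict helper and takes the severity of the last alert actually sent for the session as an argument; the real ComplaintHandler reads that from ComplaintDedupStore.

```python
from typing import Optional

SEVERITY_RANK = {"low": 0, "medium": 1, "high": 2}


def complaint_verdict(severity: str, threshold: str, last_sent: Optional[str]) -> str:
    """Return "below-threshold", "deduped", or "fired" for one complaint event."""
    if SEVERITY_RANK[severity] < SEVERITY_RANK[threshold]:
        return "below-threshold"
    # Re-fire within a session only on a strictly higher tier than the last real send.
    if last_sent is not None and SEVERITY_RANK[severity] <= SEVERITY_RANK[last_sent]:
        return "deduped"
    return "fired"
```

With the default threshold of low, nothing is below threshold; raising it to medium would make complaint_verdict("low", "medium", None) return "below-threshold".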

Provider-agnostic notifier

OpsAlert is a channel-agnostic description; concrete OpsNotifier implementations render it per provider. Switching providers is a config change, never a code change. OPS_NOTIFIER_PROVIDER is a comma-separated list:

| Value | Behaviour |
| --- | --- |
| none (default) / empty | alerting disabled (no-op) |
| discord | Discord incoming-webhook embeds |
| slack | Slack incoming-webhook Block Kit |
| discord,slack | fan out to both via MultiNotifier |

graph TD
  CE["museum.visitor.complaint"] --> CH[ComplaintHandler]
  CH -->|threshold + dedup + scrub| BUILD[OpsAlert]
  BUILD --> N{OPS_NOTIFIER_PROVIDER}
  N -->|discord| D[DiscordWebhookNotifier]
  N -->|slack| S[SlackWebhookNotifier]
  N -->|discord,slack| M[MultiNotifier]
  M --> D
  M --> S
  CH --> AUD[(museum:complaints:audit)]

Two logical channels resolve to per-provider webhook URLs: ops carries visitor complaints (DAT-213); tech is reserved for sensor malfunctions (DAT-214). Discord falls back to the legacy generic OPS_ALERT_WEBHOOK_URL / TECH_ALERT_WEBHOOK_URL when no Discord-specific URL is set. MultiNotifier runs each sub-notifier independently — one provider being down or unconfigured never blocks the others — and returns success if at least one delivered.
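The fan-out contract (independent sub-notifiers, success if at least one delivers) can be sketched as below. The Notifier callable type and the fan_out name are hypothetical; the real MultiNotifier wraps OpsNotifier implementations and may well be asynchronous.

```python
from typing import Callable, Iterable

# Hypothetical notifier shape: takes an alert, returns True when it was delivered.
Notifier = Callable[[dict], bool]


def fan_out(alert: dict, notifiers: Iterable[Notifier]) -> bool:
    """Call every notifier; one failing or raising never blocks the others."""
    delivered = False
    for notify in notifiers:
        try:
            delivered = bool(notify(alert)) or delivered
        except Exception:
            continue  # a provider being down is isolated to that provider
    return delivered
```

Note that every notifier is invoked even after an earlier one has succeeded, so both Discord and Slack receive the alert when both are configured.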

Defence-in-depth rendering

Both notifiers escape visitor-derived text so a hostile display name or message can't inject masked links (Discord markdown) or broadcast pings (<!channel>, @everyone). Discord embeds set allowed_mentions.parse=[]; transcripts are truncated to embed/Block-Kit limits and capped at OPS_ALERT_TRANSCRIPT_MAX_TURNS (10).

HTTP API (the --api container)

All endpoints except /health and /metrics require a bearer token equal to NOTIFICATION_OPS_TOKEN (DAT-103). The dependency distinguishes failure modes: 503 when the server token is unset (broken deploy), 401 + WWW-Authenticate: Bearer when the client's header is missing or wrong. Comparison is constant-time (secrets.compare_digest).
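The three outcomes can be expressed as a framework-free function. This is a sketch of the decision only: check_bearer is a hypothetical name, and the real dependency presumably raises HTTP exceptions (and sets WWW-Authenticate on the 401) rather than returning a status code.

```python
import secrets
from typing import Optional


def check_bearer(authorization: Optional[str], server_token: Optional[str]) -> int:
    """Return the HTTP status the auth check would yield (200 means allowed)."""
    if not server_token:
        return 503  # server token unset: broken deploy, not a client error
    prefix = "Bearer "
    if not authorization or not authorization.startswith(prefix):
        return 401
    presented = authorization[len(prefix):]
    # Constant-time comparison; bytes avoid compare_digest's ASCII-only str limit.
    if not secrets.compare_digest(presented.encode(), server_token.encode()):
        return 401
    return 200
```

Checking the server token first is what lets an operator tell a misconfigured deploy (503) apart from a bad client credential (401).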

| Method + path | Auth | Purpose |
| --- | --- | --- |
| GET /health | none | Redis ping liveness |
| GET /metrics | none | Prometheus text exposition |
| GET /rules | bearer | list rules with channels + priority + the cooldown |
| POST /rules/reload | bearer | re-read NOTIFICATION_RULES_CONFIG, swap rules in place (strict; returns source + sha256) |
| POST /rules/validate | bearer | dry-run a candidate rules file ({"path": "..."}) without swapping |
| POST /events | bearer | inject a telemetry event directly into the engine (testing / replay) |
| POST /v1/ops/complaint | bearer | receive a museum.visitor.complaint → ops ticket |
| POST /v1/ops/welcome | bearer | send the mobile-init welcome push for a ticket (deduped) |
| GET /state/{ticket_id} | bearer | dump a ticket's current state hash |

# Liveness + a metrics snapshot
curl -fsS http://localhost:8080/health      # (1)!
curl -fsS http://localhost:8080/metrics

# Hot-reload rules.toml after editing (no restart)
curl -fsS -X POST http://localhost:8080/rules/reload \
  -H "Authorization: Bearer $NOTIFICATION_OPS_TOKEN"   # (2)!

# Inspect one visitor's state
curl -fsS http://localhost:8080/state/TICKET123 \
  -H "Authorization: Bearer $NOTIFICATION_OPS_TOKEN" | jq   # (3)!
  1. /health and /metrics are the only unauthenticated endpoints. -fsS makes curl exit non-zero on an HTTP status of 400 or above (-f), hides the progress meter (-s), and still prints error messages (-S), so this is safe in a health probe.
  2. Hot-reloads NOTIFICATION_RULES_CONFIG and swaps the rules in place with no restart. The reload is strict and returns the source + sha256, so a malformed edit is rejected without dropping the running ruleset.
  3. Every authenticated endpoint requires this bearer header equal to NOTIFICATION_OPS_TOKEN (DAT-103). A missing/wrong token returns 401; an unset server token returns 503 (broken deploy), not 401.

DLQ + replay

An entry that raises during engine.process is moved to the DLQ stream museum:telemetry:dlq (DLQ_STREAM_KEY, capped at DLQ_MAX_LEN = 10 000) and then XACK'd off the input. The DLQ payload records original_stream, original_entry_id, error_type, error_message, error_traceback, failed_at_ms, and fields_json — all redacted (DAT-115) so bearer/basic tokens or credential-bearing URLs never persist where an operator can read them. The PEL reaper sends entries past PEL_REAP_MAX_DELIVERIES to the same DLQ.

Replay re-publishes fields_json back onto the original stream so the engine gets another shot. It defaults to a dry run — pass --commit to actually write:

# Inside the worker/api container. Dry run first.
python -m app.dlq_replay --since -1h            # (1)!

# Replay everything DLQed in the last hour back to the input stream.
python -m app.dlq_replay --since -1h --batch 100 --commit   # (2)!

# Replay one specific entry.
python -m app.dlq_replay --entry 1715635200000-0 --commit   # (3)!
  1. Replay defaults to a dry run — it reports what would be re-published without writing anything. Always run this form first to confirm the scope.
  2. --commit is what actually re-publishes fields_json back onto the original stream so the engine gets another shot. --batch caps how many entries are replayed per pass; pace it during a maintenance window.
  3. Targets a single DLQ entry by its original stream id — use this to surgically re-drive one poison message after you've fixed the cause.

Replay during a window

Best practice: replay during a maintenance window and watch notification_dlq_depth drop while notification_processed_total climbs in lockstep.

Metrics + observability

/metrics renders Prometheus text from Redis-backed counters (notification:counters:*, best-effort — a Redis hiccup never crashes the engine). Notable series:

| Metric | Meaning |
| --- | --- |
| notification_processed_total | entries the consumer ack'd |
| notification_dlq_total / notification_dlq_depth | entries routed to DLQ / live XLEN of the DLQ |
| notification_push_sent_total / notification_push_failed_total | OneSignal 2xx (real delivery) / 4xx-after-retry, missing recipient, or silent no-op |
| notification_chat_opened_total / notification_chat_failed_total | agent chat writes |
| notification_rule_triggered_total{rule} / notification_rule_rate_limited_total{rule} | per-rule fire / cooldown-suppression counts |
| notification_telemetry_consumer_lag{group} | per-group XINFO GROUPS lag, the correct backpressure signal (DAT-82) |
| notification_e2e_latency_seconds | histogram, event timestamp → push ack (DAT-257) |

Stream depth vs lag

notification_telemetry_stream_depth (XLEN) pegs near the MAXLEN cap in steady state and is diagnostic-only; alert on notification_telemetry_consumer_lag instead.

Every notification attempt is also appended to the notification:attempts Redis stream (DAT-57) with ticket, rule, channel, status, redacted payload, and timing — query with XRANGE / XREAD. Structured JSON logs + optional Logfire spans cover the rest.

Configuration (rules.toml)

Thresholds, channels, priorities, and the shared cooldown live in rules.toml, the image-baked default at /app/config/rules.toml. Override on a host by pointing NOTIFICATION_RULES_CONFIG at a bind-mounted file (VDS convention: /etc/dataland/notification-rules.toml). A missing or malformed file logs an error and the worker falls back to in-code defaults — unless booted with --strict, which crashes instead so a typo is caught at deploy.

[global]
telemetry_cooldown_seconds = 240   # (1)!

[heart_rate_alert]
threshold_bpm = 120                # (2)!
require_art_name = false
channels = ["push", "chat"]
priority = "normal"

[skin_conductance_alert]
threshold_us = 1.5                 # (3)!
baseline_window_seconds = 300      # (4)!
min_samples = 60
delta_threshold_us = 1.5
channels = ["push", "chat"]

[room_transition]
channels = ["push", "chat"]        # (5)!

[artwork_engagement]
seconds_in_chapter_threshold = 90
channels = ["chat"]

[experience_tip]                   # (6)!
channels = ["push", "chat"]
# [[experience_tip.tips]]
# match_chapter_id = "chocolate-room"
# title = "Before you move on"
# body  = "Did you try the chocolate at the tasting station?"
# chat  = "I'm at the chocolate experience. What should I try here?"
# min_seconds_in_chapter = 10
  1. The single shared per-ticket cooldown bucket (DAT-225). Every push that honours the cooldown draws from this 240 s window, so a visitor does not receive two of them back to back. The welcome push is exempt, and room_transition, session_flow, and visit_ended intentionally bypass it (DAT-293).
  2. DAT-229 — raised from 100. No alert fires below this BPM.
  3. Absolute fallback threshold, used only until the per-visitor rolling baseline has warmed up.
  4. DAT-230 — rolling baseline window. Once min_samples readings exist, the rule fires on delta_threshold_us above the visitor's own baseline rather than on the absolute threshold_us.
  5. push is the fallback when the app is backgrounded; chat is delivered in-conversation when it is foregrounded.
  6. DAT-259 — the museum team owns the tip copy. Uncomment a [[experience_tip.tips]] table to add one.

Key environment variables

# Redis (telemetry stream + all state)
REDIS_HOST=dataland-redis
REDIS_PORT=6379
REDIS_PASSWORD=***
REDIS_STREAM_KEY=museum:telemetry
CONSUMER_GROUP=notification-service
CONSUMER_NAME=worker-1            # (1)!
CONSUMER_COUNT=10                 # (2)!
BLOCK_MS=2000
DLQ_STREAM_KEY=museum:telemetry:dlq
DLQ_MAX_LEN=10000                 # (3)!

# Agent (chat writes + ticket resolution)
AGENT_BASE_URL=http://dataland-agent:4141
AGENT_SERVICE_TOKEN=***           # (4)!
AGENT_SSE_WALL_CLOCK_TIMEOUT_SECONDS=30   # (5)!

# Push delivery
ONESIGNAL_APP_ID=***
ONESIGNAL_API_KEY=***
ONESIGNAL_API_URL=https://api.onesignal.com/notifications

# Ops API
NOTIFICATION_OPS_TOKEN=***        # (6)!
NOTIFICATION_RULES_CONFIG=/etc/dataland/notification-rules.toml   # (7)!

# Ops alerting (DAT-213)
OPS_NOTIFIER_PROVIDER=none        # (8)!
OPS_ALERT_DISCORD_WEBHOOK_URL=*** # (9)!
OPS_ALERT_SLACK_WEBHOOK_URL=***
COMPLAINT_SEVERITY_THRESHOLD=low  # (10)!

# Lifecycle / state TTLs
STATE_TTL_SECONDS=86400
ROOMS_SEEN_TTL_SECONDS=21600
WELCOME_DEDUP_TTL_SECONDS=21600
COMPLAINT_DEDUP_TTL_SECONDS=21600
APP_ENV=production                # (11)!
  1. DAT-109 — MUST be unique per replica. Two workers sharing this name collide on the same PEL slot and one's work is silently lost. Defaults to <hostname>-<uuid8>; bump the compose worker-1 when you --scale.
  2. XREADGROUP batch size — entries read per loop. Paired with BLOCK_MS (2000 ms) as the blocking timeout when the stream is idle.
  3. Cap (MAXLEN) on the DLQ stream. Older DLQ entries are trimmed past 10 000, so drain or replay before the backlog rolls off.
  4. Placeholder/empty values (change-me-shared-service-token) are rejected at boot when APP_ENV=production (DAT-111/DAT-291) — otherwise every agent call would 401 invisibly.
  5. DAT-112 — hard wall-clock cap on SSE consumption so a stuck agent stream can't pin a worker. There is also a separate inter-byte read timeout.
  6. DAT-103 — bearer for /events, /v1/ops/*, /rules*, /state. Unset on the server yields 503; wrong from the client yields 401. Compared constant-time via secrets.compare_digest.
  7. Points at the bind-mounted rules override (VDS convention shown). Missing or malformed falls back to in-code defaults unless the worker booted --strict.
  8. Comma-separated provider list: discord | slack | discord,slack | none. Default none disables ops alerting (no-op); discord,slack fans out to both via MultiNotifier. Switching providers is config-only, never code.
  9. Per-provider webhook. Discord falls back to the legacy generic OPS_ALERT_WEBHOOK_URL when no Discord-specific URL is set.
  10. DAT-213 — complaints below this severity (low < medium < high) are audited and dropped rather than paged.
  11. Gates the production boot guard (DAT-111/DAT-291). In non-production the worker boots even with OneSignal/agent secrets unset, so local dev still works.

Production boot guard (DAT-111 / DAT-291)

When APP_ENV=production, the worker refuses to start if ONESIGNAL_APP_ID, ONESIGNAL_API_KEY, or AGENT_SERVICE_TOKEN is missing or still at a placeholder value (dataland, change-me-shared-service-token). Otherwise every push would silently skip() and every agent call would 401, indefinitely and invisibly. In non-production the worker boots regardless, so local dev with OneSignal intentionally unset still works. See app/runtime.py and Deploy.
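A guard of this kind can be sketched as a function over the environment mapping. The name boot_guard_violations is hypothetical, and applying both placeholder strings to all three variables is an assumption; the real check lives in app/runtime.py.

```python
PLACEHOLDERS = frozenset({"", "dataland", "change-me-shared-service-token"})
REQUIRED = ("ONESIGNAL_APP_ID", "ONESIGNAL_API_KEY", "AGENT_SERVICE_TOKEN")


def boot_guard_violations(env: dict) -> list:
    """Return the required variables that are unset or placeholders in production."""
    if env.get("APP_ENV") != "production":
        return []  # non-production boots regardless
    return [name for name in REQUIRED if env.get(name, "").strip() in PLACEHOLDERS]
```

A caller would typically pass os.environ and exit non-zero when the returned list is non-empty, naming the offending variables in the log line.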

Running locally + with the simulator

# Worker logs in real time
docker logs -f dataland-notification-worker

# Process a single batch then exit (smoke test)
python main.py --once   # (1)!

# Inspect the input stream directly
docker exec dataland-redis sh -c '
  redis-cli -a "$REDIS_PASSWORD" --no-auth-warning XINFO STREAM museum:telemetry'   # (2)!
  1. --once drains exactly one XREADGROUP batch then exits — the smoke-test path. Other entrypoint flags: --api (HTTP ops surface), --explore (Redis explorer), --strict (crash on a malformed rules.toml instead of reverting to defaults).
  2. --no-auth-warning suppresses the stderr notice from passing the password on the command line via -a. XINFO STREAM shows length, last id, and group state for the telemetry stream — handy for confirming the bridge is writing.

For end-to-end testing without an Empatica wearable, drive the worker from the Museum simulator. Per the playback-isolation rule, never replay museum-simulation Avro into the live dataland-redis — point the worker at a dedicated sidecar redis instead. See Deploy and Architecture.