feat: Support Spark Streaming trigger modes (Trigger.ProcessingTime) for continuous per-model processing #28

Merged: mdrakiburrahman merged 2 commits into main from dev/mdrrahman/aggregation-queries on Apr 12, 2026
@mdrakiburrahman
Contributor

Summary

Adds a processing_time trigger mode that loops each model continuously: discover files → process batches → sleep → repeat. Fast models are no longer blocked by slow ones during long dbt run sessions.

Closes #27

Changes

| File | What |
|---|---|
| trigger_config.py | New TriggerConfig frozen dataclass + parse_trigger_config() with interval parser (10s, 1 minute, etc.) |
| constants.py | DEFAULT_PROCESSING_TIME_TIMEOUT_SECONDS (30 days) |
| impl.py | wait_for_next_cycle() (interruptible sleep via threading.Event), parse_trigger(), reset_cycle_count(), get_processing_time_timeout(), SIGTERM/SIGINT signal handlers |
| incremental.sql | Trigger config reading, dynamic loop cap for processing_time (999M in the first commit, lowered to 99,999 in the second), inter-cycle sleep + re-discover, auto 30-day timeout, cache clearing between cycles |
| README.md | New "Streaming / Continuous Processing" section |
| test_trigger_config.py | 27 tests: parsing, validation, interval formats, error cases |
| test_trigger_lifecycle.py | 11 tests: cycle counting, max_cycles, shutdown signals, interruptible sleep |
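
To make the trigger_config.py row more concrete, here is a minimal sketch of how a frozen config dataclass and an interval parser could fit together. It is not the code from this PR: the field names, the `parse_interval` helper, the unit table, and the exact validation rules are all assumptions made for illustration.

```python
import re
from dataclasses import dataclass
from typing import Optional

# Hypothetical unit table; the real parser may accept a different set.
_UNIT_SECONDS = {
    "s": 1, "sec": 1, "second": 1, "seconds": 1,
    "m": 60, "min": 60, "minute": 60, "minutes": 60,
    "h": 3600, "hour": 3600, "hours": 3600,
}

_INTERVAL_RE = re.compile(r"^\s*(\d+)\s*([a-zA-Z]+)\s*$")


@dataclass(frozen=True)
class TriggerConfig:
    """Immutable trigger settings (field names are assumptions)."""
    type: str
    interval_seconds: int
    max_cycles: Optional[int] = None


def parse_interval(text: str) -> int:
    """Convert strings such as '10s' or '1 minute' into whole seconds."""
    match = _INTERVAL_RE.match(text)
    if match is None:
        raise ValueError(f"Unrecognized interval: {text!r}")
    amount, unit = int(match.group(1)), match.group(2).lower()
    if unit not in _UNIT_SECONDS:
        raise ValueError(f"Unknown interval unit: {unit!r}")
    if amount <= 0:
        raise ValueError("Interval must be positive")
    return amount * _UNIT_SECONDS[unit]


def parse_trigger_config(raw: dict) -> TriggerConfig:
    """Validate a trigger dict like the one passed to config(trigger=...)."""
    if raw.get("type") != "processing_time":
        raise ValueError(f"Unsupported trigger type: {raw.get('type')!r}")
    max_cycles = raw.get("max_cycles")
    if max_cycles is not None and (not isinstance(max_cycles, int) or max_cycles <= 0):
        raise ValueError("max_cycles must be a positive integer")
    return TriggerConfig(
        type="processing_time",
        interval_seconds=parse_interval(raw["interval"]),
        max_cycles=max_cycles,
    )
```

Freezing the dataclass means a parsed config cannot be mutated between cycles, which is presumably why the PR chose that shape.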

Usage

-- Continuous processing: loop forever, sleeping 30s between cycles
{{ config(
    materialized='incremental',
    incremental_strategy='append',
    trigger={'type': 'processing_time', 'interval': '30 seconds'},
    source_roots=['/shares/...'],
    ...
) }}

-- With max_cycles for controlled shutdown
{{ config(
    trigger={'type': 'processing_time', 'interval': '10 seconds', 'max_cycles': 100},
    ...
) }}

Design Decisions

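  • Batch loop stays in Jinja: processing_time uses a very large loop cap (999,999,999 in the first commit; lowered to 99,999 in the second commit because dbt's Jinja sandbox rejects range() calls larger than 100,000). When files are exhausted, Jinja calls adapter.wait_for_next_cycle(), which sleeps and returns True/False.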
  • Only the incremental materialization supports processing_time (table does a full DELETE each time, which is incompatible with continuous processing).
  • threading.Event for graceful shutdown — thread-safe, interruptible sleep via event.wait(timeout=interval).
  • Auto 30-day timeout for processing_time models (user can override via job_timeout_seconds).
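
The interruptible-sleep decision above can be sketched roughly as follows. This is an illustration under stated assumptions, not the adapter's actual code: the `CycleController` class, its attribute names, and the meaning of the return value (True to run another cycle, False to stop) are hypothetical. Only `threading.Event.wait(timeout=...)` and `signal.signal(...)` are standard-library calls.

```python
import signal
import threading
from typing import Optional


class CycleController:
    """Hypothetical helper; the adapter's real structure may differ."""

    def __init__(self, interval_seconds: float, max_cycles: Optional[int] = None):
        self.interval_seconds = interval_seconds
        self.max_cycles = max_cycles
        self.cycle_count = 0
        self._shutdown = threading.Event()

    def request_shutdown(self, signum=None, frame=None) -> None:
        """Signal-handler-compatible callback that wakes any sleeper."""
        self._shutdown.set()

    def install_signal_handlers(self) -> None:
        """Route SIGTERM/SIGINT to request_shutdown (main thread only)."""
        signal.signal(signal.SIGTERM, self.request_shutdown)
        signal.signal(signal.SIGINT, self.request_shutdown)

    def wait_for_next_cycle(self) -> bool:
        """Sleep for one interval; return True to continue, False to stop."""
        self.cycle_count += 1
        if self.max_cycles is not None and self.cycle_count >= self.max_cycles:
            return False
        # Event.wait returns True as soon as the event is set, or False
        # once the timeout elapses, so a shutdown cuts the sleep short.
        interrupted = self._shutdown.wait(timeout=self.interval_seconds)
        return not interrupted
```

Compared with `time.sleep(interval)`, waiting on the event lets a SIGTERM or SIGINT end the pause immediately instead of after the full interval.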

Testing

  • ✅ 335 unit tests pass (including 38 new trigger tests)
  • ✅ 3 integration tests pass (full refresh, incremental, filtered edition)
  • ✅ Lint clean (ruff check + format)

mdrakiburrahman and others added 2 commits April 12, 2026 16:04
…for continuous per-model processing

Add processing_time trigger mode that continuously loops models:
discover files -> process batches -> sleep -> repeat. Fast models
are no longer blocked by slow ones during long dbt runs.

New files:
- trigger_config.py: TriggerConfig dataclass + interval parser
- test_trigger_config.py: 27 parsing/validation tests
- test_trigger_lifecycle.py: 11 lifecycle/shutdown tests

Modified:
- constants.py: trigger-related constants (30-day timeout)
- impl.py: wait_for_next_cycle(), parse_trigger(), signal handlers
- incremental.sql: trigger config, dynamic loop cap, inter-cycle sleep
- README.md: new Streaming / Continuous Processing section

Closes #27

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
dbt's Jinja sandbox blocks range() calls larger than 100,000.
The previous value of 999,999,999 caused 'Range too big' errors.
99,999 cycles at 10s intervals = ~11.5 days, well within the
30-day timeout for processing_time models.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
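
The arithmetic in the second commit message can be checked with a few lines. This sketch assumes, as the commit message does, that one loop iteration corresponds to one sleep cycle and that only sleep time counts; `days_until_cap` is a hypothetical helper, not part of the adapter.

```python
SECONDS_PER_DAY = 24 * 60 * 60


def days_until_cap(max_iterations: int, interval_seconds: float) -> float:
    """Days of inter-cycle sleep before the loop cap is exhausted.

    Processing time inside each cycle is ignored, so this is a lower bound.
    """
    return max_iterations * interval_seconds / SECONDS_PER_DAY


print(round(days_until_cap(99_999, 10), 2))  # 11.57
print(round(days_until_cap(99_999, 30), 2))  # 34.72
```

Under these assumptions the loop cap is reached before the 30-day timeout at a 10-second interval, while at a 30-second interval the timeout would be hit first.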
@mdrakiburrahman mdrakiburrahman merged commit 2ee9cde into main Apr 12, 2026
2 checks passed
@mdrakiburrahman mdrakiburrahman deleted the dev/mdrrahman/aggregation-queries branch April 12, 2026 22:15