Skip to content

feat: add ExecutionGraph, CompletionTracker, and Task model for async scheduler#356

Open
andreatgretel wants to merge 7 commits intomainfrom
andreatgretel/feat/async-generators-and-task-queue-foundation
Open

feat: add ExecutionGraph, CompletionTracker, and Task model for async scheduler#356
andreatgretel wants to merge 7 commits intomainfrom
andreatgretel/feat/async-generators-and-task-queue-foundation

Conversation

@andreatgretel
Copy link
Contributor

@andreatgretel andreatgretel commented Feb 26, 2026

Summary

PR 1 of 4 in the async generators & task-queue builder plan. Adds the foundational data structures — ExecutionGraph, CompletionTracker, and Task/TaskResult/TaskTrace — that the async scheduler (PR 3) will consume. No existing behavior changes; all new modules under engine/dataset_builders/utils/.

Changes

Added

  • execution_graph.py — Column-level DAG built from config dependencies. Supports topological ordering (Kahn's, cached), critical path, cell-level dependency resolution, side-effect column mapping, Mermaid visualization, upfront task count estimation, cached upstream_by_strategy, and a create() factory classmethod.
  • completion_tracker.py — Tracks per-cell and per-batch completion state across row groups. Uses an event-driven frontier — readiness is computed incrementally on mark_complete/mark_batch_complete/drop_row via _enqueue_downstream, so get_ready_tasks returns in O(frontier) instead of scanning all columns × rows × row groups (O(C × R × G)) per tick. Handles row drops and batch-level markers.
  • task_model.py — Frozen dataclasses for Task (hashable work unit), TaskResult (outcome), and TaskTrace (timing trace). Includes ColumnName, RowGroupIndex, RowIndex type aliases for self-documenting signatures.
  • test_execution_graph.py (381 lines) — Tests for graph construction, topological order, critical path, cell dependencies, side-effects, Mermaid output, cycle detection, task counts.
  • test_completion_tracker.py (257 lines) — Tests for mark/query, batch completion, row drops, frontier-based readiness resolution, multi-row-group scenarios.
  • test_task_model.py (87 lines) — Tests for equality, hashing, set membership, defaults.

Changed

Total: +1,250 / -29 lines across 9 files (6 new, 3 modified). ~58% of added lines are tests (725 test / 506 source).

Attention Areas

Reviewers: Please pay special attention to the following:

  • completion_tracker.py — Event-driven frontier logic in _enqueue_downstream and _reevaluate_batch_tasks. This is the core optimization: cell completions do O(fan_out), batch completions check downstream rows, and get_ready_tasks is just a frontier filter.
  • execution_graph.py — Core DAG logic. The cell_dependencies method resolves side-effect columns and maps generation strategy to readiness granularity (cell vs batch). upstream_by_strategy is cached and used by the frontier logic. This is the contract that PR 3's scheduler will rely on.

Test plan

  • All new tests pass — 188 passed (pytest tests/engine/dataset_builders/utils/)
  • make check-all passes (lint + format)
  • Existing test suite unaffected — no imports from these modules yet

Description updated with AI

… scheduler

Add the foundational data structures for the async task-queue dataset
builder (plan #346, PR 1/4):

- ExecutionGraph: column-level static DAG with topological ordering,
  critical path, task counts, cell-dependency resolution, Mermaid output,
  and side-effect column mapping (__trace, __reasoning_content).
- CompletionTracker: lightweight (column, row_group, row_index) completion
  state with row dropping and ready-task enumeration.
- Task/TaskResult/TaskTrace: frozen hashable task dataclass, result
  container, and opt-in tracing record.

All three are pure data structures with no side effects on the existing
codebase. They live in new modules under engine/dataset_builders/utils/
and are only imported by code introduced in later PRs.

56 unit tests covering graph construction, validation, dependency
resolution, completion tracking, row drops, and task model semantics.

Refs #346
Add `is_ready` and `is_batch_ready` methods to CompletionTracker to
simplify `ready_tasks`. Cache topological order in ExecutionGraph since
the graph is immutable after construction. Move DatasetBuilderColumnConfigT
type alias to multi_column_configs. Fix license header years.
@andreatgretel andreatgretel requested a review from a team as a code owner February 26, 2026 21:59
@greptile-apps
Copy link
Contributor

greptile-apps bot commented Feb 26, 2026

Greptile Summary

This PR introduces three foundational modules — ExecutionGraph, CompletionTracker, and Task/TaskResult/TaskTrace — that serve as the scheduling substrate for the async generator pipeline. The architecture is sound: the event-driven frontier in CompletionTracker efficiently tracks readiness incrementally, and the column-level DAG with cached topological ordering in ExecutionGraph provides a clean contract for downstream schedulers.

Key findings:

  • Logic — completion_tracker.py lines 79 & 143: If mark_complete, mark_batch_complete, or drop_row are called with a row group not in the original row_groups list, self._row_group_sizes[row_group] will raise an unhandled KeyError inside the asyncio event loop. Should validate the row group exists before accessing the dict.
  • Logic — completion_tracker.py lines 84–86: The batch-upstream readiness check uses column key existence, which can pass even if only a single cell of a FULL_COLUMN column has been marked complete. If the scheduler mistakenly calls mark_complete on a FULL_COLUMN column, downstream FULL_COLUMN tasks can be prematurely enqueued without complete upstream data.
  • Logic — execution_graph.py line 136: critical_path() calls max() on an empty list without checking, raising ValueError when no columns have been registered. Should return early for empty graphs.

Test coverage is solid (~58% of added lines are tests), but there is no test covering mark_complete/mark_batch_complete with an unregistered row group.

Confidence Score: 2/5

  • Not safe to merge without addressing the two logic issues in CompletionTracker that could cause silent event loop crashes or incorrect scheduling.
  • Two concrete logic issues in CompletionTracker need fixing: (1) unguarded KeyError access for unknown row groups can crash the event loop, (2) batch-upstream readiness check uses key existence instead of full completion, allowing premature task enqueuing if the API is used incorrectly. A third issue in ExecutionGraph's critical_path() is lower severity but worth fixing. These are straightforward fixes that prevent real runtime bugs.
  • packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/completion_tracker.py (critical), packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/execution_graph.py (moderate)

Last reviewed commit: b08cb3d

@@ -0,0 +1,133 @@
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

super nit: thoughts on putting these resources in a dedicated module somewhere outside of dataset_builders? May be async_helpers?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thought about it but these modules are only consumed by the dataset builder and share types with the other utils here (dag.py, concurrency.py, async_concurrency.py). moving them out would scatter tightly-coupled code without reducing coupling. keeping them in dataset_builders/utils/ for now — happy to revisit if they get reused elsewhere.

def is_complete(self, column: str, row_group: int, row_index: int) -> bool:
return row_index in self._completed.get(row_group, {}).get(column, set())

def all_complete(self, cells: list[tuple[str, int, int | None]]) -> bool:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

super nit: all_complete -> is_all_complete

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


def __init__(self) -> None:
# row_group → column → set of completed local row indices
self._completed: dict[int, dict[str, set[int]]] = defaultdict(lambda: defaultdict(set))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: Use type aliases (or a NamedTuple) for row/column/row-group coordinates

The nested type dict[int, dict[str, set[int]]] in CompletionTracker._completed is hard to reason about at a glance — you have to mentally map "outer int = row group, str = column, inner int = row index" every time you read it. The same (str, int, int | None) tuple pattern also appears repeatedly across both CompletionTracker and ExecutionGraph.

Type aliases would help:

# In task_model.py or a shared types module
from typing import TypeAlias

RowIndex: TypeAlias = int
RowGroup: TypeAlias = int
ColumnName: TypeAlias = str

Then signatures become self-documenting:

# Before
self._completed: dict[int, dict[str, set[int]]]

# After
self._completed: dict[RowGroup, dict[ColumnName, set[RowIndex]]]
# Before
def mark_complete(self, column: str, row_group: int, row_index: int) -> None:

# After
def mark_complete(self, column: ColumnName, row_group: RowGroup, row_index: RowIndex) -> None:

You could also replace the tuple[str, int, int | None] scattered across both modules with a NamedTuple:

class CellCoord(NamedTuple):
    column: ColumnName
    row_group: RowGroup
    row_index: RowIndex | None

This lets you write coord.column instead of coord[0], is still hashable and tuple-compatible, and makes cell_dependencies and all_complete easier to follow.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added ColumnName, RowGroup, RowIndex type aliases in task_model.py and applied them across CompletionTracker and ExecutionGraph. skipped the CellCoord namedtuple — the destructuring pattern for col, rg, ri in cells is already clear and used consistently, and the namedtuple adds allocation overhead in a hot loop.

Comment on lines +180 to +181
graph._columns.append(name)
graph._strategies[name] = strategies[name]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these can change to accessing public api right? There are a few instances of this pattern in this file.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good call. added add_column(), add_edge(), set_side_effect(), and resolve_side_effect() to ExecutionGraph and rewrote build_execution_graph to use them.

status: str = ""
error: str | None = None

@staticmethod
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Prefer @classmethod over @staticmethod for TaskTrace.from_task

@classmethod is the standard Python convention for alternative constructors:

# Current
@staticmethod
def from_task(task: Task) -> TaskTrace:
    return TaskTrace(...)

# Preferred
@classmethod
def from_task(cls, task: Task) -> TaskTrace:
    return cls(...)

Using cls(...) instead of hardcoding TaskTrace(...) means the constructor works correctly with subclasses, and more importantly signals "this is an alternative constructor" idiomatically. Minor point since TaskTrace is unlikely to be subclassed, but it's a common convention worth following.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done, switched to @classmethod + cls(...)

- Rename all_complete → is_all_complete for boolean method convention
- Add ColumnName, RowGroup, RowIndex type aliases for readability
- Add public mutation API to ExecutionGraph (add_column, add_edge,
  set_side_effect, resolve_side_effect) and rewrite build_execution_graph
  to use it instead of private attributes
- Change TaskTrace.from_task from @staticmethod to @classmethod
from typing import Any, Literal, TypeAlias

ColumnName: TypeAlias = str
RowGroup: TypeAlias = int
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: This is technically also an index right? RowGroup > RowGroupIndex?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done, renamed across all three modules

from data_designer.engine.dataset_builders.utils.task_model import ColumnName, RowGroup, RowIndex


@dataclass
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to be a dataclass?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nope, converted to a plain class with __init__

return "\n".join(lines)


def build_execution_graph(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: probably mostly stylistic:

This could be a factory create class method of the ExecutionGraph class itself:

@classmethod
def create(cls, column_configs: list[DatasetBuilderColumnConfigT], strategies: dict[ColumnName, GenerationStrategy]) -> Self:
...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved the logic into ExecutionGraph.create(), kept build_execution_graph as a thin deprecated wrapper so existing call sites still work

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason to not update existing call sites now?

- Rename RowGroup type alias to RowGroupIndex for consistency
- Convert ExecutionGraph from dataclass to plain class
- Move build_execution_graph logic to ExecutionGraph.create() classmethod
Comment on lines +52 to +75
def is_ready(
self,
column: ColumnName,
row_group: RowGroupIndex,
row_index: RowIndex,
graph: ExecutionGraph,
row_group_size: int,
) -> bool:
"""Check if all upstream columns are done for this (column, row_group, row_index)."""
deps = graph.cell_dependencies(column, row_group, row_index, row_group_size)
return self.is_all_complete(deps)

def is_batch_ready(
self,
column: ColumnName,
row_group: RowGroupIndex,
row_group_size: int,
graph: ExecutionGraph,
) -> bool:
"""Check if all upstream columns are done for all non-dropped rows in the row group."""
deps = graph.cell_dependencies(column, row_group, None, row_group_size)
# Dropped rows don't need their upstream cells complete
deps = [(c, rg, ri) for c, rg, ri in deps if ri is None or not self.is_dropped(rg, ri)]
return self.is_all_complete(deps)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit detail: could these methods instead take whatever graph.cell_dependencies returns as a dependency?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moot now — get_ready_tasks no longer scans or checks dependencies. it just returns [t for t in self._frontier if t not in dispatched]. dependency resolution moved into _enqueue_downstream, which fires incrementally on each mark_complete / mark_batch_complete using graph.upstream_by_strategy. this turns the scheduler tick from O(C × R × G) to O(downstream_fan_out) per completion.

Copy link
Contributor

@nabinchha nabinchha left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@andreatgretel a few more comments related to perf!

Optimization Review

High Impact

1. get_ready_tasks is O(C × R × G) on every scheduler tick

This scans every column × every row × every row group on each call. With 10 columns, 10k records, buffer_size=100, that's ~100k iterations per tick, each triggering cell_dependencies() + is_all_complete().

Two suggestions:

  • Early skip for completed column×row_group pairs in the cell-by-cell branch. Before the inner row loop, a quick check like len(completed.get(col, set())) + len(dropped) >= rg_size would let you skip entire blocks.
  • Incremental/event-driven readiness (future PR): maintain a frontier set updated on mark_complete instead of full-scanning. This turns the scheduler from poll-based to event-driven.

2. cell_dependencies allocates a new list + tuples every call

Called per-cell inside the hot loop. For a 100-row batch with 3 upstream columns: 100 list allocations + 300 tuple allocations per column per row group per tick. Since the graph is immutable, the dependency pattern for a given column is always the same — only (row_group, row_index) varies. A cached descriptor that is_all_complete interprets directly could avoid most allocations.

3. is_batch_ready builds full dep list then filters it

deps = graph.cell_dependencies(column, row_group, None, row_group_size)
deps = [(c, rg, ri) for c, rg, ri in deps if ri is None or not self.is_dropped(rg, ri)]

For a full-column downstream of a 1000-row cell-by-cell column, this builds 1000 tuples then creates a second filtered list. Consider checking dropped rows inline or passing the dropped set into the dependency resolution.

Low Impact (fine to defer)

4. topological_order() and columns copy on every accesstopological_order() does return list(cache) and is called once per column per row group in get_ready_tasks. Since the graph is immutable and callers don't mutate the result, an internal _topological_order that returns the cached list directly (skipping the copy) would help in the hot path. Same for the columns property.

5. is_all_complete repeated dict lookups — Each (col, rg, ri) tuple triggers self._completed.get(rg, {}).get(col, set()) with temporary empty dict/set allocations on misses. Hoisting the row-group lookup outside the per-cell loop would reduce overhead.

6. _upstream/_downstream are defaultdict but accessors use .get(key, set()) — Allocates a fresh empty set on every miss. Minor, but switching to plain dict would make the no-side-effect intent explicit and avoid the allocation.

Summary

The two highest-impact changes are (1) early-skip logic in get_ready_tasks and (2) reducing per-cell allocations in cell_dependencies. Everything else is micro-optimization that can wait until profiling confirms it matters. Great foundation overall.

@andreatgretel
Copy link
Contributor Author

@nabinchha update on the optimization review after the event-driven frontier refactor:

1. get_ready_tasks O(C × R × G) per tick — addressed. get_ready_tasks is now [t for t in self._frontier if t not in dispatched]. Readiness is computed incrementally in _enqueue_downstream on each mark_complete/mark_batch_complete, so cost is O(downstream_fan_out) per completion instead of O(C × R × G) per tick.

2. cell_dependencies allocations per call — no longer in the hot path. The frontier logic uses graph.upstream_by_strategy (cached) directly. No per-cell list/tuple allocations on each tick.

3. is_batch_ready builds full dep list then filters — removed. Batch readiness is checked inline by _are_cell_ups_complete inside _enqueue_downstream and _reevaluate_batch_tasks, no intermediate list construction.

4–6 (topological_order copies, is_all_complete lookups, defaultdict) — already addressed in previous commits or no longer in the hot path.

Replace the poll-based get_ready_tasks (O(C × R × G) per tick) with an
event-driven frontier maintained on mark_complete/mark_batch_complete/
drop_row. get_ready_tasks now returns O(frontier) instead of scanning
all columns × rows × row groups.
@andreatgretel andreatgretel requested a review from nabinchha March 2, 2026 20:13
- Add ReadyTasksFixture dataclass and ready_ctx pytest fixture to
  deduplicate graph/tracker/dispatched setup across get_ready_tasks tests
- Align test with ExecutionGraph.create API rename
- Remove redundant inline comments
- CompletionTracker now raises ValueError when graph/row_groups
  are provided without each other
- resolve_side_effect prefers real columns over aliases when a
  name collision exists
Copy link
Contributor

@greptile-apps greptile-apps bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

7 files reviewed, 7 comments

Edit Code Review Agent Settings | Greptile

Comment on lines +59 to +69
def mark_complete(self, column: ColumnName, row_group: RowGroupIndex, row_index: RowIndex) -> None:
self._completed[row_group][column].add(row_index)
if self._graph is not None:
self._frontier.discard(Task(column=column, row_group=row_group, row_index=row_index, task_type="cell"))
self._enqueue_downstream(column, row_group, row_index=row_index)

def mark_batch_complete(self, column: ColumnName, row_group: RowGroupIndex, row_group_size: int) -> None:
self._completed[row_group][column] = set(range(row_group_size))
if self._graph is not None:
self._frontier.discard(Task(column=column, row_group=row_group, row_index=None, task_type="batch"))
self._enqueue_downstream(column, row_group, row_index=None)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

KeyError when graph is provided without row_groups

The frontier-driven path (mark_complete, mark_batch_complete, drop_row) is guarded only on self._graph is not None, but _row_group_sizes is only populated when both graph and row_groups are provided in __init__ (line 41). If a caller constructs CompletionTracker(graph=g) without row_groups and then calls any of these methods, line 76 in _enqueue_downstream will raise:

rg_size = self._row_group_sizes[row_group]   # KeyError — dict is empty

The docstring correctly states the frontier is only maintained when both are provided, so the guard condition should reflect this. Consider:

Suggested change
def mark_complete(self, column: ColumnName, row_group: RowGroupIndex, row_index: RowIndex) -> None:
self._completed[row_group][column].add(row_index)
if self._graph is not None:
self._frontier.discard(Task(column=column, row_group=row_group, row_index=row_index, task_type="cell"))
self._enqueue_downstream(column, row_group, row_index=row_index)
def mark_batch_complete(self, column: ColumnName, row_group: RowGroupIndex, row_group_size: int) -> None:
self._completed[row_group][column] = set(range(row_group_size))
if self._graph is not None:
self._frontier.discard(Task(column=column, row_group=row_group, row_index=None, task_type="batch"))
self._enqueue_downstream(column, row_group, row_index=None)
def mark_complete(self, column: ColumnName, row_group: RowGroupIndex, row_index: RowIndex) -> None:
self._completed[row_group][column].add(row_index)
if self._graph is not None and self._row_group_sizes:
self._frontier.discard(Task(column=column, row_group=row_group, row_index=row_index, task_type="cell"))
self._enqueue_downstream(column, row_group, row_index=row_index)

Apply the same fix to mark_batch_complete (line 67) and drop_row (line 128).

Prompt To Fix With AI
This is a comment left during a code review.
Path: packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/completion_tracker.py
Line: 59-69

Comment:
**KeyError when `graph` is provided without `row_groups`**

The frontier-driven path (`mark_complete`, `mark_batch_complete`, `drop_row`) is guarded only on `self._graph is not None`, but `_row_group_sizes` is only populated when **both** `graph` and `row_groups` are provided in `__init__` (line 41). If a caller constructs `CompletionTracker(graph=g)` without `row_groups` and then calls any of these methods, line 76 in `_enqueue_downstream` will raise:

```python
rg_size = self._row_group_sizes[row_group]   # KeyError — dict is empty
```

The docstring correctly states the frontier is only maintained when *both* are provided, so the guard condition should reflect this. Consider:

```suggestion
    def mark_complete(self, column: ColumnName, row_group: RowGroupIndex, row_index: RowIndex) -> None:
        self._completed[row_group][column].add(row_index)
        if self._graph is not None and self._row_group_sizes:
            self._frontier.discard(Task(column=column, row_group=row_group, row_index=row_index, task_type="cell"))
            self._enqueue_downstream(column, row_group, row_index=row_index)
```

Apply the same fix to `mark_batch_complete` (line 67) and `drop_row` (line 128).

How can I resolve this? If you propose a fix, please make it concise.

Comment on lines +65 to +69
def mark_batch_complete(self, column: ColumnName, row_group: RowGroupIndex, row_group_size: int) -> None:
self._completed[row_group][column] = set(range(row_group_size))
if self._graph is not None:
self._frontier.discard(Task(column=column, row_group=row_group, row_index=None, task_type="batch"))
self._enqueue_downstream(column, row_group, row_index=None)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Row-group size mismatch can permanently block downstream tasks

The mark_batch_complete method accepts a row_group_size parameter which sets the completed set (line 66):

self._completed[row_group][column] = set(range(row_group_size))

However, _enqueue_downstream uses the authoritative size from _row_group_sizes (populated at construction, line 76):

rg_size = self._row_group_sizes[row_group]

If these sizes differ — e.g., a partial last batch where caller passes row_group_size=50 but _row_group_sizes[rg]=100 — the frontier logic will iterate over rows 0..99 in _are_cell_ups_complete (line 191). Rows 50..99 will never be marked complete, so downstream FULL_COLUMN tasks that depend on this column will never become ready, permanently blocking the pipeline.

Consider removing the row_group_size parameter entirely and using the authoritative self._row_group_sizes[row_group] inside mark_batch_complete, or at minimum asserting equality before marking complete:

Suggested change
def mark_batch_complete(self, column: ColumnName, row_group: RowGroupIndex, row_group_size: int) -> None:
self._completed[row_group][column] = set(range(row_group_size))
if self._graph is not None:
self._frontier.discard(Task(column=column, row_group=row_group, row_index=None, task_type="batch"))
self._enqueue_downstream(column, row_group, row_index=None)
def mark_batch_complete(self, column: ColumnName, row_group: RowGroupIndex, row_group_size: int) -> None:
assert row_group_size == self._row_group_sizes.get(row_group, row_group_size), \
f"Batch size mismatch: got {row_group_size}, expected {self._row_group_sizes.get(row_group, 'unknown')}"
self._completed[row_group][column] = set(range(row_group_size))
Prompt To Fix With AI
This is a comment left during a code review.
Path: packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/completion_tracker.py
Line: 65-69

Comment:
**Row-group size mismatch can permanently block downstream tasks**

The `mark_batch_complete` method accepts a `row_group_size` parameter which sets the completed set (line 66):

```python
self._completed[row_group][column] = set(range(row_group_size))
```

However, `_enqueue_downstream` uses the authoritative size from `_row_group_sizes` (populated at construction, line 76):

```python
rg_size = self._row_group_sizes[row_group]
```

If these sizes differ — e.g., a partial last batch where caller passes `row_group_size=50` but `_row_group_sizes[rg]=100` — the frontier logic will iterate over rows `0..99` in `_are_cell_ups_complete` (line 191). Rows `50..99` will never be marked complete, so downstream FULL_COLUMN tasks that depend on this column will never become ready, permanently blocking the pipeline.

Consider removing the `row_group_size` parameter entirely and using the authoritative `self._row_group_sizes[row_group]` inside `mark_batch_complete`, or at minimum asserting equality before marking complete:

```suggestion
    def mark_batch_complete(self, column: ColumnName, row_group: RowGroupIndex, row_group_size: int) -> None:
        assert row_group_size == self._row_group_sizes.get(row_group, row_group_size), \
            f"Batch size mismatch: got {row_group_size}, expected {self._row_group_sizes.get(row_group, 'unknown')}"
        self._completed[row_group][column] = set(range(row_group_size))
```

How can I resolve this? If you propose a fix, please make it concise.

Comment on lines +115 to +116
self._topological_order_cache = order
return order
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mutable cache returned directly — callers can silently corrupt it

topological_order() stores the result in _topological_order_cache and returns the same list object. Any caller that mutates the returned list (sort, append, etc.) will corrupt the cache, causing subsequent calls to return the mutated order without re-running Kahn's algorithm.

A defensive copy is cheap and prevents this edge case:

Suggested change
self._topological_order_cache = order
return order
self._topological_order_cache = order
return list(order)
Prompt To Fix With AI
This is a comment left during a code review.
Path: packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/execution_graph.py
Line: 115-116

Comment:
**Mutable cache returned directly — callers can silently corrupt it**

`topological_order()` stores the result in `_topological_order_cache` and returns the same list object. Any caller that mutates the returned list (sort, append, etc.) will corrupt the cache, causing subsequent calls to return the mutated order without re-running Kahn's algorithm.

A defensive copy is cheap and prevents this edge case:

```suggestion
        self._topological_order_cache = order
        return list(order)
```

How can I resolve this? If you propose a fix, please make it concise.

Comment on lines +248 to +256
def build_execution_graph(
column_configs: list[DatasetBuilderColumnConfigT],
strategies: dict[ColumnName, GenerationStrategy],
) -> ExecutionGraph:
"""Build an ``ExecutionGraph`` from column configs and pre-computed strategies.

.. deprecated:: Use ``ExecutionGraph.create()`` instead.
"""
return ExecutionGraph.create(column_configs, strategies)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deprecated function emits no runtime warning

build_execution_graph is marked deprecated in the docstring, but there is no warnings.warn() call, so callers receive no signal at runtime that they should migrate to ExecutionGraph.create(). Additionally, the test file test_execution_graph.py uses the deprecated wrapper throughout (12 times), inconsistent with test_completion_tracker.py which uses the new ExecutionGraph.create() API.

Consider emitting a runtime deprecation signal:

Suggested change
def build_execution_graph(
column_configs: list[DatasetBuilderColumnConfigT],
strategies: dict[ColumnName, GenerationStrategy],
) -> ExecutionGraph:
"""Build an ``ExecutionGraph`` from column configs and pre-computed strategies.
.. deprecated:: Use ``ExecutionGraph.create()`` instead.
"""
return ExecutionGraph.create(column_configs, strategies)
def build_execution_graph(
column_configs: list[DatasetBuilderColumnConfigT],
strategies: dict[ColumnName, GenerationStrategy],
) -> ExecutionGraph:
"""Build an ``ExecutionGraph`` from column configs and pre-computed strategies.
.. deprecated:: Use ``ExecutionGraph.create()`` instead.
"""
import warnings
warnings.warn(
"build_execution_graph() is deprecated; use ExecutionGraph.create() instead.",
DeprecationWarning,
stacklevel=2,
)
return ExecutionGraph.create(column_configs, strategies)
Prompt To Fix With AI
This is a comment left during a code review.
Path: packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/execution_graph.py
Line: 248-256

Comment:
**Deprecated function emits no runtime warning**

`build_execution_graph` is marked deprecated in the docstring, but there is no `warnings.warn()` call, so callers receive no signal at runtime that they should migrate to `ExecutionGraph.create()`. Additionally, the test file `test_execution_graph.py` uses the deprecated wrapper throughout (12 times), inconsistent with `test_completion_tracker.py` which uses the new `ExecutionGraph.create()` API.

Consider emitting a runtime deprecation signal:

```suggestion
def build_execution_graph(
    column_configs: list[DatasetBuilderColumnConfigT],
    strategies: dict[ColumnName, GenerationStrategy],
) -> ExecutionGraph:
    """Build an ``ExecutionGraph`` from column configs and pre-computed strategies.

    .. deprecated:: Use ``ExecutionGraph.create()`` instead.
    """
    import warnings
    warnings.warn(
        "build_execution_graph() is deprecated; use ExecutionGraph.create() instead.",
        DeprecationWarning,
        stacklevel=2,
    )
    return ExecutionGraph.create(column_configs, strategies)
```

How can I resolve this? If you propose a fix, please make it concise.

Comment on lines +62 to +72
def mark_complete(self, column: ColumnName, row_group: RowGroupIndex, row_index: RowIndex) -> None:
self._completed[row_group][column].add(row_index)
if self._graph is not None:
self._frontier.discard(Task(column=column, row_group=row_group, row_index=row_index, task_type="cell"))
self._enqueue_downstream(column, row_group, row_index=row_index)

def mark_batch_complete(self, column: ColumnName, row_group: RowGroupIndex, row_group_size: int) -> None:
self._completed[row_group][column] = set(range(row_group_size))
if self._graph is not None:
self._frontier.discard(Task(column=column, row_group=row_group, row_index=None, task_type="batch"))
self._enqueue_downstream(column, row_group, row_index=None)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self._row_group_sizes[row_group] (accessed at lines 79 and 143) will raise KeyError if mark_complete, mark_batch_complete, or drop_row is ever called with a row_group value that was not included in the row_groups argument at construction. In the async scheduler context, a late-arriving completion event from an unregistered row group would crash the event loop silently.

Add a defensive guard in _enqueue_downstream and _reevaluate_batch_tasks:

rg_size = self._row_group_sizes.get(row_group)
if rg_size is None:
    return
Prompt To Fix With AI
This is a comment left during a code review.
Path: packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/completion_tracker.py
Line: 62-72

Comment:
`self._row_group_sizes[row_group]` (accessed at lines 79 and 143) will raise `KeyError` if `mark_complete`, `mark_batch_complete`, or `drop_row` is ever called with a `row_group` value that was not included in the `row_groups` argument at construction. In the async scheduler context, a late-arriving completion event from an unregistered row group would crash the event loop silently.

Add a defensive guard in `_enqueue_downstream` and `_reevaluate_batch_tasks`:

```python
rg_size = self._row_group_sizes.get(row_group)
if rg_size is None:
    return
```

How can I resolve this? If you propose a fix, please make it concise.

Comment on lines +84 to +86
# All batch upstreams must be present in completed dict
if any(up not in rg_completed for up in batch_ups):
continue
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The condition if any(up not in rg_completed for up in batch_ups) checks only whether a batch-upstream column key exists in the completion dict, not whether all rows are complete. A single call to mark_complete(up_col, rg, 0) on a FULL_COLUMN column creates the column key even though only one row is marked complete.

If the scheduler calls mark_complete on a FULL_COLUMN column (due to strategy mismatch or other paths), downstream FULL_COLUMN tasks with no CELL_BY_CELL upstreams will be enqueued prematurely. The downstream check _are_cell_ups_complete([], ...) returns true for an empty list, bypassing actual completion validation.

Consider tracking batch-level completions separately to distinguish between partial and complete batches, or validate that all rows in batch upstreams are complete before enqueuing downstream tasks.

Prompt To Fix With AI
This is a comment left during a code review.
Path: packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/completion_tracker.py
Line: 84-86

Comment:
The condition `if any(up not in rg_completed for up in batch_ups)` checks only whether a batch-upstream column key exists in the completion dict, not whether all rows are complete. A single call to `mark_complete(up_col, rg, 0)` on a `FULL_COLUMN` column creates the column key even though only one row is marked complete.

If the scheduler calls `mark_complete` on a `FULL_COLUMN` column (due to strategy mismatch or other paths), downstream `FULL_COLUMN` tasks with no `CELL_BY_CELL` upstreams will be enqueued prematurely. The downstream check `_are_cell_ups_complete([], ...)` returns true for an empty list, bypassing actual completion validation.

Consider tracking batch-level completions separately to distinguish between partial and complete batches, or validate that all rows in batch upstreams are complete before enqueuing downstream tasks.

How can I resolve this? If you propose a fix, please make it concise.

Comment on lines +124 to +143
def critical_path(self) -> list[str]:
"""Longest dependency chain (by number of columns)."""
order = self.topological_order()
dist: dict[str, int] = {col: 0 for col in order}
pred: dict[str, str | None] = {col: None for col in order}

for col in order:
for child in self._downstream.get(col, set()):
if dist[col] + 1 > dist[child]:
dist[child] = dist[col] + 1
pred[child] = col

end = max(order, key=lambda c: dist[c])
path: list[str] = []
cur: str | None = end
while cur is not None:
path.append(cur)
cur = pred[cur]
path.reverse()
return path
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical_path() calls max(order, ...) on line 136 without checking if order is empty. When no columns have been registered, topological_order() returns [], and max() on an empty sequence raises ValueError: max() arg is an empty sequence.

Add an early return for the empty case:

def critical_path(self) -> list[str]:
    order = self.topological_order()
    if not order:
        return []
    ...
Prompt To Fix With AI
This is a comment left during a code review.
Path: packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/execution_graph.py
Line: 124-143

Comment:
`critical_path()` calls `max(order, ...)` on line 136 without checking if `order` is empty. When no columns have been registered, `topological_order()` returns `[]`, and `max()` on an empty sequence raises `ValueError: max() arg is an empty sequence`.

Add an early return for the empty case:

```python
def critical_path(self) -> list[str]:
    order = self.topological_order()
    if not order:
        return []
    ...
```

How can I resolve this? If you propose a fix, please make it concise.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

^ Claude also flagged this error in my review.

@greptile-apps
Copy link
Contributor

greptile-apps bot commented Mar 3, 2026

Additional Comments (3)

packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/completion_tracker.py, line 72
self._row_group_sizes[row_group] (accessed at lines 79 and 143) will raise KeyError if mark_complete, mark_batch_complete, or drop_row is ever called with a row_group value that was not included in the row_groups argument at construction. In the async scheduler context, a late-arriving completion event from an unregistered row group would crash the event loop silently.

Add a defensive guard in _enqueue_downstream and _reevaluate_batch_tasks:

rg_size = self._row_group_sizes.get(row_group)
if rg_size is None:
    return

packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/completion_tracker.py, line 86
The condition if any(up not in rg_completed for up in batch_ups) checks only whether a batch-upstream column key exists in the completion dict, not whether all rows are complete. A single call to mark_complete(up_col, rg, 0) on a FULL_COLUMN column creates the column key even though only one row is marked complete.

If the scheduler calls mark_complete on a FULL_COLUMN column (due to strategy mismatch or other paths), downstream FULL_COLUMN tasks with no CELL_BY_CELL upstreams will be enqueued prematurely. The downstream check _are_cell_ups_complete([], ...) returns true for an empty list, bypassing actual completion validation.

Consider tracking batch-level completions separately to distinguish between partial and complete batches, or validate that all rows in batch upstreams are complete before enqueuing downstream tasks.


packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/execution_graph.py, line 143
critical_path() calls max(order, ...) on line 136 without checking if order is empty. When no columns have been registered, topological_order() returns [], and max() on an empty sequence raises ValueError: max() arg is an empty sequence.

Add an early return for the empty case:

def critical_path(self) -> list[str]:
    order = self.topological_order()
    if not order:
        return []
    ...

Comment on lines +115 to +116
def is_all_complete(self, cells: list[tuple[ColumnName, RowGroupIndex, RowIndex | None]]) -> bool:
"""Check whether all the given (column, row_group, row_index) tuples are done.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: would it be helpful to have a Cell object that we pass around? maybe not a big deal for agents, but for humans the requirement to get the tuple oder right is error prone 🤷‍♂️

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this would also remove the need for this type aliases, i think

Comment on lines +62 to +63
def mark_complete(self, column: ColumnName, row_group: RowGroupIndex, row_index: RowIndex) -> None:
self._completed[row_group][column].add(row_index)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can we be more explicit about what is being marked complete? This is marking a single row complete, right?

Also, another super nit: historically we have used "record" throughout the codebase, including in the interface – e.g., num_records. I like that "row" is short lol, but wanted to call out that this is a small inconsistency.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW – I'm not saying we necessarily need to change row -> record. Just wanted to make sure we knowingly have an inconsistency. Your call.

Comment on lines +118 to +119
A ``row_index`` of ``None`` means the entire batch for that column must
be complete (i.e., that column key must exist in the row group's dict).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Purely speculating here, but this being the signal that a column is complete feels like something is off – e.g. why are are passing a column name and row group index when no row index exists. I realize this is the opposite of actionable feedback lol. Just noting this came to mind.

self._frontier.discard(Task(column=column, row_group=row_group, row_index=row_index, task_type="cell"))
self._enqueue_downstream(column, row_group, row_index=row_index)

def mark_batch_complete(self, column: ColumnName, row_group: RowGroupIndex, row_group_size: int) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

super nit: for me, it would be more clear what this method does if we called it something like mark_row_range_complete.

Comment on lines +121 to +124
for col, rg, ri in cells:
if ri is None:
if col not in self._completed.get(rg, {}):
return False
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Claude seems convinced that is a bug in is_all_complete. Having trouble figure out if this is true, so sharing here:

When called with row_index=None (meaning "is this batch column fully done?"), line 120 only checks key presence:

if col not in self._completed.get(rg, {}):
    return False

It doesn't check how many rows are in the set. So this sequence produces a wrong answer:

tracker = CompletionTracker()
tracker.mark_complete("topic", row_group=0, row_index=0)  # just row 0

# This returns True — but only 1 of 3 rows is done!
tracker.is_all_complete([("topic", 0, None)])

The key "topic" exists in self._completed[0] after the single mark_complete call, so the check passes.

Why it matters

is_all_complete is consumed by cell_dependencies callers to verify upstream readiness. If FULL_COLUMN column (like a sampler) is an upstream dep, the downstream's dependency list includes ("topic", 0, None). If something goes wrong and individual mark_complete calls happen on a column that should only be batch-completed, is_all_complete would report "ready" after a single row — potentially causing the scheduler to dispatch work before its inputs are actually available.

Why it's not catastrophic today

The frontier-based scheduling in _enqueue_downstream does not use is_all_complete. It has its own separate check at line 82:

if any(up not in rg_completed for up in batch_ups):
    continue

This has the same key-presence issue, but the frontier is only updated from mark_complete and mark_batch_complete call sites, which the scheduler will control. So in practice the scheduler would call the right method for the right column type. The bug is latent — it's an API semantics issue rather than a live runtime failure.

The two fix options

Option A: Track batch completions separately (cleaner)

  def __init__(self, ...) -> None:
      ...
      self._batch_complete: dict[RowGroupIndex, set[ColumnName]] = defaultdict(set)

  def mark_batch_complete(self, column, row_group, row_group_size):
      self._completed[row_group][column].update(range(row_group_size))
      self._batch_complete[row_group].add(column)
      ...

  def is_all_complete(self, cells):
      for col, rg, ri in cells:
          if ri is None:
              if col not in self._batch_complete.get(rg, set()):
                  return False
          elif not self.is_complete(col, rg, ri):
              return False
      return True

Now the ri=None path checks a definitive signal that mark_batch_complete was actually called, not just that some row happened to exist.

Option B: Document the precondition (simpler)

Add to the docstring: "Callers must ensure that batch columns (row_index=None) are completed via mark_batch_complete, not individual mark_complete calls. The check only verifies the column key is present."

This accepts the semantic gap but makes it explicit so future callers don't trip over it.

return column
return self._side_effect_map.get(column, column)

def upstream(self, column: ColumnName) -> set[ColumnName]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: get_upstream_columns would be more consistent with the existing codebase. same for downstream and other places names are potentially vague

self._frontier.add(task)
else:
# Batch completion: check all non-dropped, non-complete rows
down_completed = rg_completed.get(down, set())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are needing to do this .get pattern an awful lot throughout this implementation. Not a big deal I suppose, but it might be a signal that some sort of abstraction might help with code readability.

Comment on lines +16 to +17
class CompletionTracker:
"""Tracks which (column, row_group, row_index) tuples are done.
Copy link
Contributor

@johnnygreco johnnygreco Mar 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The tuple (column, row_group, row_index) is a cell, right? Sorry, I have left too many comments about this, but it feels clunky to have to carry the tuple around everywhere 😅

Can this be indexed / framed as cell_{row_group}? Actually, this makes me realize I'm not sure what the range of row_index is. Is it the actual dataset range, so (i, j) = (row_index, column) in the dataset? Or are we resetting the range for each row group?

self._topological_order_cache = order
return order

def critical_path(self) -> list[str]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two things I'm noticing as general feedback throughout:

  • We generally start method names with a verb – e.g, "get_", "load_", "save_", etc.
  • Where possible, it would be nice to be as explicit as possible (without have names that are 10 words long lol). In this case, I can probably guess what critical path means, but "longest dependency chain" is more clear at first glance.

class Task:
"""A unit of work for the async scheduler."""

column: ColumnName
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we use column_name in other places (note sure if we are 100% consistent about this, though). personally, I find ColumnName sort of strange – would prefer column_name: str.

Comment on lines +19 to +21
row_group: RowGroupIndex
row_index: RowIndex | None # None for batch/full-column tasks
task_type: Literal["from_scratch", "cell", "batch", "pre_batch_processor", "post_batch_processor"]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Coming back to feeling like maybe the abstractions might still need some fiddling. Mainly because we are passing the tuple around, which when fully specified is a single cell, but then it also is the same as a Task once we add a task_type.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants