Skip to content

refactor(python_executors): replace WorkerPool with loky reusable executor#14

Open
GrigoryEvko wants to merge 816 commits into
FusionBrainLab:mainfrom
GrigoryEvko:loky-executor
Open

refactor(python_executors): replace WorkerPool with loky reusable executor#14
GrigoryEvko wants to merge 816 commits into
FusionBrainLab:mainfrom
GrigoryEvko:loky-executor

Conversation

@GrigoryEvko

@GrigoryEvko GrigoryEvko commented May 15, 2026

Copy link
Copy Markdown

The hand-rolled WorkerPool (persistent subprocess pool over a length-prefixed cloudpickle stdin/stdout protocol) is replaced with loky.get_reusable_executor. Results are spilled to a per-call file on disk and read in the parent via mmap, so per-call memory cost is bounded by free disk space rather than parent RAM. Workers spawn lazily, are reused across calls, and on Linux receive PR_SET_PDEATHSIG so they die with the parent.

Known defects closed by construction:

  • WorkerPool never recycled workers; per-worker state accumulated indefinitely.
  • The default-pool singleton was lru_cache over asyncio.Queue/asyncio.Lock, binding the pool to the first event loop and breaking on subsequent loops (pytest-asyncio reset, repeated asyncio.run).
  • Worker timeout silently fell through to a one-shot subprocess retry, doubling wall time and hiding the timeout from callers.
  • MemoryLimitExceeded was a \"MemoryError\" in stderr substring heuristic that mislabelled in-validator OOMs, KeyError(\"MemoryError\"), and unrelated library messages.
  • Worker stderr=PIPE was never drained; parent RSS leaked unbounded.
  • _kill_process_tree's bare proc.kill() could target reused PIDs after a worker exit.
  • _run_via_worker swallowed cloudpickle.loads errors and discarded captured stderr on TimeoutError/IncompleteReadError.
  • get_worker awaited subprocess spawn while holding the pool lock, serialising the supposed fan-out.
  • cloudpickle.register_pickle_by_value(mod) accumulation concerns (verified bounded after refactor).

Defects explicitly fixed (locked in by regression tests):

  • ExecRunnerError.__str__ dropped stderr; now formatted into the exception message so log lines carry the user traceback.
  • execution.py truncated error messages to 200 chars; now logged in full.
  • User-code os.chdir()/sys.path.insert() calls persisted across worker reuse, leaking cwd and path mutations into the next task.
  • Loky's env= kwarg is additive, not replace — non-whitelisted env keys are now explicitly dropped in _worker_init, so ambient API tokens in the parent's environment do not reach user code.
  • TOCTOU symlink-swap on the spill path allowed arbitrary cloudpickle bytes to be unpickled by the parent (RCE vector); fixed with O_RDONLY | O_NOFOLLOW + post-open fstat.
  • Default spill directory tightened from bare \$TMPDIR to \$TMPDIR/gigaevo-<uid> with mode 0o700.
  • Spill-file leak on interpreter shutdown when shutdown_executor(wait=False) raced loky's executor-manager thread.
  • Loky pool thrash on os.environ mutation (kwargs inequality triggered pool teardown on every submit); scrubbed env is now cached once at import.
  • fd leak when os.fdopen raised mid-spill-write.
  • Stale type-ignore + 200-char truncation in execution.py.

Behaviour improvements:

  • Per-call result size bounded by free disk space, not parent RAM (mmap + protocol-5 cloudpickle).
  • Hard kill on timeout; no silent retry.
  • Default signal dispositions inside workers (KeyboardInterrupt on SIGINT, SIG_DFL on SIGTERM/SIGHUP/SIGQUIT) — user code sees standard Python semantics regardless of what loky installs internally.
  • Env whitelist drops ambient secrets: AWS_*, GH_TOKEN/GITHUB_TOKEN, OPENAI_API_KEY, ANTHROPIC_API_KEY, LANGFUSE_SECRET_KEY, WANDB_API_KEY, HF_TOKEN, STRIPE_*, SUPABASE_*, and everything else not in the explicit whitelist or under the GIGAEVO_*/LOKY_* prefixes.
  • PR_SET_PDEATHSIG on Linux replaces the ps-grep orphan-reaper in tools/flush.py.
  • Per-call worker accounting (peak RSS, wall time, user/sys time, pid) collected and logged at trace level.
  • max_workers auto-caps to cpu_count // xdist_workers under pytest-xdist to avoid the N×cpu_count fork-bomb on big hosts.
  • shutdown_executor blocks on the executor-manager thread when called with wait=True, so spill-cleanup done-callbacks fire before interpreter exit.

Public API surface:

  • run_exec_runner returns Any directly (was (value, b\"\", \"\") 3-tuple).
  • Dropped kwargs: cwd, runner_path (never used), max_memory_mb, max_output_size (replaced by disk-spilling).
  • Dropped attribute: ExecRunnerError.stdout_bytes (was always b\"\").
  • Constants MAX_MEMORY_MB/MAX_OUTPUT_SIZE removed from gigaevo.entrypoint.constants.
  • WorkerPool, default_exec_runner_pool, _run_via_worker, _start_worker_process, _kill_process_tree, _monitor_rss_limit removed entirely.

New env-var configuration:

  • GIGAEVO_EXECUTOR_MAX_WORKERS — override loky's max_workers.
  • GIGAEVO_EXECUTOR_IDLE_TIMEOUT_S — idle worker reap timeout (default 300s).
  • GIGAEVO_EXECUTOR_SPILL_DIR — where result pickles spill (default `$TMPDIR/gigaevo-`).

New dependency: `loky>=3.5`.

PetrAnokhin and others added 30 commits April 1, 2026 16:00
…_executor

Drops the hand-rolled length-prefixed cloudpickle subprocess protocol in
favour of loky's reusable process executor.  Results are spilled to disk
and read in the parent via mmap, so per-call memory cost is bounded by
free disk space rather than parent RAM.  Workers spawn lazily, share state
across calls, and on Linux receive PR_SET_PDEATHSIG so they die with the
parent (replacing the ps-grep orphan reaper).

Public ``run_exec_runner`` signature: dropped ``cwd``/``runner_path``
(never used), dropped ``max_memory_mb``/``max_output_size`` (replaced by
disk spilling), and dropped the legacy ``(value, b"", "")`` 3-tuple
return — now returns the value directly.

Closes by construction: worker recycling concerns, asyncio.Queue/Lock
event-loop binding via lru_cache, the silent one-shot retry on timeout,
the MemoryLimitExceeded string-match heuristic, undrained worker stderr,
PID-reuse races in process-tree kill, and unbounded register_pickle_by_value.

Regression tests cover: ExecRunnerError formatting includes stderr,
no memory-error string heuristic, multi-event-loop safety, timeout
without silent retry, spill-file cleanup, env scrubbing, worker
observability envelope, unpicklable-result handling, mmap large-array
round-trip, and direct exec_runner._run_one library coverage.

Tooling: ruff and ty clean; pytest-xdist compatible.
Replaces untyped ``dict[str, Any]`` payloads and error envelopes with
``WorkerCall``, ``WorkerError``, and ``WorkerResult`` frozen dataclasses.
Adds ``ExecutorConfig`` with a ``from_env()`` factory so the env-var
knobs are gathered in one declarative place rather than scattered as
module-level mutables.

Replaces the ``_apply_env`` / ``_restore_env`` mutation pair with a
``_scoped_env`` context manager so env updates are guaranteed to be
reverted on exception paths and the call-site reads top-down.

No behaviour change — public ``run_exec_runner`` signature unchanged;
loky executor lifecycle, spill semantics, env scrubbing, and worker
initialiser all identical.
…ill_dir fixture

The previous ``env=_scrub_env(...)`` arg to ``get_reusable_executor`` was
additive — workers still inherited every parent env var, including
secrets — because loky merges the dict on top of the inherited
environment rather than replacing it.  Strip non-whitelisted keys
explicitly in ``_worker_init`` so the worker's os.environ is exactly
the scrubbed set after startup.

Update the ``isolated_spill_dir`` fixture to monkeypatch the new
``_CONFIG`` ``ExecutorConfig`` instance (the previous ``SPILL_DIR``
module attribute was renamed in the dataclass refactor).

All 51 executor-suite tests now pass.
Loky installs internal signal handlers on its workers (typically blocking
SIGINT and installing a custom SIGTERM disposition) so the pool can
manage worker lifecycle.  This breaks user code that wants standard
Python semantics — KeyboardInterrupt on Ctrl-C, immediate termination
on SIGTERM/SIGHUP/SIGQUIT, BrokenPipeError on SIGPIPE.

Reset SIGINT to Python's ``default_int_handler`` (which raises
KeyboardInterrupt — caught by ``_run_one``'s ``except BaseException``
and surfaced as a structured worker error) and SIGTERM/SIGHUP/SIGQUIT/
SIGUSR1/SIGUSR2/SIGCHLD to ``SIG_DFL`` in ``_worker_init``.  Leave
SIGPIPE alone so Python's default ``SIG_IGN`` + BrokenPipeError flow
remains intact.

Two regression tests verify SIGINT and SIGTERM dispositions from inside
the worker.
Folds the two-line ``_env_int_or`` / ``_env_path_or`` helpers into
``ExecutorConfig.from_env`` as a single nested ``_pos_int`` since they
were each used once and only at module-import time.

Drops a stale ``# type: ignore[assignment]`` in the (now context-manager-
backed) env restoration path — ty flags it as unused since the
expression is well-typed.

All 53 executor tests still pass; ruff and ty clean.
…, env-scrub hardening

Three parallel audit passes surfaced fixable defects and added regression
tests.  All survive ruff, ty, and the full stages test suite (839 tests).

Correctness / concurrency:
- Cache the scrubbed worker env in a module-global ``_WORKER_ENV`` and
  pass the same dict identity to every ``get_reusable_executor`` call.
  Previously a fresh dict was built per call from ``os.environ``; loky
  treats kwargs inequality as "config changed" and tears the pool down
  on every submit when any whitelisted var mutated, breaking concurrent
  in-flight tasks with ``BrokenProcessPool``.
- Guard ``os.fdopen(fd, "wb")`` in ``_run_task`` so a fail at fdopen
  doesn't leak the descriptor.
- Move ``cloudpickle.register_pickle_by_value`` outside ``_scoped_env`` —
  it's a process-global registry mutation unrelated to env updates.

Security:
- Default spill dir is now ``$TMPDIR/gigaevo-<uid>`` (was bare
  ``$TMPDIR``), created with mode 0o700 — defeats directory-listing
  workload-metadata leaks on shared hosts.
- ``_load_spill`` opens with ``O_RDONLY | O_NOFOLLOW`` and stats via
  ``fstat`` post-open — closes a TOCTOU symlink-swap window that allowed
  arbitrary cloudpickle bytes to be unpickled by the parent (RCE).
- ``ExecutorConfig.from_env`` resolves ``..`` segments via
  ``Path.resolve(strict=False)``.
- ``_worker_init``'s prctl call now logs errno on failure instead of
  silently swallowing.
- Module docstring documents the trust boundary: user code is unsandboxed
  by design; the trust boundary lives at the LLM call upstream.

Tests added (now 71 in the executor file, 839 total in stages):
- 13 parametrised secret-scrub assertions (AWS_*, GH_TOKEN, OPENAI_API_KEY,
  ANTHROPIC_API_KEY, LANGFUSE_SECRET_KEY, WANDB_API_KEY, HF_TOKEN, STRIPE_*,
  SUPABASE_SERVICE_ROLE_KEY, and more).
- ``TestSpillDirHardening`` — default-dir-per-uid + ``..`` resolution.
- ``TestPythonPathPropagation`` — extra python_path reaches the worker.
- ``TestEnvUpdatesNoneUnsets`` — ``env_updates={K: None}`` unsets K.
- ``TestCancellationCleansSpill`` — cancellation path unlinks spill file.

Deferred (need design discussion, not in this PR):
- RLIMIT_NPROC / RLIMIT_FSIZE / RLIMIT_CORE / RLIMIT_AS opt-in caps.
- Surface ``WorkerResult`` resource accounting through the existing
  ``LogWriter`` pattern.
…auto-cap, blocking shutdown

Three parallel audits in less-trafficked corners (serialization, fork/spawn
inheritance, async lifecycle / pytest-xdist interaction) surfaced real
hazards plus several debunked concerns.  Documents and tests added for
both kinds so future readers see what's been verified.

Worker isolation (HIGH):
- `os.chdir()` and `sys.path.insert()` calls inside user code persisted
  across worker reuse — task B inherited task A's cwd and path prepends.
  ``_run_one`` now snapshots ``os.getcwd()`` and ``list(sys.path)`` on
  entry and restores in ``finally``.  Two regression tests pin this.

Async lifecycle / xdist:
- Under ``pytest-xdist`` each worker process imported wrapper.py and
  defaulted to ``cpu_count`` loky workers — on a 28-CPU / 8-xdist-worker
  host this fan-out is 224 subprocesses.  ``ExecutorConfig.from_env``
  now reads ``PYTEST_XDIST_WORKER_COUNT`` and auto-caps to
  ``max(1, cpu_count // xdist_workers)`` when the user hasn't set an
  explicit override.
- ``shutdown_executor`` gained a keyword-only ``wait: bool = False``
  parameter; ``run.py``'s finally block uses ``wait=True`` so loky's
  executor-manager thread reaps workers and fires done-callbacks
  (including ``_unlink_spill_on_done``) before redis/writer close.
  The session-scoped pytest fixture also uses ``wait=True`` so xdist
  workers don't exit while their loky children are mid-SIGKILL.

Serialization / inheritance (debunked but documented):
- ``context="loky"`` actually uses ``fork_exec(close_fds=True)`` followed
  by a re-exec — spawn semantics, not fork.  Workers inherit none of
  the parent's Redis sockets, langfuse handlers, asyncio loop state,
  loguru sinks, or atexit hooks.  Module docstring rewritten to spell
  this out so future readers don't repeat the audit.
- Protocol-5 cloudpickle without ``buffer_callback`` is verified safe —
  ``BYTEARRAY8`` opcodes copy into freshly allocated buffers, so the
  unpickled result does not alias the spill mmap.  ``buffer_callback``
  could shrink ML-payload IPC but introduces dangling-reference hazards;
  deferred until ML workloads dominate.
- ``cloudpickle._PICKLE_BY_VALUE_MODULES`` is a set keyed by name; with
  ``user_code`` reused every call there's no monotonic growth.
- ``sys.path`` accumulation, ``sys.modules`` shadowing, ``linecache``
  growth, ``register_pickle_by_value`` idempotency — all verified bounded.

Tests added (now 93 in the executor file, 861 in stages tree):
- 13 serialization regression tests (cyclic structures, structured
  dtypes, numpy memmap, closures over user classes, instances of inner
  classes, mmap-UAF stress, envelope round-trip, unpicklable result).
- 2 worker-isolation tests (cwd leak, sys.path leak).
- 5 xdist auto-cap tests + 2 ``shutdown_executor(wait=...)`` tests.

All pass under ``pytest -n 4`` xdist as well as serial execution.
…ction banners

Aggressive comment hygiene pass on a too-verbose diff.  The PR description
covers all the audit rationale; source-level comments should be one-line
WHY hints, not paragraph-length narrative.

- wrapper.py module docstring: 109 lines → 12 (drop verbose
  ``Spill backing storage`` / ``Result pickle protocol`` / ``Worker state
  lifecycle`` / ``Start method`` sections — they belong in PR body).
- ``ExecutorConfig.from_env`` docstring + inline comments: ~50 lines → 8.
- ``_worker_init`` / ``shutdown_executor`` / ``_load_spill`` docstrings
  trimmed to behavioural one-liners.
- ``_get_executor`` comment about lazy env capture: 6 lines → 0 (moved
  context to ``_WORKER_ENV`` decl above).
- ``run.py`` finally-block comment: 7 lines → 1.
- ``conftest.py`` session-fixture / ``isolated_spill_dir`` / ``fresh_executor``
  docstrings trimmed.
- ``exec_runner.py`` ``_run_one`` docstring + inline comments: 22 lines → 7.
  ``register_pickle_by_value`` rationale: 6 lines → 2.  ``WorkerError``
  docstring: 6 lines → 1.
- ``test_python_executors.py``: drop 4 ``# ===== section ====== `` banners
  (class names are the section headers), trim 12 regression-class docstrings
  to one-line summaries, drop bug-id list from module docstring.

Net: 114 insertions / 452 deletions on the diff.  Behavior unchanged;
93/93 executor tests pass.
Six surgical reverts on lines the architectural refactor never needed to
touch.  Each is pure noise (variable rename, inlined locals, dropped blank
lines, dropped pre-existing docstrings / section banners) that earlier
trim passes introduced when they should have left those lines alone.

exec_runner.py:
- ``_prepend_sys_path``: restore ``normalized_existing`` name and the
  ``| None`` type union; drop the added docstring.
- ``_iter_top_level_module_names``: restore blank line after ``return set()``.
- ``_module_belongs_to_path``: restore ``file_candidate``/``package_candidate``
  locals and the blank line.
- ``_clear_shadowed_top_level_modules``: restore the ``| None`` union and
  the blank line; drop the added docstring.
- ``_write_code_context``: drop added docstring; restore
  ``last = user_frames[-1]; lineno = last.lineno`` two-line read.
- ``_register_source``: drop added docstring.
- Imports: collapse ``import contextlib`` + ``from contextlib import …``
  into a single ``from contextlib import …, suppress`` line.

test_python_executors.py:
- Restore the module docstring's pre-existing one-line shape (was inflated
  to a paragraph).
- Restore the five ``# --- section ---`` banners stripped by earlier
  trim passes (TestRunExecRunner, TestRunExecRunnerErrors,
  TestExecRunnerErrorAttributes, TestPythonCodeExecutorStage,
  TestPythonCodeExecutorErrorPaths, TestCallFileFunctionStage,
  TestCallProgramFunctionWithFixedArgs, TestFetchMetricsAndFetchArtifact,
  TestCallValidatorFunction).
- Restore one-line docstrings on pre-existing tests that didn't need to
  change otherwise; restore inline annotations (e.g. ``# 1KB limit``,
  ``# syntax error``, ``# Create a minimal valid file...``).
- Restore the ``# Should return a ProgramStageResult failure, not raise``
  comment + late-import order in test_compute_failure_returns_stage_result.

Each restoration shrinks the diff by turning a ``+/-`` pair into an
unchanged line.  All 93 executor tests still pass.
…sses

Tests in the added classes (TestSerializationDistantCorners,
TestProtocolFiveSafety, TestXdistWorkerCountAutoCap,
TestShutdownExecutorWaitFlag) had paragraph-length docstrings explaining
the audit context behind each probe.  That context is now in the PR
description; the test name plus a one-line behavioural docstring is
enough for the in-source reader.  Test bodies and assertions unchanged.

Saved ~70 LOC of explanatory prose; 93/93 executor tests still pass.
The wrapper previously kept all pool state at module scope: _CONFIG
captured at import time, _WORKER_ENV cached on first call, the spill
directory created during import, the env whitelist hardcoded as a
frozenset, and shutdown_executor reached into loky.reusable_executor._executor
(a private upstream attribute). The result was a process-scoped singleton
with no way to construct an independent second pool with different
config — blocking multi-driver-per-host setups, complicating test
isolation, and tying the module to loky's private APIs.

LokyBackend encapsulates the pool: each instance owns its own
loky.ProcessPoolExecutor, its own scrubbed env, and its own spill
directory. Construction is lazy (executor spawns on first execute());
shutdown is idempotent. The class uses loky.ProcessPoolExecutor directly
rather than loky.get_reusable_executor — get_reusable_executor is a
module-global singleton that tears down on kwargs-mismatch, structurally
incompatible with multiple coexisting backends; ProcessPoolExecutor
allows per-instance pools. Context resolution from the string "loky" is
done via loky.backend.context.get_context, which get_reusable_executor
was doing internally.

WorkerConfig (frozen dataclass) replaces ExecutorConfig with the same
shape plus four new fields: env_whitelist (was hardcoded module
constant), env_prefixes (was hardcoded tuple), max_calls_before_recycle
(forward-compat field, not enforced today — wired up alongside the
distributed ExecutorBackend implementations), enable_pdeathsig (per-pool
opt-out for environments where PR_SET_PDEATHSIG is undesirable), pool_id
(uuid identifier for log attribution), node_id (defaults to
socket.gethostname() — needed by the upcoming worker identity tagging).
DEFAULT_ENV_WHITELIST and DEFAULT_ENV_PREFIXES are now public module
constants so deployments wanting to extend the whitelist can construct
`WorkerConfig(env_whitelist=DEFAULT_ENV_WHITELIST | {"MY_CORP_PROXY"})`
without patching the module.

The worker-side _worker_init learns the scrub rules from an env variable
(_GIGAEVO_WORKER_INIT_CONFIG) the parent encodes — required because
loky's env= kwarg is additive, not replace, so the worker must scrub
itself, and the worker-init function must be picklable (no closure over
self.config). Init config is base64-encoded JSON of {whitelist,
prefixes, pdeathsig}; the function falls back to module defaults if the
env var is absent or malformed, so direct loky usage outside this module
still gets sensible behavior.

Module-level run_exec_runner and shutdown_executor become thin wrappers
over a process-scoped default singleton (default_loky_backend()).
Existing call sites — runtime_metrics.py, optimization/utils.py,
optimization/optuna/stage.py, execution.py — are unchanged. Two
backward-compat shims (`_CONFIG` proxy and `_get_executor()` function)
keep test code that reaches for the old internals working.

The isolated_spill_dir fixture is rewritten to replace the default
singleton with a per-test LokyBackend whose WorkerConfig points at the
test's tmp_path; monkeypatch restores the original singleton on
teardown. Cleaner than the previous dataclass-replace-and-monkeypatch
pattern and exercises the new multi-instance code path.

TestLokyBackendIsolation adds nine new tests: two backends have
independent executors and run user code that returns different values;
shutting down one does not affect the sibling; pool_id is unique per
instance; node_id defaults to socket.gethostname() and is overridable;
env_whitelist and env_prefixes accept extensions; a custom whitelist
entry actually reaches the worker process; the default singleton is
lazy. All 93 prior tests pass unchanged.
Each worker process now generates a stable worker_id at spawn time
(uuid4 hex prefix) and inherits the parent's node_id (from
WorkerConfig). Both are surfaced through three channels:

- WorkerResult gains worker_id and node_id fields populated by _run_task
  from the worker's environment. Every result envelope returned to the
  driver carries the identity of the worker that produced it.
- os.environ in the worker process exposes GIGAEVO_WORKER_ID and
  GIGAEVO_NODE_ID so user code can read them too. Both fall under the
  GIGAEVO_ prefix that the env scrub passes through, so they survive
  the scrub _worker_init runs.
- The parent's trace log line in LokyBackend.execute now includes
  pool_id, node_id, worker_id, and worker_pid in the prefix:
  [LokyBackend:{pool}|{node}:{worker}:{pid}].

Mechanism: _worker_init reads node_id from the same encoded init-config
blob that already carried the env scrub rules; it generates worker_id
via uuid.uuid4().hex[:12] and writes both into os.environ before user
code runs. Identity is stable for the lifetime of a worker process
across multiple reused calls; recycled workers get new ids on respawn.

Four new TestWorkerIdentity tests: worker_id is populated and 12-char
uuid prefix; node_id from WorkerConfig propagates into the worker's
GIGAEVO_NODE_ID env var; two workers in the same pool have distinct
worker_ids; a worker reused across three calls reports the same
worker_id and the same PID.
…+ outcome

WorkerResult previously bundled the success-or-failure outcome together
with the resource/timing metrics in one flat dataclass. The shape was
awkward in three ways: success/failure discrimination was implicit
(callers had to check which of spill_path/error was None); metrics were
structurally coupled to the local-file outcome representation, so a
future remote-execution backend that returns results via HTTP body
rather than spill files would inherit dead state; and adding a new
metric field required modifying the dataclass and all callers.

New ExecutionMetrics dataclass with the seven metric fields plus
worker_id and node_id (already populated since C2), plus
started_at_ns and finished_at_ns timestamps. The timestamps enable
downstream timing analysis without recomputing — wall_time_s gives
duration but does not pin to wall-clock anchors. WorkerResult now
holds spill_path, error, and metrics; legacy property accessors
(peak_rss_kb, wall_time_s, user_time_s, sys_time_s, worker_pid,
worker_id, node_id) read through to metrics so existing callers and
tests that access fields directly on WorkerResult continue to work.

Three new tests in TestWorkerIdentity: metrics carry worker_id,
node_id, worker_pid, started_at_ns, finished_at_ns; wall_time_s
matches (finished - started) / 1e9 within scheduler slack; metrics
are populated even when the worker call fails; legacy property
accessors read through to metrics. One existing serialisation test
updated to use the new ExecutionMetrics constructor. All 109 tests
pass.
New backend.py module declaring ExecutorBackend as a runtime-checkable
Protocol with two core methods (execute / shutdown) plus three
lifecycle-hook registration methods (on_submit / on_complete /
on_shutdown). LokyBackend gains conformance by implementing all of
them; isinstance(backend, ExecutorBackend) now returns True at runtime.

The Protocol is the swap-point that follow-up PRs will plug
distributed backends into (Redis-Stream-mediated remote workers, HTTP
submission services, Ray actors). No call site in this PR uses a
non-loky implementation; the abstraction exists so call sites already
take an injection parameter, not because there are multiple
implementations today.

run_exec_runner gains backend: ExecutorBackend | None = None kwarg.
None preserves the historical default-singleton path; an explicit
backend swaps in the caller's choice. Used by the new test
TestWorkerIdentity.test_custom_backend_injected_into_run_exec_runner
to inject a FakeBackend that implements the Protocol structurally
without inheriting from any base class — that's the whole point of
the Protocol.

Lifecycle hooks on LokyBackend:

- on_submit fires immediately before the call reaches the loky
  executor, with the WorkerCall being submitted.
- on_complete fires after execution finishes, with the WorkerCall,
  the ExecutionMetrics, and an Exception (None on success, the
  raised ExecRunnerError / TimeoutError / CancelledError on failure).
  Even the cancel branch fires it — observers learn that the
  attempted work didn't complete cleanly. When metrics are not
  available (timeout / cancel before any worker reported back), a
  zeroed ExecutionMetrics is constructed with the parent's node_id
  so the handler signature stays uniform.
- on_shutdown fires once when the pool is torn down; idempotent
  second shutdown does not re-fire. A sentinel clears the handler
  list after the first firing.

Hooks fire synchronously in the parent process. A handler that
raises is logged via logger.exception and other handlers in the
same event fire normally; one bad observer does not silence the
rest nor break the execute() path. This is enforced by the
test_handler_exception_does_not_break_siblings case.

Per-worker-process lifecycle (on_worker_start / on_worker_exit) is
deliberately absent: it requires cross-process signalling that has
no immediate consumer on local loky, and becomes natural to wire
up only when the worker layer itself emits network events. The
upcoming RedisStreamWorkerBackend will surface those as XADD events
to a side channel; the local loky backend skips them.

Eight new tests cover:
- ExecutorBackend protocol conformance via runtime isinstance.
- run_exec_runner accepts custom backend via the new kwarg.
- on_submit, on_complete (success), on_complete (failure with
  ExecRunnerError) fire correctly.
- on_shutdown fires once per shutdown; idempotent second shutdown
  does not re-fire.
- Multiple handlers on one event all fire.
- An exception in one handler does not silence siblings or break
  execution.

Total tests in suite: 117 (was 109). Ruff clean.
…cture

After C1-C5 the wrapper module hosts more than just the loky pool: a
WorkerConfig dataclass, an ExecutionMetrics block on WorkerResult,
lifecycle hooks on LokyBackend, and a backend kwarg on
run_exec_runner that opens the door to non-loky implementations. The
old docstring described only the singleton-pool model and didn't
mention any of these.

Updated to reference the ExecutorBackend protocol, the configuration
object, the metrics envelope, and the lifecycle subscription API.
Calls out backend.py as the swap-point for future distributed
implementations.
…ty, dead code

Cleanup of five issues surfaced by re-reading the C1-C5 commits with a
critical lens.

CompleteHandler signature semantics: the type was
Callable[[WorkerCall, ExecutionMetrics, BaseException | None], None].
For timeout / cancel paths and the new pre-submit-failure path, the
call did not produce a result envelope, so the metrics block was
synthesized with peak_rss_kb=0, wall_time_s=0.0, worker_pid=0,
worker_id="". An observer could not distinguish "real but trivial
work" from "no metrics available". Changed to
ExecutionMetrics | None; the zero-synth is dropped. Observers get
None and exc together as the signal "this call did not finish
normally". The on_complete handler in the failure-path tests is
updated accordingly.

Hook imbalance on _get_executor() failure: execute() fires
on_submit before constructing the loky pool. If pool construction
fails (spill_dir.mkdir denied by filesystem permissions, loky's
ProcessPoolExecutor __init__ raises, get_context fails), on_complete
was never fired — observer's submit/complete counts drifted apart
permanently. Wrapped _get_executor() + executor.submit() in
try/except; on failure, _fire_complete(call, None, exc) before
re-raising. New test
test_on_complete_fires_with_none_metrics_on_pre_submit_failure
monkeypatches _get_executor to raise and asserts exactly one submit
event and exactly one complete event with None metrics.

Dead conformance check removed: _assert_loky_conforms() was defined
but never called. The internal `_: ExecutorBackend = LokyBackend.__new__(LokyBackend)`
annotation was a static-only check that pyright would see at the
function definition site, but the runtime intent ("fail fast at import
if LokyBackend stops conforming") never executed because the function
was never called. The test_executor_backend_protocol_conformance case
already covers runtime conformance via isinstance, so the dead
function is removed.

Hook event name in error logs: _fire previously logged
"[LokyBackend:{pool_id}] hook raised" without saying which event
(submit / complete / shutdown) the misbehaving handler was attached
to. Threaded the event name through _fire's signature; error logs
now read "[LokyBackend:{pool_id}] {event} hook raised". Trivial
debugging-convenience improvement.

execution.py PythonCodeExecutor docstring: said "loky-managed
subprocess" — accurate before C4 when loky was the only path.
After C4 the dispatch goes through ExecutorBackend; the loky
implementation is the default but not the only option. Docstring
updated to reflect the abstraction.

Total: ~50 LOC code change, ~45 LOC test addition. 118/118 tests
pass (was 117). Ruff clean.
Split ExecutorBackend Protocol — required core (execute / shutdown) and
opt-in LifecycleObservable (the three hooks).  Backends with no useful
events to emit no longer need three no-op registrations; callers gate
hook wiring with isinstance(backend, LifecycleObservable).

Drop max_calls_before_recycle from WorkerConfig.  It was a
forward-compat placeholder with no enforcement — YAGNI.  Add the field
back with real recycle logic when a backend actually needs it.

Drop base64 wrapping around the worker init-config env var.  POSIX env
values accept any byte except NUL and execve does not re-tokenise them,
so raw JSON passes through unmodified.  Removes one import and one
decode-failure branch.

Surface GIGAEVO_EXECUTOR_NODE_ID and GIGAEVO_EXECUTOR_DISABLE_PDEATHSIG
in WorkerConfig.from_env so k8s downward-API and debugging contexts can
override identity / pdeathsig without programmatic construction.

Document peak_rss_kb semantics accurately — it's the worker
lifetime-watermark RSS from ru_maxrss, not a per-call peak; loky reuses
workers across calls so the value for call N includes peaks from
earlier calls on the same worker.

Use time.perf_counter_ns consistently in _run_task — drop the redundant
time.monotonic clock and derive wall_time_s from finished_at_ns minus
started_at_ns, eliminating cross-clock skew.

Document thread-safety semantics (single asyncio loop; no internal
locking) on the wrapper module and hook-lifetime semantics
(handlers drop with the singleton) on shutdown_executor.

Tests: hook firing order is registration order; from_env identity and
safety knobs honour their env vars; empty NODE_ID falls back to
hostname; non-truthy DISABLE_PDEATHSIG keeps pdeathsig enabled.
135 / 903 stages tests pass, ruff clean.
@GrigoryEvko

Copy link
Copy Markdown
Author

Pushed an additional cleanup commit (8f2d3b0) tightening the Protocol surface and a handful of accuracy / consistency fixes that came out of a self-review pass.

Protocol split

ExecutorBackend is now the minimal required contract (execute + shutdown). The three lifecycle hooks moved to a separate, opt-in LifecycleObservable Protocol. Backends with no useful events to emit no longer need three no-op registration methods; callers gate hook wiring with isinstance(backend, LifecycleObservable). LokyBackend conforms to both.

Dropped surface

  • WorkerConfig.max_calls_before_recycle was a forward-compat placeholder with no enforcement path. Removed — re-introduce alongside actual recycle logic when a backend needs it.
  • Base64 wrapping around _GIGAEVO_WORKER_INIT_CONFIG: POSIX env values accept any byte except NUL and execve does not re-tokenise them, so raw JSON passes through unmodified. Drops one import and one decode-failure branch.

Accuracy fixes

  • ExecutionMetrics.peak_rss_kb is sourced from ru_maxrss, which is a worker-process lifetime watermark, not a per-call peak. The docstring now states this directly — loky reuses workers, so the value for call N includes peaks from earlier calls on the same worker. An accurate per-call peak would need RSS sampling during execution (e.g. psutil); deferred until telemetry actually requires it.
  • _run_task now uses a single time.perf_counter_ns clock end-to-end. wall_time_s is derived from (finished_at_ns - started_at_ns) / 1e9 rather than a parallel time.monotonic reading; eliminates cross-clock skew.

from_env() coverage

Two new env-driven knobs so k8s downward-API and debugging contexts do not require programmatic WorkerConfig construction:

  • GIGAEVO_EXECUTOR_NODE_ID — overrides socket.gethostname(). Empty / whitespace falls back to hostname.
  • GIGAEVO_EXECUTOR_DISABLE_PDEATHSIG — truthy values (1 / true / yes / on, case-insensitive) opt out of PR_SET_PDEATHSIG; useful when running workers outside a parent-supervised context.

Documentation

  • Module docstring records that LokyBackend is designed for a single asyncio event loop — the lazy _executor creation, the hook lists, and the module-level singleton are not internally locked.
  • shutdown_executor docstring records hook-lifetime semantics: dropping the default singleton releases its registered handlers; the next call to default_loky_backend() constructs a fresh backend with empty hook lists.

Test additions

  • Hook firing order is now pinned: handlers fire in registration order across all three channels.
  • Six new from_env cases cover NODE_ID default / override / empty-fallback and DISABLE_PDEATHSIG truthy / non-truthy variants (parameterised, 15 invocations total).

135 tests in test_python_executors.py pass; full tests/stages/ is 903 green. Ruff clean.

GrigoryEvko added a commit to GrigoryEvko/gigaevo-core that referenced this pull request May 16, 2026
The 1-line modifier added in 9882855 is in the legacy ``WorkerPool``
path of ``wrapper.py``.  PR FusionBrainLab#14 (loky-executor) rewrites the entire
module — the ``run_exec_runner`` function this raise statement lives
in is replaced by ``LokyBackend.execute``, and the cloudpickle-loads
site moves to ``_load_spill``.  Keeping the modifier here forces a
merge conflict with FusionBrainLab#14 regardless of order; removing it lets either
PR merge independently against ``main``.

The equivalent ``from e`` is worth re-adding inside the post-FusionBrainLab#14
``LokyBackend`` cloudpickle path as a follow-up after either PR
merges.
@GrigoryEvko

Copy link
Copy Markdown
Author

Conflict map vs PR #10 (fix/llm-output-sanitization)

File Hunks Type
gigaevo/programs/stages/python_executors/execution.py 3 Structural — this PR removes the memory-detection block (loky's LokyBackend does not expose max_memory_mb); PR #10 adds sanitize_for_log wrappers to the same log statements

Resolution (whichever PR merges second):

Take this PR's simplified structure (no memory-detection block, no error_type / error_msg locals) and re-apply PR #10's sanitize_for_log to the two logger.warning interpolation arguments:

except ExecRunnerError as e:
    logger.warning(
        "[{}] SubprocessError FAILED for {}: {}",
        stage_name,
        program.id[:8],
        sanitize_for_log(str(e)[:200]),
    )
    ...
except Exception as e:
    logger.warning(
        "[{}] Exception for {}: {}",
        stage_name,
        program.id[:8],
        sanitize_for_log(str(e)[:200]),
    )
    ...

The conflict is intrinsic — same lines edited for orthogonal reasons (executor refactor vs control-byte sanitisation). It is not eliminable at the branch level without restructuring this PR's __init__ signature. Merge order is interchangeable; the second-merged PR needs the ~10-line manual union shown above.

GrigoryEvko added a commit to GrigoryEvko/gigaevo-core that referenced this pull request May 17, 2026
…elper

The last live bypass of bug class FusionBrainLab#14 (gigaevo/evolution/bus/node.py:120
direct program.state = ProgramState.DONE on cross-engine migrant
ingestion). The migrant is a freshly-rehydrated Python object with no
prior in-run FSM history, so the in-run FSM table can't validate the
transition — the migrant arrives in whatever state the originating
engine last persisted (typically QUEUED), and the receiving engine
must mark it DONE so its own evolution loop ignores it.

state_manager.register_external_terminal_state(program, new_state) is
the named cross-run bypass: rejects non-terminal targets, mutates the
in-memory Program, no Redis I/O. Greppable so any future contributor
adding cross-run state transfer has one place to read.

bus/node.py:120 now calls the named helper; the bypass is no longer
anonymous.

4 tests pin the helper's rejection of non-terminal targets and the
node-level routing through it.
@GrigoryEvko

Copy link
Copy Markdown
Author

Heads-up on test-side integration with #10 (fix/llm-output-sanitization): this PR drops max_memory_mb from evaluate_single's signature (intentionally — replaced by disk spilling). #10 adds tests/stages/test_sanitize_integration.py that passes max_memory_mb=None to that function. After both merge: TypeError: evaluate_single() got an unexpected keyword argument 'max_memory_mb'. Whichever lands second: drop the kwarg from the test call (line 356 on #10's branch). Posted parallel note on #10.

Separately on integration with #20 (feat/dataplane-foundation): #20 reintroduces WorkerPool + ambient-pool semantics for the executor (different motivation — DataPlane lifecycle ownership). The three pool=None ambient-fallback tests added by #20 in tests/stages/test_exec_runner_pool.py won't apply against this PR's loky-based run_exec_runner (no pool parameter). #20 or this PR will need to reconcile on rebase.

GrigoryEvko added a commit to GrigoryEvko/gigaevo-core that referenced this pull request May 28, 2026
run_exec_runner has dispatched through LokyBackend exclusively since the
loky cutover (bdf8185); the WorkerPool class, its persistent-subprocess
helpers, the _ambient_pool ContextVar with its set/get/reset, and the
object_graph wiring that built and shut the pool down were never
consulted on the dispatch path. The pool was instantiated and stored in
a ContextVar purely because the merge that reconciled FusionBrainLab#14 against the
rest of nightly never finished pruning the old surface.

Removes ~416 lines (wrapper.py: 187, object_graph.py: 29, test_exec_runner_pool.py:
185, test_run_experiment_lifecycle.py: 13 net) for 23 lines of comment/
docstring rewording. Full suite still passes; the renamed lifecycle test
(does_not_leak_pool -> does_not_leak_threads) preserves the thread-leak
invariant which was the part with actual behavior.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants