Skip to content
46 changes: 27 additions & 19 deletions src/apify/scrapy/_async_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,26 +52,26 @@ def run_coro(
The result returned by the coroutine.

Raises:
RuntimeError: If the event loop is not running.
RuntimeError: If the event loop has been closed.
TimeoutError: If the coroutine does not complete within the timeout.
Exception: Any exception raised during coroutine execution.
"""
if timeout == 'default':
timeout = self._default_timeout

if not self._eventloop.is_running():
raise RuntimeError(f'The coroutine {coro} cannot be executed because the event loop is not running.')
if self._eventloop.is_closed():
raise RuntimeError(f'The coroutine {coro} cannot be executed because the event loop is closed.')

# Submit the coroutine to the event loop running in the other thread.
future = asyncio.run_coroutine_threadsafe(coro, self._eventloop)
try:
# Wait for the coroutine's result until the specified timeout.
return future.result(timeout=timeout.total_seconds())
except futures.TimeoutError as exc:
logger.exception('Coroutine execution timed out.', exc_info=exc)
raise
except Exception as exc:
logger.exception('Coroutine execution raised an exception.', exc_info=exc)
except futures.TimeoutError:
# `future.result` gave up, but the coroutine keeps running on the loop; cancel it so it does
# not outlive the timeout. The propagated error is logged once by the caller (or Scrapy), so
# this method does not log it itself.
future.cancel()
raise

def close(self, timeout: timedelta = timedelta(seconds=60)) -> None:
Expand All @@ -83,20 +83,28 @@ def close(self, timeout: timedelta = timedelta(seconds=60)) -> None:
Args:
timeout: The maximum number of seconds to wait for the event loop thread to exit.
"""
if self._eventloop.is_running():
# Cancel all pending tasks in the event loop.
self.run_coro(self._shutdown_tasks())
# A repeated close (e.g. a retried shutdown) would call into the already-closed loop and raise
# `RuntimeError: Event loop is closed`. The loop closes itself once it stops, so a second close
# is a no-op.
if self._eventloop.is_closed():
return

# Schedule the event loop to stop.
self._eventloop.call_soon_threadsafe(self._eventloop.stop)
try:
if self._eventloop.is_running():
# Cancel all pending tasks in the event loop, honouring the caller's timeout.
self.run_coro(self._shutdown_tasks(), timeout=timeout)
finally:
# Stop the loop and join its thread even if cancelling the pending tasks above raised or timed
# out. Skipping this would leave the loop running and leak its thread.
self._eventloop.call_soon_threadsafe(self._eventloop.stop)

# Wait for the event loop thread to finish execution.
self._thread.join(timeout=timeout.total_seconds())
# Wait for the event loop thread to finish execution.
self._thread.join(timeout=timeout.total_seconds())

# If the thread is still running after the timeout, force a shutdown.
if self._thread.is_alive():
logger.warning('Event loop thread did not exit cleanly! Forcing shutdown...')
self._force_exit_event_loop()
# If the thread is still running after the timeout, force a shutdown.
if self._thread.is_alive():
logger.warning('Event loop thread did not exit cleanly! Forcing shutdown...')
self._force_exit_event_loop()

def _start_event_loop(self) -> None:
"""Set up and run the asyncio event loop in the dedicated thread."""
Expand Down
106 changes: 66 additions & 40 deletions src/apify/scrapy/extensions/_httpcache.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import io
import re
import struct
import traceback
from datetime import timedelta
from logging import getLogger
from time import time
from typing import TYPE_CHECKING
Expand Down Expand Up @@ -38,6 +40,8 @@ def __init__(self, settings: BaseSettings) -> None:
# Upper bound on how many keys the per-spider-close cleanup sweeps (best-effort; `close_spider`).
self._expiration_max_items: int = settings.getint('APIFY_HTTPCACHE_EXPIRATION_MAX_ITEMS', 100)
self._expiration_secs: int = settings.getint('HTTPCACHE_EXPIRATION_SECS')
self._async_thread_timeout = timedelta(seconds=settings.getint('APIFY_ASYNC_THREAD_TIMEOUT_SECS', 60))
"""Caps how long each coroutine run on the background event loop may take; defaults to 60 seconds."""
self._spider: Spider | None = None
self._kvs: KeyValueStore | None = None
self._fingerprinter: RequestFingerprinterProtocol | None = None
Expand All @@ -62,55 +66,69 @@ async def open_kvs() -> KeyValueStore:
return await KeyValueStore.open(name=kvs_name)

logger.debug("Starting background thread for cache storage's event loop")
self._async_thread = AsyncThread()
self._async_thread = AsyncThread(default_timeout=self._async_thread_timeout)
logger.debug(f"Opening cache storage's {kvs_name!r} key value store")
self._kvs = self._async_thread.run_coro(open_kvs())
try:
self._kvs = self._async_thread.run_coro(open_kvs())
except Exception:
# Opening the key-value store failed, so close the freshly started async thread instead of
# leaking its event-loop thread (`close_spider` may never run if `open_spider` fails).
self._async_thread.close()
traceback.print_exc()
raise

def close_spider(self, _: Spider, current_time: int | None = None) -> None:
"""Close the cache storage for a spider."""
if self._async_thread is None:
raise ValueError('Async thread not initialized')

logger.info(f'Cleaning up cache items (max {self._expiration_max_items})')
if self._expiration_secs > 0:
if current_time is None:
current_time = int(time())

async def expire_kvs() -> None:
if self._kvs is None:
raise ValueError('Key value store not initialized')
# Best-effort cleanup: at most `_expiration_max_items` keys per close, in no guaranteed order,
# so stale entries may linger. This only reclaims storage; `retrieve_response` already treats
# an expired entry as a cache miss.
processed = 0
async for item in self._kvs.iterate_keys():
if processed >= self._expiration_max_items:
break
processed += 1
value = await self._kvs.get_value(item.key)
try:
gzip_time = read_gzip_time(value)
except Exception as e:
logger.warning(f'Malformed cache item {item.key}: {e}')
await self._kvs.delete_value(item.key)
else:
if self._expiration_secs < current_time - gzip_time:
logger.debug(f'Expired cache item {item.key}')
# The cleanup sweep runs inside `try` so a failure there cannot skip closing the async thread
# (which would leak its event-loop thread); `close` always runs in the `finally`.
try:
if self._expiration_secs > 0:
if current_time is None:
current_time = int(time())

async def expire_kvs() -> None:
if self._kvs is None:
raise ValueError('Key value store not initialized')
# Best-effort cleanup: at most `_expiration_max_items` keys per close, in no guaranteed order,
# so stale entries may linger. This only reclaims storage; `retrieve_response` already treats
# an expired entry as a cache miss.
processed = 0
async for item in self._kvs.iterate_keys():
if processed >= self._expiration_max_items:
break
processed += 1
value = await self._kvs.get_value(item.key)
try:
gzip_time = read_gzip_time(value)
except Exception as e:
logger.warning(f'Malformed cache item {item.key}: {e}')
await self._kvs.delete_value(item.key)
else:
logger.debug(f'Valid cache item {item.key}')

self._async_thread.run_coro(expire_kvs())

logger.debug('Closing cache storage')
try:
self._async_thread.close()
except KeyboardInterrupt:
logger.warning('Shutdown interrupted by KeyboardInterrupt!')
except Exception:
logger.exception('Exception occurred while shutting down cache storage')
if self._expiration_secs < current_time - gzip_time:
logger.debug(f'Expired cache item {item.key}')
await self._kvs.delete_value(item.key)
else:
logger.debug(f'Valid cache item {item.key}')

try:
self._async_thread.run_coro(expire_kvs())
except Exception:
traceback.print_exc()
raise
finally:
logger.debug('Cache storage closed')
logger.debug('Closing cache storage')
try:
self._async_thread.close()
except KeyboardInterrupt:
logger.warning('Shutdown interrupted by KeyboardInterrupt!')
except Exception:
logger.exception('Exception occurred while shutting down cache storage')
finally:
logger.debug('Cache storage closed')

def retrieve_response(self, _: Spider, request: Request, current_time: int | None = None) -> Response | None:
"""Retrieve a response from the cache storage."""
Expand All @@ -122,7 +140,11 @@ def retrieve_response(self, _: Spider, request: Request, current_time: int | Non
raise ValueError('Request fingerprinter not initialized')

key = self._fingerprinter.fingerprint(request).hex()
value = self._async_thread.run_coro(self._kvs.get_value(key))
try:
value = self._async_thread.run_coro(self._kvs.get_value(key))
except Exception:
traceback.print_exc()
raise

if value is None:
logger.debug('Cache miss', extra={'request': request})
Expand Down Expand Up @@ -169,7 +191,11 @@ def store_response(self, _: Spider, request: Request, response: Response) -> Non
'body': response.body,
}
value = to_gzip(data)
self._async_thread.run_coro(self._kvs.set_value(key, value))
try:
self._async_thread.run_coro(self._kvs.set_value(key, value))
except Exception:
traceback.print_exc()
raise


def to_gzip(data: dict, mtime: int | None = None) -> bytes:
Expand Down
16 changes: 14 additions & 2 deletions src/apify/scrapy/scheduler.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import traceback
from datetime import timedelta
from logging import getLogger
from typing import TYPE_CHECKING

Expand All @@ -15,6 +16,7 @@
from apify.storages import RequestQueue

if TYPE_CHECKING:
from scrapy.crawler import Crawler
from scrapy.http.request import Request
from twisted.internet.defer import Deferred

Expand All @@ -27,7 +29,7 @@ class ApifyScheduler(BaseScheduler):
This scheduler requires the asyncio Twisted reactor to be installed.
"""

def __init__(self) -> None:
def __init__(self, async_thread_timeout: timedelta = timedelta(seconds=60)) -> None:
if not is_asyncio_reactor_installed():
raise ValueError(
f'{ApifyScheduler.__qualname__} requires the asyncio Twisted reactor. '
Expand All @@ -38,7 +40,17 @@ def __init__(self) -> None:
self.spider: Spider | None = None

# A thread with the asyncio event loop to run coroutines on.
self._async_thread = AsyncThread()
self._async_thread = AsyncThread(default_timeout=async_thread_timeout)

@classmethod
def from_crawler(cls, crawler: Crawler) -> ApifyScheduler:
"""Create the scheduler, reading the async-thread timeout from the Scrapy settings.

The `APIFY_ASYNC_THREAD_TIMEOUT_SECS` setting (in seconds) caps how long each coroutine run on the
background event loop may take before timing out; it defaults to 60 seconds.
"""
timeout_secs = crawler.settings.getint('APIFY_ASYNC_THREAD_TIMEOUT_SECS', 60)
return cls(async_thread_timeout=timedelta(seconds=timeout_secs))

def open(self, spider: Spider) -> Deferred[None] | None:
"""Open the scheduler.
Expand Down
76 changes: 76 additions & 0 deletions tests/unit/scrapy/extensions/test_httpcache.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io
import json
import pickle
from datetime import timedelta
from time import time
from types import SimpleNamespace
from typing import TYPE_CHECKING, Any, cast
Expand Down Expand Up @@ -274,6 +275,81 @@ def test_close_spider_respects_max_items() -> None:
assert len(kvs.deleted) == 2


def test_close_spider_closes_thread_even_when_cleanup_fails() -> None:
"""If the expiration sweep raises, the async thread is still closed rather than leaked."""
closed: list[bool] = []

class _FailingAsyncThread:
def run_coro(self, coro: Any, *_: Any, **__: Any) -> Any:
coro.close() # we never run it; just avoid an un-awaited coroutine warning
raise RuntimeError('cleanup boom')

def close(self, *_: Any, **__: Any) -> None:
closed.append(True)

storage = ApifyCacheStorage(Settings({'HTTPCACHE_EXPIRATION_SECS': 100}))
storage._async_thread = _FailingAsyncThread() # ty: ignore[invalid-assignment]
storage._kvs = _FakeKvs(None) # ty: ignore[invalid-assignment]

with pytest.raises(RuntimeError, match='cleanup boom'):
storage.close_spider(None, current_time=1000) # ty: ignore[invalid-argument-type]

assert closed == [True]


def test_cache_storage_reads_async_thread_timeout_setting() -> None:
"""`APIFY_ASYNC_THREAD_TIMEOUT_SECS` is read into the storage's async-thread timeout."""
storage = ApifyCacheStorage(Settings({'APIFY_ASYNC_THREAD_TIMEOUT_SECS': 77}))
assert storage._async_thread_timeout == timedelta(seconds=77)


def test_open_spider_passes_timeout_to_async_thread(monkeypatch: pytest.MonkeyPatch) -> None:
"""`open_spider` constructs the async thread with the configured timeout."""
captured: dict[str, Any] = {}

class _RecordingAsyncThread:
def __init__(self, default_timeout: timedelta | None = None) -> None:
captured['default_timeout'] = default_timeout

def run_coro(self, coro: Any, *_: Any, **__: Any) -> Any:
coro.close() # we never run it; just avoid an un-awaited coroutine warning
return _FakeKvs(None)

monkeypatch.setattr('apify.scrapy.extensions._httpcache.AsyncThread', _RecordingAsyncThread)

storage = ApifyCacheStorage(Settings({'APIFY_ASYNC_THREAD_TIMEOUT_SECS': 77}))
spider = SimpleNamespace(name='myspider', crawler=SimpleNamespace(request_fingerprinter=_FakeFingerprinter()))
storage.open_spider(cast('Any', spider))

assert captured['default_timeout'] == timedelta(seconds=77)


def test_open_spider_closes_async_thread_when_open_kvs_fails(monkeypatch: pytest.MonkeyPatch) -> None:
"""If opening the key-value store fails, `open_spider` closes the async thread rather than leaking it."""
closed: list[bool] = []

class _FailingAsyncThread:
def __init__(self, default_timeout: timedelta | None = None) -> None:
pass

def run_coro(self, coro: Any, *_: Any, **__: Any) -> Any:
coro.close() # we never run it; just avoid an un-awaited coroutine warning
raise RuntimeError('open boom')

def close(self, *_: Any, **__: Any) -> None:
closed.append(True)

monkeypatch.setattr('apify.scrapy.extensions._httpcache.AsyncThread', _FailingAsyncThread)

storage = ApifyCacheStorage(Settings())
spider = SimpleNamespace(name='myspider', crawler=SimpleNamespace(request_fingerprinter=_FakeFingerprinter()))

with pytest.raises(RuntimeError, match='open boom'):
storage.open_spider(cast('Any', spider))

assert closed == [True]


@pytest.mark.parametrize(
('spider_name', 'expected'),
[
Expand Down
Loading
Loading