diff --git a/.spelling b/.spelling index 82bf70418..0a4bc784e 100644 --- a/.spelling +++ b/.spelling @@ -358,6 +358,7 @@ mockable mockall modularity moka +moka's monomorphization monomorphize monomorphized diff --git a/Cargo.lock b/Cargo.lock index c1258435f..347ab4e70 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -509,6 +509,7 @@ dependencies = [ "cachet_service", "cachet_tier", "criterion", + "dashmap", "dynosaur", "futures", "layered", @@ -522,7 +523,6 @@ dependencies = [ "seatbelt", "serde", "testing_aids", - "thread_aware", "tick", "tokio", "tracing", diff --git a/crates/cachet/Cargo.toml b/crates/cachet/Cargo.toml index c5cd5210d..da626a34b 100644 --- a/crates/cachet/Cargo.toml +++ b/crates/cachet/Cargo.toml @@ -56,7 +56,6 @@ parking_lot = { workspace = true } pin-project-lite = { workspace = true } postcard = { workspace = true, optional = true } serde = { workspace = true, optional = true, features = ["derive"] } -thread_aware = { workspace = true } tick = { workspace = true, features = [] } tracing = { workspace = true, optional = true } uniflight = { workspace = true } @@ -67,6 +66,7 @@ bytesbuf = { path = "../bytesbuf" } cachet_memory = { path = "../cachet_memory" } cachet_tier = { path = "../cachet_tier", features = ["test-util"] } criterion = { workspace = true } +dashmap = { workspace = true } dynosaur = { workspace = true } opentelemetry = { workspace = true, features = [ "metrics", @@ -81,7 +81,7 @@ testing_aids = { path = "../testing_aids" } tick = { path = "../tick", features = ["test-util", "tokio"] } tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } tracing = { workspace = true, features = ["std"] } -tracing-subscriber = { workspace = true } +tracing-subscriber = { workspace = true, features = ["fmt", "registry"] } tracing-test = { workspace = true, features = ["no-env-filter"] } [[bench]] @@ -159,6 +159,10 @@ required-features = ["memory", "test-util"] name = "telemetry_subscriber" required-features = ["memory", "logs"] +[[example]] +name = "telemetry_accumulator" +required-features = ["memory", "logs"] + [[example]] name = "serialization" required-features = ["memory", "serialize"] diff --git a/crates/cachet/README.md b/crates/cachet/README.md index 9931e8069..e81dfc1d2 100644 --- a/crates/cachet/README.md +++ b/crates/cachet/README.md @@ -14,7 +14,7 @@ A composable, multi-tier caching library with stampede protection, background -refresh, and built-in OpenTelemetry telemetry. +refresh, and structured telemetry. ## Why Multi-Tier Caching? @@ -89,7 +89,7 @@ builds on top of them and adds: |Stampede protection|❌|✅| |Background refresh|❌|✅| |Service middleware integration|❌|✅| -|Structured telemetry (tracing)|❌|✅| +|Structured telemetry|❌|✅| |Pluggable storage backends|❌|✅| |Clock injection for testing|❌|✅| @@ -228,12 +228,16 @@ cache.insert("key".to_string(), "value".to_string()).await?; ## Telemetry -Enable with the `logs` feature and `.enable_logs()` on the cache builder. +Cachet provides two complementary telemetry channels: + +### Tracing events -Each cache operation emits a structured [`tracing`][__link20] event with fields -`cache.name`, `cache.event`, and `cache.duration_ns`. +Enable with the `logs` feature and `.enable_logs()` on the cache builder. +Each tier outcome and operation completion emits a structured [`tracing`][__link20] event. -### Subscribing to events +**Tier events** carry `cache.name`, `cache.event`, and `cache.duration_ns`. +**Operation-complete events** carry `cache.name`, `cache.operation`, +`cache.duration_ns`, and `cache.coalesced`. Use [`telemetry::attributes`][__link21] constants to filter and match events in a custom `tracing_subscriber::Layer`: @@ -251,21 +255,32 @@ if event_value == attributes::EVENT_HIT { /* cache hit */ } See the `telemetry_subscriber` example for a complete demonstration. -### Event types +#### Event types |Level|Events| |-----|------| |ERROR|`cache.get_error`, `cache.insert_error`, `cache.invalidate_error`, `cache.clear_error`| -|INFO|`cache.expired`, `cache.refresh_miss`, `cache.inserted`, `cache.insert_rejected`, `cache.invalidated`, `cache.fallback`, `cache.eviction`| +|INFO|`cache.expired`, `cache.refresh_miss`, `cache.inserted`, `cache.insert_rejected`, `cache.invalidated`, `cache.eviction`| |DEBUG|`cache.hit`, `cache.miss`, `cache.refresh_hit`, `cache.cleared`| +### Event handler callback API + +Register a [`CacheEventHandler`][__link22] via +`.event_handler(handler)` on the cache builder to receive typed +[`CacheTierEvent`][__link23] and +[`CacheOperationEvent`][__link24] callbacks. +Events carry a `request_id` for correlating tier outcomes with their parent +operation. Works independently of the `logs` feature. + +See the `telemetry_accumulator` example for a DashMap-based accumulation pattern. +
This crate was developed as part of The Oxidizer Project. Browse this crate's source code. - [__cargo_doc2readme_dependencies_info]: ggGmYW0CYXZlMC43LjJhdIQbLiTyV0MU86EbZU15e0PmecoboQ9jo59bnAEbyDXw04U13GlhYvRhcoQbg_hDqE88LP4bMh0J5Y4y4Osb0zDJ1kwqOsoblCGrm49Rx2thZIiCaGJ5dGVzYnVmZTAuNS4zgmZjYWNoZXRlMC42LjSCbWNhY2hldF9tZW1vcnllMC4zLjOCbmNhY2hldF9zZXJ2aWNlZTAuMi4zgmtjYWNoZXRfdGllcmUwLjIuMoJkdGlja2UwLjMuM4JndHJhY2luZ2YwLjEuNDSCaXVuaWZsaWdodGUwLjIuMw + [__cargo_doc2readme_dependencies_info]: ggGkYW0CYXSEGy4k8ldDFPOhG2VNeXtD5nnKG6EPY6OfW5wBG8g18NOFNdxpYXKEG_8ZSA792uloG6CGM3YZObWMG5vDWjb2V8K3G4SF7NHmnsnBYWSIgmhieXRlc2J1ZmUwLjUuM4JmY2FjaGV0ZTAuNi40gm1jYWNoZXRfbWVtb3J5ZTAuMy4zgm5jYWNoZXRfc2VydmljZWUwLjIuM4JrY2FjaGV0X3RpZXJlMC4yLjKCZHRpY2tlMC4zLjOCZ3RyYWNpbmdmMC4xLjQ0gml1bmlmbGlnaHRlMC4yLjM [__link0]: https://docs.rs/cachet/0.6.4/cachet/?search=TimeToRefresh [__link1]: https://crates.io/crates/uniflight/0.2.3 [__link10]: https://docs.rs/cachet_tier/0.2.2/cachet_tier/?search=CacheTier @@ -281,6 +296,9 @@ This crate was developed as part of The Oxidizer Project. Br [__link2]: https://docs.rs/cachet/0.6.4/cachet/?search=CacheBuilder::stampede_protection [__link20]: https://crates.io/crates/tracing/0.1.44 [__link21]: https://docs.rs/cachet/0.6.4/cachet/?search=telemetry::attributes + [__link22]: https://docs.rs/cachet/0.6.4/cachet/?search=telemetry::handler::CacheEventHandler + [__link23]: https://docs.rs/cachet/0.6.4/cachet/?search=telemetry::handler::CacheTierEvent + [__link24]: https://docs.rs/cachet/0.6.4/cachet/?search=telemetry::handler::CacheOperationEvent [__link3]: https://docs.rs/cachet_tier/0.2.2/cachet_tier/?search=CacheTier [__link4]: https://docs.rs/cachet_tier/0.2.2/cachet_tier/?search=DynamicCache [__link5]: https://docs.rs/cachet/0.6.4/cachet/?search=InsertPolicy diff --git a/crates/cachet/benches/operations.rs b/crates/cachet/benches/operations.rs index c3598580f..a4db39446 100644 --- a/crates/cachet/benches/operations.rs +++ b/crates/cachet/benches/operations.rs @@ -15,6 +15,7 @@ use cachet_tier::MockCache; use criterion::{Criterion, criterion_group, criterion_main}; use tick::Clock; use tokio::runtime::Runtime; +use tracing_subscriber::layer::SubscriberExt; fn rt() -> Runtime { Runtime::new().expect("failed to create runtime") @@ -87,6 +88,7 @@ fn bench_cache_operations(c: &mut Criterion) { // Wrapper Overhead (direct vs wrapped vs features) // ============================================================================= +#[expect(clippy::too_many_lines, reason = "benchmark function with multiple related groups")] fn bench_wrapper_overhead(c: &mut Criterion) { let rt = rt(); let mut group = c.benchmark_group("wrapper_overhead"); @@ -148,6 +150,31 @@ fn bench_wrapper_overhead(c: &mut Criterion) { }); }); + // With telemetry + active subscriber (measures event formatting/dispatch overhead) + group.bench_function("with_telemetry_subscriber", |b| { + let subscriber = tracing_subscriber::registry().with(tracing_subscriber::fmt::layer().with_writer(std::io::sink).with_ansi(false)); + let _guard = tracing::subscriber::set_default(subscriber); + + let cache = rt.block_on(async { + let clock = Clock::new_tokio(); + Cache::builder(clock) + .storage(MockCache::::new()) + .enable_logs() + .build() + }); + let key = "key".to_string(); + + b.iter_custom(|iters| { + rt.block_on(async { + let start = Instant::now(); + for _ in 0..iters { + let _ = black_box(cache.get(black_box(&key)).await); + } + start.elapsed() + }) + }); + }); + // With fallback tier group.bench_function("with_fallback", |b| { let cache = rt.block_on(async { diff --git a/crates/cachet/examples/telemetry_accumulator.rs b/crates/cachet/examples/telemetry_accumulator.rs new file mode 100644 index 000000000..45b4e81e4 --- /dev/null +++ b/crates/cachet/examples/telemetry_accumulator.rs @@ -0,0 +1,173 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +//! Demonstrates accumulating cachet telemetry into a single summary +//! per cache operation, correlated by `request_id`. +//! +//! This pattern mirrors how a TVS-style consumer would collect tier +//! outcomes, latencies, and flags into one log row per request. +//! +//! Uses `DashMap` for concurrent, low-contention accumulation — safe across +//! all async runtimes, including work-stealing (tokio) and thread-per-core +//! (oxidizer), even if a task migrates between cores mid-operation. +//! +//! Run with: `cargo run --example telemetry_accumulator --features "memory,logs"` + +use std::time::Duration; + +use cachet::telemetry::handler::{CacheEventHandler, CacheOperationEvent, CacheTierEvent, RequestId}; +use cachet::{Cache, CacheEntry}; +use dashmap::DashMap; +use tick::Clock; + +// --------------------------------------------------------------------------- +// Accumulated state — one entry per in-flight operation, keyed by request_id +// --------------------------------------------------------------------------- + +#[derive(Debug)] +struct TierRecord { + tier_name: String, + outcome: String, + duration_us: u64, + fallback: bool, +} + +/// Handler that accumulates tier events per `request_id` and prints a +/// one-line summary when the operation completes. +/// +/// `DashMap` shards the map internally so concurrent operations on +/// different cores rarely contend. +struct AccumulatingHandler { + pending: DashMap>, +} + +impl AccumulatingHandler { + fn new() -> Self { + Self { pending: DashMap::new() } + } +} + +impl CacheEventHandler for AccumulatingHandler { + fn on_tier_event(&self, event: &CacheTierEvent<'_>) { + // Eviction events have request_id > 0 when triggered synchronously + // during an insert (capacity overflow). Background maintenance + // evictions have request_id == 0. + self.pending.entry(event.request_id).or_default().push(TierRecord { + tier_name: event.tier_name.to_owned(), + outcome: event.outcome.to_owned(), + duration_us: u64::try_from(event.duration.as_micros()).unwrap_or(u64::MAX), + fallback: event.fallback, + }); + } + + fn on_operation_complete(&self, event: &CacheOperationEvent<'_>) { + let tiers = self.pending.remove(&event.request_id).map(|(_, v)| v).unwrap_or_default(); + + // --- Build the summary line --- + // A TVS consumer would pack these into a bitfield here. + + let mut flags = Vec::new(); + if event.coalesced { + flags.push("COALESCED"); + } + if tiers.iter().any(|t| t.fallback) { + flags.push("FALLBACK"); + } + let flags_str = if flags.is_empty() { + String::new() + } else { + format!(" [{}]", flags.join(", ")) + }; + + // Final outcome = last tier's outcome + let outcome = tiers.last().map_or("?", |t| t.outcome.as_str()); + + print!( + "[{}] {} -> {} ({}us total){flags_str}", + event.cache_name, + event.operation, + outcome, + event.duration.as_micros(), + ); + + // Per-tier breakdown for multi-tier caches + if tiers.len() > 1 { + print!(" | "); + for (i, tier) in tiers.iter().enumerate() { + if i > 0 { + print!(", "); + } + print!("{}={} ({}us)", tier.tier_name, tier.outcome, tier.duration_us); + } + } + + println!(); + } +} + +// --------------------------------------------------------------------------- +// Main +// --------------------------------------------------------------------------- + +#[tokio::main] +async fn main() { + let clock = Clock::new_tokio(); + + // Single-tier cache + println!("=== Single-tier cache ==="); + let cache: Cache = Cache::builder(clock.clone()) + .memory() + .name("single") + .event_handler(AccumulatingHandler::new()) + .build(); + + cache + .insert("key".to_string(), CacheEntry::new("value".to_string())) + .await + .expect("insert should succeed"); + let _ = cache.get(&"key".to_string()).await; + let _ = cache.get(&"missing".to_string()).await; + + // Two-tier cache with fallback + println!("\n=== Two-tier cache (L1 -> L2) ==="); + let l2 = Cache::builder::(clock.clone()).memory().name("l2"); + let cache2: Cache = Cache::builder(clock) + .memory() + .name("l1") + .ttl(Duration::from_secs(30)) + .event_handler(AccumulatingHandler::new()) + .fallback(l2) + .build(); + + cache2 + .insert("user:1".to_string(), CacheEntry::new("Alice".to_string())) + .await + .expect("insert should succeed"); + let _ = cache2.get(&"user:1".to_string()).await; + let _ = cache2.get(&"nobody".to_string()).await; + + // Capacity-limited cache — evictions correlated with inserts + println!("\n=== Capacity-limited cache (max 2 entries) ==="); + let cache3: Cache = Cache::builder(Clock::new_tokio()) + .memory_with(|b| b.max_capacity(2).with_eviction_telemetry()) + .name("tiny") + .event_handler(AccumulatingHandler::new()) + .build(); + + // Fill to capacity + cache3 + .insert("a".to_string(), CacheEntry::new("1".to_string())) + .await + .expect("insert should succeed"); + cache3 + .insert("b".to_string(), CacheEntry::new("2".to_string())) + .await + .expect("insert should succeed"); + // This insert may trigger an eviction — the eviction event will carry + // the same request_id as the insert, so the accumulator sees both the + // insert and the eviction in one summary. + cache3 + .insert("c".to_string(), CacheEntry::new("3".to_string())) + .await + .expect("insert should succeed"); +} diff --git a/crates/cachet/examples/telemetry_subscriber.rs b/crates/cachet/examples/telemetry_subscriber.rs index 9ce1d914b..27476f86c 100644 --- a/crates/cachet/examples/telemetry_subscriber.rs +++ b/crates/cachet/examples/telemetry_subscriber.rs @@ -1,96 +1,34 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. -//! Demonstrates subscribing to cachet telemetry events using a custom tracing Layer. +//! Demonstrates cachet telemetry as structured tracing events. //! -//! This example shows how to use the public constants in `cachet::telemetry::attributes` -//! to build a Layer that reacts to specific cache events. +//! Run with: `cargo run --example telemetry_subscriber --features "memory,logs"` use std::time::Duration; -use cachet::telemetry::attributes; use cachet::{Cache, CacheEntry}; use tick::Clock; -use tracing::field::{Field, Visit}; -use tracing_subscriber::Layer; -use tracing_subscriber::layer::{Context, SubscriberExt}; - -/// A simple Layer that prints cache events to stdout. -struct CacheEventPrinter; - -/// Visitor that extracts cache telemetry fields from a tracing event. -#[derive(Default)] -struct CacheFieldVisitor { - cache_name: Option, - event: Option, - duration_ns: Option, -} - -impl Visit for CacheFieldVisitor { - fn record_str(&mut self, field: &Field, value: &str) { - match field.name() { - attributes::FIELD_NAME => self.cache_name = Some(value.to_owned()), - attributes::FIELD_EVENT => self.event = Some(value.to_owned()), - _ => {} - } - } - - fn record_u128(&mut self, field: &Field, value: u128) { - if field.name() == attributes::FIELD_DURATION_NS { - self.duration_ns = Some(value); - } - } - - fn record_debug(&mut self, _field: &Field, _value: &dyn std::fmt::Debug) {} -} - -impl Layer for CacheEventPrinter { - fn on_event(&self, event: &tracing::Event<'_>, _ctx: Context<'_, S>) { - // Only process cachet events - if !event.metadata().target().starts_with(attributes::TARGET) { - return; - } - - let mut visitor = CacheFieldVisitor::default(); - event.record(&mut visitor); - - let cache_name = visitor.cache_name.as_deref().unwrap_or("unknown"); - let event_type = visitor.event.as_deref().unwrap_or("unknown"); - let duration_us = visitor.duration_ns.unwrap_or(0) / 1000; - - // React to specific events using the public constants - match event_type { - attributes::EVENT_HIT => println!("HIT on {cache_name} ({duration_us}µs)"), - attributes::EVENT_MISS => println!("MISS on {cache_name} ({duration_us}µs)"), - attributes::EVENT_INSERTED => println!("INSERT on {cache_name} ({duration_us}µs)"), - attributes::EVENT_EXPIRED => println!("EXPIRED on {cache_name} ({duration_us}µs)"), - other => println!("{other} on {cache_name} ({duration_us}µs)"), - } - } -} +use tracing_subscriber::layer::SubscriberExt; #[tokio::main] async fn main() { - // Set up the subscriber with our custom cache event layer - let subscriber = tracing_subscriber::registry().with(CacheEventPrinter); - let _guard = tracing::subscriber::set_default(subscriber); + // Set up a subscriber that shows cachet's structured events. + let subscriber = tracing_subscriber::registry().with(tracing_subscriber::fmt::layer().with_ansi(true).with_target(false)); + tracing::subscriber::set_global_default(subscriber).expect("subscriber already set"); let clock = Clock::new_tokio(); - let cache = Cache::builder::(clock) - .memory() - .enable_logs() - .ttl(Duration::from_secs(30)) - .build(); + let cache: Cache = Cache::builder(clock).memory().enable_logs().ttl(Duration::from_secs(30)).build(); - println!("Inserting entry..."); + println!("--- Insert ---"); cache .insert("user:1".to_string(), CacheEntry::new("Alice".to_string())) .await .expect("insert failed"); - println!("Getting existing key..."); + println!("\n--- Get (hit) ---"); let _ = cache.get(&"user:1".to_string()).await; - println!("Getting missing key..."); + println!("\n--- Get (miss) ---"); let _ = cache.get(&"user:999".to_string()).await; } diff --git a/crates/cachet/src/builder/buildable.rs b/crates/cachet/src/builder/buildable.rs index befc93b83..95c4d25d5 100644 --- a/crates/cachet/src/builder/buildable.rs +++ b/crates/cachet/src/builder/buildable.rs @@ -19,7 +19,7 @@ pub(crate) trait Buildable { fn build(self) -> Cache; - fn build_tier(self, clock: Clock, telemetry: CacheTelemetry) -> Self::TierOutput; + fn build_tier(self, clock: Clock, telemetry: CacheTelemetry, fallback: bool) -> Self::TierOutput; } impl Buildable for CacheBuilder @@ -36,18 +36,18 @@ where let telemetry = self.telemetry.clone(); let stampede_protection = self.stampede_protection; - let tier = DynamicCache::new(self.build_tier(clock.clone(), telemetry)); + let tier = DynamicCache::new(self.build_tier(clock.clone(), telemetry.clone(), false)); - Cache::new(type_name::(name), tier, clock, stampede_protection) + Cache::new(type_name::(name), tier, clock, telemetry, stampede_protection) } - fn build_tier(self, clock: Clock, telemetry: CacheTelemetry) -> Self::TierOutput { + fn build_tier(self, clock: Clock, telemetry: CacheTelemetry, fallback: bool) -> Self::TierOutput { let name = type_name::(self.name); #[cfg(feature = "memory")] if let Some(hook) = &self.eviction_hook { hook.init(telemetry.clone(), name); } - CacheWrapper::new(name, self.storage, clock, self.ttl, telemetry, self.policy) + CacheWrapper::new(name, self.storage, clock, self.ttl, telemetry, self.policy, fallback) } } @@ -66,14 +66,14 @@ where let telemetry = self.telemetry.clone(); let stampede_protection = self.stampede_protection; - let tier = DynamicCache::new(self.build_tier(clock.clone(), telemetry)); + let tier = DynamicCache::new(self.build_tier(clock.clone(), telemetry.clone(), false)); - Cache::new(type_name::(name), tier, clock, stampede_protection) + Cache::new(type_name::(name), tier, clock, telemetry, stampede_protection) } - fn build_tier(self, clock: Clock, telemetry: CacheTelemetry) -> Self::TierOutput { - let primary = self.primary_builder.build_tier(clock.clone(), telemetry.clone()); - let fallback = self.fallback_builder.build_tier(clock.clone(), telemetry.clone()); + fn build_tier(self, clock: Clock, telemetry: CacheTelemetry, fallback: bool) -> Self::TierOutput { + let primary = self.primary_builder.build_tier(clock.clone(), telemetry.clone(), fallback); + let fallback = self.fallback_builder.build_tier(clock.clone(), telemetry.clone(), true); FallbackCache::new( type_name::(self.name), diff --git a/crates/cachet/src/builder/cache.rs b/crates/cachet/src/builder/cache.rs index 2a56c6fde..9eea7946d 100644 --- a/crates/cachet/src/builder/cache.rs +++ b/crates/cachet/src/builder/cache.rs @@ -3,7 +3,6 @@ use std::hash::Hash; use std::marker::PhantomData; -#[cfg(feature = "memory")] use std::sync::Arc; use std::time::Duration; @@ -18,6 +17,7 @@ use super::sealed::{CacheTierBuilder, Sealed}; use crate::eviction::EvictionHook; use crate::policy::InsertPolicy; use crate::telemetry::CacheTelemetry; +use crate::telemetry::handler::CacheEventHandler; use crate::{Cache, CacheTier}; /// Builder for constructing a cache with a single tier. @@ -206,7 +206,7 @@ impl CacheBuilder { /// If not set, a name is derived from the storage type. /// /// Requires `&'static str` because the name is embedded in every telemetry - /// event (metric labels, log fields). A static reference avoids cloning the + /// event (tracing fields, handler callbacks). A static reference avoids cloning the /// name into a new allocation on each cache operation, which matters at high /// throughput. In practice, cache names are always string literals. #[must_use] @@ -221,7 +221,7 @@ impl CacheBuilder { #[cfg(any(feature = "logs", test))] #[must_use] pub fn enable_logs(mut self) -> Self { - self.telemetry = CacheTelemetry::with_logging(); + self.telemetry = self.telemetry.enable_logging(); self } @@ -251,6 +251,13 @@ impl CacheBuilder { self } + /// Registers a callback for structured cache events. + #[must_use] + pub fn event_handler(mut self, handler: impl CacheEventHandler + 'static) -> Self { + self.telemetry = self.telemetry.with_handler(Arc::new(handler)); + self + } + /// Sets the time-to-live (TTL) for entries in this cache tier. /// /// Entries older than the TTL will be considered expired and won't be @@ -284,7 +291,7 @@ impl CacheBuilder { /// [`Cache::get_or_insert`](crate::Cache::get_or_insert), and promotion from a fallback tier. /// /// If the policy rejects an insert, the operation is skipped and a - /// `cache.rejected` telemetry event is recorded with `cache.operation = cache.insert`. + /// `cache.insert_rejected` telemetry event is recorded. /// /// # Examples /// diff --git a/crates/cachet/src/builder/transform.rs b/crates/cachet/src/builder/transform.rs index 990b1e9d2..97a48e6f7 100644 --- a/crates/cachet/src/builder/transform.rs +++ b/crates/cachet/src/builder/transform.rs @@ -245,17 +245,17 @@ where let clock = self.clock.clone(); let telemetry = self.telemetry.clone(); let stampede_protection = self.stampede_protection; - let tier = self.build_tier(clock.clone(), telemetry); + let tier = self.build_tier(clock.clone(), telemetry.clone(), false); - crate::Cache::new(type_name::(None), tier, clock, stampede_protection) + crate::Cache::new(type_name::(None), tier, clock, telemetry, stampede_protection) } - fn build_tier(self, clock: Clock, telemetry: CacheTelemetry) -> Self::TierOutput { + fn build_tier(self, clock: Clock, telemetry: CacheTelemetry, fallback: bool) -> Self::TierOutput { // Build pre-transform tier - let pre_tier = self.pre.build_tier(clock.clone(), telemetry.clone()); + let pre_tier = self.pre.build_tier(clock.clone(), telemetry.clone(), fallback); // Build post-transform tier, wrap in TransformAdapter - let post_tier = self.post.build_tier(clock.clone(), telemetry.clone()); + let post_tier = self.post.build_tier(clock.clone(), telemetry.clone(), true); let adapted = TransformAdapter::from_boxed(post_tier, self.key_encoder, self.value_codec); // Combine: pre is primary, adapted is fallback diff --git a/crates/cachet/src/cache.rs b/crates/cachet/src/cache.rs index 9b3c653d3..6c15856a3 100644 --- a/crates/cachet/src/cache.rs +++ b/crates/cachet/src/cache.rs @@ -13,11 +13,13 @@ use uniflight::Merger; use crate::Error; use crate::builder::CacheBuilder; +use crate::telemetry::CacheTelemetry; +use crate::telemetry::cache::{WithRequestIdExt, next_request_id}; /// Type alias for cache names used in telemetry. /// /// A static reference is used so that names can be embedded in telemetry -/// attributes (metric labels, log fields) without allocating on every +/// fields (tracing events, handler callbacks) without allocating on every /// cache operation. pub type CacheName = &'static str; @@ -113,6 +115,7 @@ pub struct Cache { pub(crate) name: CacheName, pub(crate) storage: DynamicCache, pub(crate) clock: Clock, + pub(crate) telemetry: CacheTelemetry, /// Mergers for stampede protection on all operations. /// Only present when `stampede_protection` is enabled. mergers: Option>, @@ -149,11 +152,18 @@ where K: Clone + Eq + Hash + Send + Sync + 'static, V: Clone + Send + Sync + 'static, { - pub(crate) fn new(name: CacheName, storage: DynamicCache, clock: Clock, stampede_protection: bool) -> Self { + pub(crate) fn new( + name: CacheName, + storage: DynamicCache, + clock: Clock, + telemetry: CacheTelemetry, + stampede_protection: bool, + ) -> Self { Self { name, storage, clock, + telemetry, mergers: stampede_protection.then(Mergers::new), } } @@ -220,18 +230,28 @@ where K: Borrow, Q: Hash + Eq + ToOwned + ?Sized + Send + Sync, { - if let Some(mergers) = &self.mergers { - let owned = key.to_owned(); - let storage = &self.storage; - mergers - .get - .execute(key, move || async move { storage.get(&owned).await }) - .await - .unwrap_or_else(|panicked| Err(Error::from_source(panicked))) - } else { - let owned = key.to_owned(); - self.storage.get(&owned).await + let request_id = next_request_id(); + let watch = self.clock.stopwatch(); + async { + let (result, coalesced) = if let Some(mergers) = &self.mergers { + let owned = key.to_owned(); + let storage = &self.storage; + let result = mergers + .get + .execute(key, move || async move { storage.get(&owned).await }) + .await + .unwrap_or_else(|panicked| Err(Error::from_source(panicked))); + (result, true) + } else { + let owned = key.to_owned(); + (self.storage.get(&owned).await, false) + }; + self.telemetry + .complete_operation(request_id, self.name, "cache.get", watch.elapsed(), coalesced); + result } + .with_request_id(request_id) + .await } /// Inserts a value into the cache. @@ -260,7 +280,16 @@ where /// # }; /// ``` pub async fn insert(&self, key: K, entry: impl Into>) -> Result<(), Error> { - self.storage.insert(key, entry.into()).await + let request_id = next_request_id(); + let watch = self.clock.stopwatch(); + async { + let result = self.storage.insert(key, entry.into()).await; + self.telemetry + .complete_operation(request_id, self.name, "cache.insert", watch.elapsed(), false); + result + } + .with_request_id(request_id) + .await } /// Invalidates (removes) a value from the cache. @@ -282,18 +311,28 @@ where K: Borrow, Q: Hash + Eq + ToOwned + ?Sized + Send + Sync, { - if let Some(mergers) = &self.mergers { - let owned = key.to_owned(); - let storage = &self.storage; - mergers - .invalidate - .execute(key, move || async move { storage.invalidate(&owned).await }) - .await - .unwrap_or_else(|panicked| Err(Error::from_source(panicked))) - } else { - let owned = key.to_owned(); - self.storage.invalidate(&owned).await + let request_id = next_request_id(); + let watch = self.clock.stopwatch(); + async { + let (result, coalesced) = if let Some(mergers) = &self.mergers { + let owned = key.to_owned(); + let storage = &self.storage; + let result = mergers + .invalidate + .execute(key, move || async move { storage.invalidate(&owned).await }) + .await + .unwrap_or_else(|panicked| Err(Error::from_source(panicked))); + (result, true) + } else { + let owned = key.to_owned(); + (self.storage.invalidate(&owned).await, false) + }; + self.telemetry + .complete_operation(request_id, self.name, "cache.invalidate", watch.elapsed(), coalesced); + result } + .with_request_id(request_id) + .await } /// Returns true if the cache contains a value for the given key. @@ -315,7 +354,16 @@ where /// /// Returns an error if the underlying cache tier operation fails. pub async fn clear(&self) -> Result<(), Error> { - self.storage.clear().await + let request_id = next_request_id(); + let watch = self.clock.stopwatch(); + async { + let result = self.storage.clear().await; + self.telemetry + .complete_operation(request_id, self.name, "cache.clear", watch.elapsed(), false); + result + } + .with_request_id(request_id) + .await } /// Returns an **approximate** count of entries, if supported by the underlying storage. @@ -412,16 +460,26 @@ where Q: Hash + Eq + ToOwned + ?Sized + Send + Sync, Fut: Future + Send, { - let owned = key.to_owned(); - if let Some(mergers) = &self.mergers { - mergers - .get_or_insert - .execute(key, move || async move { self.do_get_or_insert(&owned, f).await }) - .await - .unwrap_or_else(|panicked| Err(Error::from_source(panicked))) - } else { - self.do_get_or_insert(&owned, f).await + let request_id = next_request_id(); + let watch = self.clock.stopwatch(); + async { + let owned = key.to_owned(); + let (result, coalesced) = if let Some(mergers) = &self.mergers { + let result = mergers + .get_or_insert + .execute(key, move || async move { self.do_get_or_insert(&owned, f).await }) + .await + .unwrap_or_else(|panicked| Err(Error::from_source(panicked))); + (result, true) + } else { + (self.do_get_or_insert(&owned, f).await, false) + }; + self.telemetry + .complete_operation(request_id, self.name, "cache.get_or_insert", watch.elapsed(), coalesced); + result } + .with_request_id(request_id) + .await } async fn do_get_or_insert(&self, key: &K, f: impl FnOnce() -> Fut) -> Result, Error> @@ -650,16 +708,26 @@ where E: std::error::Error + Send + Sync + 'static, Fut: Future> + Send, { - let owned = key.to_owned(); - if let Some(mergers) = &self.mergers { - mergers - .try_get_or_insert - .execute(key, move || async move { self.do_try_get_or_insert(&owned, f).await }) - .await - .unwrap_or_else(|panicked| Err(Error::from_source(panicked))) - } else { - self.do_try_get_or_insert(&owned, f).await + let request_id = next_request_id(); + let watch = self.clock.stopwatch(); + async { + let owned = key.to_owned(); + let (result, coalesced) = if let Some(mergers) = &self.mergers { + let result = mergers + .try_get_or_insert + .execute(key, move || async move { self.do_try_get_or_insert(&owned, f).await }) + .await + .unwrap_or_else(|panicked| Err(Error::from_source(panicked))); + (result, true) + } else { + (self.do_try_get_or_insert(&owned, f).await, false) + }; + self.telemetry + .complete_operation(request_id, self.name, "cache.try_get_or_insert", watch.elapsed(), coalesced); + result } + .with_request_id(request_id) + .await } async fn do_try_get_or_insert(&self, key: &K, f: impl FnOnce() -> Fut) -> Result, Error> @@ -728,16 +796,26 @@ where Q: Hash + Eq + ToOwned + ?Sized + Send + Sync, Fut: Future> + Send, { - let owned = key.to_owned(); - if let Some(mergers) = &self.mergers { - mergers - .optionally_get_or_insert - .execute(key, move || async move { self.do_optionally_get_or_insert(&owned, f).await }) - .await - .unwrap_or_else(|panicked| Err(Error::from_source(panicked))) - } else { - self.do_optionally_get_or_insert(&owned, f).await + let request_id = next_request_id(); + let watch = self.clock.stopwatch(); + async { + let owned = key.to_owned(); + let (result, coalesced) = if let Some(mergers) = &self.mergers { + let result = mergers + .optionally_get_or_insert + .execute(key, move || async move { self.do_optionally_get_or_insert(&owned, f).await }) + .await + .unwrap_or_else(|panicked| Err(Error::from_source(panicked))); + (result, true) + } else { + (self.do_optionally_get_or_insert(&owned, f).await, false) + }; + self.telemetry + .complete_operation(request_id, self.name, "cache.optionally_get_or_insert", watch.elapsed(), coalesced); + result } + .with_request_id(request_id) + .await } async fn do_optionally_get_or_insert(&self, key: &K, f: impl FnOnce() -> Fut) -> Result>, Error> @@ -791,9 +869,13 @@ where #[cfg(test)] mod tests { + use std::sync::{Arc, Mutex}; + use cachet_tier::MockCache; use super::*; + use crate::telemetry::handler::RequestId; + use crate::{CacheEventHandler, CacheOperationEvent, CacheTierEvent}; fn block_on(f: F) -> F::Output { futures::executor::block_on(f) @@ -841,6 +923,88 @@ mod tests { }); } + #[test] + fn cache_event_handler_receives_fallback_tier_events() { + type EventRecord = Vec<(RequestId, String, String, bool)>; + + #[derive(Clone)] + struct RecordingHandler { + tier_events: Arc>, + operation_events: Arc>, + } + + impl CacheEventHandler for RecordingHandler { + fn on_tier_event(&self, event: &CacheTierEvent<'_>) { + self.tier_events.lock().expect("test handler mutex should not be poisoned").push(( + event.request_id, + event.tier_name.to_string(), + event.outcome.to_string(), + event.fallback, + )); + } + + fn on_operation_complete(&self, event: &CacheOperationEvent<'_>) { + self.operation_events + .lock() + .expect("test handler mutex should not be poisoned") + .push(( + event.request_id, + event.cache_name.to_string(), + event.operation.to_string(), + event.coalesced, + )); + } + } + + let tier_events = Arc::new(Mutex::new(Vec::new())); + let operation_events = Arc::new(Mutex::new(Vec::new())); + + block_on(async { + let clock = Clock::new_frozen(); + let handler = RecordingHandler { + tier_events: Arc::clone(&tier_events), + operation_events: Arc::clone(&operation_events), + }; + + let l2 = Cache::builder::(clock.clone()).storage(MockCache::new()).name("l2"); + let cache = Cache::builder::(clock) + .storage(MockCache::new()) + .name("l1") + .event_handler(handler) + .fallback(l2) + .build(); + + let result = cache.get("missing").await.unwrap(); + assert!(result.is_none()); + }); + + let tier_events = tier_events.lock().expect("test handler mutex should not be poisoned").clone(); + let operation_events = operation_events.lock().expect("test handler mutex should not be poisoned").clone(); + let request_id = operation_events[0].0; + + assert_eq!( + tier_events, + vec![ + ( + request_id, + "l1".to_string(), + crate::telemetry::attributes::EVENT_MISS.to_string(), + false + ), + ( + request_id, + "l2".to_string(), + crate::telemetry::attributes::EVENT_MISS.to_string(), + true + ), + ] + ); + assert_eq!( + operation_events, + vec![(request_id, "l1".to_string(), "cache.get".to_string(), false)] + ); + } + #[test] fn cache_insert_and_get() { block_on(async { diff --git a/crates/cachet/src/eviction.rs b/crates/cachet/src/eviction.rs index f586546d1..bc3cbfc8b 100644 --- a/crates/cachet/src/eviction.rs +++ b/crates/cachet/src/eviction.rs @@ -9,7 +9,6 @@ //! the cache is finally built. use std::sync::OnceLock; -use std::time::Duration; use cachet_memory::RemovalCause; @@ -42,13 +41,16 @@ impl EvictionHook { /// /// `Explicit` and `Replaced` are ignored because they are already covered /// by the wrapper's `cache.invalidated` / `cache.inserted` events. + /// + /// These events fire from moka's background thread with no parent span, + /// so they emit standalone tracing events rather than recording on a span. pub(crate) fn handle(&self, cause: RemovalCause) { let Some(state) = self.state.get() else { return; }; match cause { - RemovalCause::Size => state.telemetry.cache_eviction(state.name, Duration::ZERO), - RemovalCause::Expired => state.telemetry.cache_expired(state.name, Duration::ZERO), + RemovalCause::Size => state.telemetry.record_eviction(state.name), + RemovalCause::Expired => state.telemetry.record_background_expired(state.name), RemovalCause::Explicit | RemovalCause::Replaced => {} } } @@ -97,4 +99,20 @@ mod tests { hook.handle(RemovalCause::Expired); capture.assert_contains(attributes::EVENT_EXPIRED); } + + #[cfg_attr(miri, ignore)] + #[test] + fn handle_without_logging_emits_no_tracing_events() { + let capture = LogCapture::new(); + let _guard = tracing::subscriber::set_default(capture.subscriber()); + + // Logging disabled — covers the false branch of the logging_enabled check + let hook = Arc::new(EvictionHook::new()); + hook.init(CacheTelemetry::new(), "no_logs"); + + hook.handle(RemovalCause::Size); + hook.handle(RemovalCause::Expired); + + assert!(capture.output().is_empty(), "no log events should fire without logging enabled"); + } } diff --git a/crates/cachet/src/fallback.rs b/crates/cachet/src/fallback.rs index 272e7159d..734c78ab4 100644 --- a/crates/cachet/src/fallback.rs +++ b/crates/cachet/src/fallback.rs @@ -18,7 +18,6 @@ use crate::Error; use crate::cache::CacheName; use crate::refresh::TimeToRefresh; use crate::telemetry::CacheTelemetry; -use crate::telemetry::ext::ClockExt; pub(crate) struct FallbackCacheInner { pub(crate) name: CacheName, @@ -102,11 +101,7 @@ where /// /// Separated from [`get`](Self::get) to keep the hot path (primary hits) small. async fn get_from_fallback(&self, key: &K) -> Result>, Error> { - let timed = self.inner.clock.timed_async(self.inner.fallback.get(key)).await; - self.inner.telemetry.cache_fallback(self.inner.name, timed.duration); - - // Propagate any error from fallback - let fallback_value = timed.result?; + let fallback_value = self.inner.fallback.get(key).await?; if let Some(ref v) = fallback_value { // Insert errors are intentionally swallowed - a failed promotion should not @@ -193,7 +188,7 @@ mod tests { fn make_primary() -> TestPrimary { let clock = Clock::new_frozen(); let telemetry = CacheTelemetry::new(); - CacheWrapper::new("primary", MockCache::new(), clock, None, telemetry, InsertPolicy::default()) + CacheWrapper::new("primary", MockCache::new(), clock, None, telemetry, InsertPolicy::default(), false) } fn make_fallback_cache() -> TestFallbackCache { @@ -446,6 +441,7 @@ mod tests { None, telemetry.clone(), InsertPolicy::default(), + false, ); let fc = FallbackCache::new("test", primary, fallback_mock, clock, Some(refresh), telemetry); diff --git a/crates/cachet/src/lib.rs b/crates/cachet/src/lib.rs index 999c7f85a..88f672025 100644 --- a/crates/cachet/src/lib.rs +++ b/crates/cachet/src/lib.rs @@ -4,7 +4,7 @@ #![cfg_attr(docsrs, feature(doc_cfg))] //! A composable, multi-tier caching library with stampede protection, background -//! refresh, and built-in OpenTelemetry telemetry. +//! refresh, and structured telemetry. //! //! # Why Multi-Tier Caching? //! @@ -79,7 +79,7 @@ //! | Stampede protection | ❌ | ✅ | //! | Background refresh | ❌ | ✅ | //! | Service middleware integration | ❌ | ✅ | -//! | Structured telemetry (tracing) | ❌ | ✅ | +//! | Structured telemetry | ❌ | ✅ | //! | Pluggable storage backends | ❌ | ✅ | //! | Clock injection for testing | ❌ | ✅ | //! @@ -225,12 +225,16 @@ //! //! # Telemetry //! -//! Enable with the `logs` feature and `.enable_logs()` on the cache builder. +//! Cachet provides two complementary telemetry channels: +//! +//! ## Tracing events //! -//! Each cache operation emits a structured [`tracing`] event with fields -//! `cache.name`, `cache.event`, and `cache.duration_ns`. +//! Enable with the `logs` feature and `.enable_logs()` on the cache builder. +//! Each tier outcome and operation completion emits a structured [`tracing`] event. //! -//! ## Subscribing to events +//! **Tier events** carry `cache.name`, `cache.event`, and `cache.duration_ns`. +//! **Operation-complete events** carry `cache.name`, `cache.operation`, +//! `cache.duration_ns`, and `cache.coalesced`. //! //! Use [`telemetry::attributes`] constants to filter and match events in a //! custom `tracing_subscriber::Layer`: @@ -248,13 +252,24 @@ //! //! See the `telemetry_subscriber` example for a complete demonstration. //! -//! ## Event types +//! ### Event types //! //! | Level | Events | //! |-------|--------| //! | ERROR | `cache.get_error`, `cache.insert_error`, `cache.invalidate_error`, `cache.clear_error` | -//! | INFO | `cache.expired`, `cache.refresh_miss`, `cache.inserted`, `cache.insert_rejected`, `cache.invalidated`, `cache.fallback`, `cache.eviction` | +//! | INFO | `cache.expired`, `cache.refresh_miss`, `cache.inserted`, `cache.insert_rejected`, `cache.invalidated`, `cache.eviction` | //! | DEBUG | `cache.hit`, `cache.miss`, `cache.refresh_hit`, `cache.cleared` | +//! +//! ## Event handler callback API +//! +//! Register a [`CacheEventHandler`] via +//! `.event_handler(handler)` on the cache builder to receive typed +//! [`CacheTierEvent`] and +//! [`CacheOperationEvent`] callbacks. +//! Events carry a `request_id` for correlating tier outcomes with their parent +//! operation. Works independently of the `logs` feature. +//! +//! See the `telemetry_accumulator` example for a DashMap-based accumulation pattern. mod builder; mod cache; @@ -291,4 +306,6 @@ pub use policy::InsertPolicy; #[doc(inline)] pub use refresh::TimeToRefresh; #[doc(inline)] +pub use telemetry::handler::{CacheEventHandler, CacheOperationEvent, CacheTierEvent}; +#[doc(inline)] pub use transform::{Codec, DecodeOutcome, Encoder, TransformCodec, TransformEncoder, infallible, infallible_owned}; diff --git a/crates/cachet/src/policy.rs b/crates/cachet/src/policy.rs index 1a827159f..2f83da17b 100644 --- a/crates/cachet/src/policy.rs +++ b/crates/cachet/src/policy.rs @@ -13,7 +13,7 @@ type InsertPredicate = Arc) -> bool + Send + Sync>; /// The insert policy applies to all inserts into the tier, including direct /// [`Cache::insert`](crate::Cache::insert) calls, [`Cache::get_or_insert`](crate::Cache::get_or_insert), and promotion from a /// fallback tier. If the policy rejects an insert, the operation is skipped -/// and a `cache.rejected` telemetry event is recorded, with the operation recorded as `cache.insert`. +/// and a `cache.insert_rejected` telemetry event is recorded. /// /// # Examples /// diff --git a/crates/cachet/src/refresh.rs b/crates/cachet/src/refresh.rs index 647beb54b..d5345c03e 100644 --- a/crates/cachet/src/refresh.rs +++ b/crates/cachet/src/refresh.rs @@ -18,7 +18,7 @@ use cachet_tier::{CacheEntry, CacheTier}; use parking_lot::Mutex; use crate::fallback::{FallbackCache, FallbackCacheInner}; -use crate::telemetry::ext::ClockExt; +use crate::telemetry::cache::{WithRequestIdExt, next_request_id}; /// Configuration for background cache refresh. /// @@ -131,7 +131,8 @@ where } } }); - inner.fetch_and_promote(key).await; + let request_id = next_request_id(); + inner.fetch_and_promote(key).with_request_id(request_id).await; })); } } @@ -145,16 +146,15 @@ where F: CacheTier + Send + Sync + 'static, { pub(crate) async fn fetch_and_promote(&self, key: K) { - let timed = self.clock.timed_async(self.fallback.get(&key)).await; - - match timed.result { - Ok(Some(value)) => self.handle_fallback_hit(key, value, timed.duration).await, - Ok(None) | Err(_) => self.handle_fallback_miss(timed.duration), + let watch = self.clock.stopwatch(); + match self.fallback.get(&key).await { + Ok(Some(value)) => self.handle_fallback_hit(key, value, watch.elapsed()).await, + Ok(None) | Err(_) => self.handle_fallback_miss(watch.elapsed()), } } async fn handle_fallback_hit(&self, key: K, value: CacheEntry, fetch_duration: Duration) { - self.telemetry.refresh_hit(self.name, fetch_duration); + self.telemetry.record_refresh_hit(self.name, fetch_duration); self.promote_to_primary(key, value).await; } @@ -167,7 +167,7 @@ where } fn handle_fallback_miss(&self, duration: Duration) { - self.telemetry.refresh_miss(self.name, duration); + self.telemetry.record_refresh_miss(self.name, duration); } } @@ -385,6 +385,7 @@ mod fetch_and_promote_tests { None, telemetry.clone(), InsertPolicy::never(), + false, ); let fc = FallbackCache::new("test", primary, fallback, clock, None, telemetry); @@ -475,7 +476,7 @@ mod fetch_and_promote_tests { fn make_wrapper(mock: MockCache) -> MockWrapper { let clock = Clock::new_frozen(); let telemetry = CacheTelemetry::new(); - CacheWrapper::new("test_primary", mock, clock, None, telemetry, InsertPolicy::default()) + CacheWrapper::new("test_primary", mock, clock, None, telemetry, InsertPolicy::default(), false) } fn build_mock_fallback_cache( @@ -507,7 +508,15 @@ mod fetch_and_promote_tests { let telemetry = CacheTelemetry::new(); let refresh = TimeToRefresh::new(Duration::from_mins(1), Spawner::new_tokio()); - let primary_wrapper = CacheWrapper::new("primary", primary, clock.clone(), None, telemetry.clone(), InsertPolicy::default()); + let primary_wrapper = CacheWrapper::new( + "primary", + primary, + clock.clone(), + None, + telemetry.clone(), + InsertPolicy::default(), + false, + ); let fc = FallbackCache::new("test", primary_wrapper, fallback, clock, Some(refresh), telemetry); let key = "key".to_string(); diff --git a/crates/cachet/src/telemetry/attributes.rs b/crates/cachet/src/telemetry/attributes.rs index 24946216b..b867cf094 100644 --- a/crates/cachet/src/telemetry/attributes.rs +++ b/crates/cachet/src/telemetry/attributes.rs @@ -3,9 +3,16 @@ //! Public constants for cachet telemetry field names and event values. //! -//! Use these constants to filter or match cachet events in a custom -//! `tracing_subscriber::Layer`. All cachet events are emitted with -//! `FIELD_NAME`, `FIELD_EVENT`, and `FIELD_DURATION_NS` fields. +//! Use these constants to filter or match cachet telemetry events in a custom +//! `tracing_subscriber::Layer`. +//! +//! **Tier events** (hit, miss, expired, etc.) carry `FIELD_NAME`, `FIELD_EVENT`, +//! and `FIELD_DURATION_NS`. Some events intentionally omit `FIELD_DURATION_NS` +//! to indicate "not timed": `EVENT_INSERT_REJECTED`, `EVENT_EVICTION`, and +//! background `EVENT_EXPIRED` events emitted from eviction listeners. +//! +//! **Operation-complete events** carry `FIELD_NAME`, `FIELD_OPERATION`, +//! `FIELD_DURATION_NS`, and `FIELD_COALESCED`. //! //! # Example //! @@ -41,6 +48,12 @@ pub const FIELD_EVENT: &str = "cache.event"; /// Field name for the operation duration in nanoseconds. pub const FIELD_DURATION_NS: &str = "cache.duration_ns"; +/// Field name for the cache operation name. +pub const FIELD_OPERATION: &str = "cache.operation"; + +/// Field name recording whether stampede protection was enabled for the operation. +pub const FIELD_COALESCED: &str = "cache.coalesced"; + // -- Event values (emitted in the `cache.event` field) -- /// Cache entry was found and valid. @@ -55,9 +68,6 @@ pub const EVENT_EXPIRED: &str = "cache.expired"; /// An error occurred during a get operation. pub const EVENT_GET_ERROR: &str = "cache.get_error"; -/// A fallback tier was consulted. -pub const EVENT_FALLBACK: &str = "cache.fallback"; - /// An entry was successfully inserted. pub const EVENT_INSERTED: &str = "cache.inserted"; @@ -95,10 +105,12 @@ mod tests { #[test] fn field_constants_match_tracing_field_names() { - // These constants must match the field names used in tracing macros in cache.rs. + // These constants must match the field names used in tracing macros in telemetry/cache.rs. assert_eq!(FIELD_NAME, "cache.name"); assert_eq!(FIELD_EVENT, "cache.event"); assert_eq!(FIELD_DURATION_NS, "cache.duration_ns"); + assert_eq!(FIELD_OPERATION, "cache.operation"); + assert_eq!(FIELD_COALESCED, "cache.coalesced"); } #[test] @@ -108,7 +120,6 @@ mod tests { EVENT_MISS, EVENT_EXPIRED, EVENT_GET_ERROR, - EVENT_FALLBACK, EVENT_INSERTED, EVENT_INSERT_REJECTED, EVENT_INSERT_ERROR, diff --git a/crates/cachet/src/telemetry/cache.rs b/crates/cachet/src/telemetry/cache.rs index c93fff32a..0542b326a 100644 --- a/crates/cachet/src/telemetry/cache.rs +++ b/crates/cachet/src/telemetry/cache.rs @@ -3,241 +3,452 @@ //! Cache telemetry types and recording. +use std::cell::Cell; +use std::pin::Pin; +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::task::{Context, Poll}; use std::time::Duration; -#[cfg(any(feature = "logs", test))] -use thread_aware::{Arc, PerCore}; +use pin_project_lite::pin_project; use crate::cache::CacheName; use crate::telemetry::attributes; +use crate::telemetry::handler::{CacheEventHandler, CacheOperationEvent, CacheTierEvent, RequestId}; -/// Internal state for cache telemetry when features are enabled. -#[cfg(any(feature = "logs", test))] -#[derive(Clone, Debug)] -pub(crate) struct CacheTelemetryInner { - pub(crate) logging_enabled: bool, +/// Process-wide counter for generating unique request IDs. +static NEXT_REQUEST_ID: AtomicU64 = AtomicU64::new(1); + +std::thread_local! { + static CURRENT_REQUEST_ID: Cell = const { Cell::new(0) }; } -#[cfg(any(feature = "logs", test))] -impl CacheTelemetryInner { - #[inline] - fn debug(&self, cache_name: CacheName, event: &'static str, duration: Duration) { - if self.logging_enabled { - tracing::debug!( - cache.name = cache_name, - cache.event = event, - cache.duration_ns = duration.as_nanos() - ); - } - } +/// Generates a unique request ID for correlating tier events with their parent operation. +pub(crate) fn next_request_id() -> RequestId { + NEXT_REQUEST_ID.fetch_add(1, Ordering::Relaxed) +} - #[inline] - fn info(&self, cache_name: CacheName, event: &'static str, duration: Duration) { - if self.logging_enabled { - tracing::info!( - cache.name = cache_name, - cache.event = event, - cache.duration_ns = duration.as_nanos() - ); - } +pin_project! { + /// A future wrapper that restores the request ID into the thread-local + /// on every poll. This ensures the correct request ID is available + /// even if the task migrates to a different thread between polls. + /// + /// Supports nesting (e.g., a `get_or_insert` closure calling another cache + /// operation) by saving and restoring the previous request ID. + pub(crate) struct WithRequestId { + #[pin] + inner: F, + request_id: RequestId, } +} - #[inline] - fn error(&self, cache_name: CacheName, event: &'static str, duration: Duration) { - if self.logging_enabled { - tracing::error!( - cache.name = cache_name, - cache.event = event, - cache.duration_ns = duration.as_nanos() - ); - } +/// RAII guard that restores the previous thread-local request ID on drop, +/// ensuring cleanup even if the inner future panics during poll. +struct RestoreRequestId(RequestId); + +impl Drop for RestoreRequestId { + fn drop(&mut self) { + CURRENT_REQUEST_ID.with(|cell| cell.set(self.0)); } } -/// Internal state for cache telemetry when no features are enabled (no-op). -#[cfg(not(any(feature = "logs", test)))] -#[derive(Clone, Debug, Default)] -pub(crate) struct CacheTelemetryInner; +impl Future for WithRequestId { + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + let prev = CURRENT_REQUEST_ID.with(|cell| cell.replace(*this.request_id)); + let _guard = RestoreRequestId(prev); + this.inner.poll(cx) + } +} -#[cfg(not(any(feature = "logs", test)))] -#[expect(clippy::unused_self, reason = "Methods must match the logs-enabled impl signature")] -impl CacheTelemetryInner { - #[inline] - fn debug(&self, _: CacheName, _: &'static str, _: Duration) {} +/// Extension trait for wrapping a future with a request ID. +pub(crate) trait WithRequestIdExt: Sized { + /// Wraps this future so that `request_id` is set in the thread-local + /// on every poll, surviving task migration across threads. + fn with_request_id(self, request_id: RequestId) -> WithRequestId; +} - #[inline] - fn info(&self, _: CacheName, _: &'static str, _: Duration) {} +impl WithRequestIdExt for F { + fn with_request_id(self, request_id: RequestId) -> WithRequestId { + WithRequestId { inner: self, request_id } + } +} - #[inline] - fn error(&self, _: CacheName, _: &'static str, _: Duration) {} +/// Converts a `Duration` to nanoseconds as `u64`, saturating at `u64::MAX`. +/// A `u64` of nanoseconds covers around 584 years - overflow is not a practical concern. +#[cfg(any(feature = "logs", test))] +fn saturating_nanos(duration: Duration) -> u64 { + u64::try_from(duration.as_nanos()).unwrap_or(u64::MAX) } /// Cache telemetry provider. /// /// This type is created internally by the cache builder and handles -/// recording cache operations as structured tracing events. -#[derive(Clone, Debug)] +/// emitting structured tracing events and forwarding handler callbacks. +#[derive(Clone, Default)] pub struct CacheTelemetry { #[cfg(any(feature = "logs", test))] - pub(crate) inner: Arc, - #[cfg(not(any(feature = "logs", test)))] - pub(crate) inner: CacheTelemetryInner, + logging_enabled: bool, + handler: Option>, +} + +impl std::fmt::Debug for CacheTelemetry { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("CacheTelemetry") + .field("logging_enabled", &{ + #[cfg(any(feature = "logs", test))] + { + self.logging_enabled + } + #[cfg(not(any(feature = "logs", test)))] + { + false + } + }) + .field("has_handler", &self.handler.is_some()) + .finish() + } } impl CacheTelemetry { /// Creates a new `CacheTelemetry` with logging disabled. #[must_use] pub(crate) fn new() -> Self { + Self { + #[cfg(any(feature = "logs", test))] + logging_enabled: false, + handler: None, + } + } + + #[must_use] + pub(crate) fn with_handler(mut self, handler: Arc) -> Self { + self.handler = Some(handler); + self + } + + pub(crate) fn current_request_id() -> RequestId { + CURRENT_REQUEST_ID.with(Cell::get) + } + + fn emit_tier_event(&self, request_id: RequestId, tier_name: CacheName, outcome: &'static str, duration: Duration, fallback: bool) { + if let Some(handler) = &self.handler { + handler.on_tier_event(&CacheTierEvent { + request_id, + tier_name, + outcome, + duration, + fallback, + }); + } + } + + #[cfg_attr( + not(feature = "logs"), + expect(clippy::unused_self, reason = "self.logging_enabled is used when logs is enabled") + )] + fn record_debug_with_duration(&self, cache_name: CacheName, event: &'static str, duration: Duration) { #[cfg(any(feature = "logs", test))] - { - Self { - inner: Arc::from_unaware(CacheTelemetryInner { logging_enabled: false }), - } + if self.logging_enabled { + let duration_ns = saturating_nanos(duration); + tracing::debug!(cache.name = cache_name, cache.event = event, cache.duration_ns = duration_ns); } #[cfg(not(any(feature = "logs", test)))] { - Self { - inner: CacheTelemetryInner, - } + let _ = (cache_name, event, duration); } } - /// Creates a new `CacheTelemetry` with logging enabled. - #[cfg(any(feature = "logs", test))] - #[must_use] - pub(crate) fn with_logging() -> Self { - Self { - inner: Arc::from_unaware(CacheTelemetryInner { logging_enabled: true }), + #[cfg_attr( + not(feature = "logs"), + expect(clippy::unused_self, reason = "self.logging_enabled is used when logs is enabled") + )] + fn record_info_with_duration(&self, cache_name: CacheName, event: &'static str, duration: Duration) { + #[cfg(any(feature = "logs", test))] + if self.logging_enabled { + let duration_ns = saturating_nanos(duration); + tracing::info!(cache.name = cache_name, cache.event = event, cache.duration_ns = duration_ns); + } + #[cfg(not(any(feature = "logs", test)))] + { + let _ = (cache_name, event, duration); } } - // -- Get -- + #[cfg_attr( + not(feature = "logs"), + expect(clippy::unused_self, reason = "self.logging_enabled is used when logs is enabled") + )] + fn record_error_with_duration(&self, cache_name: CacheName, event: &'static str, duration: Duration) { + #[cfg(any(feature = "logs", test))] + if self.logging_enabled { + let duration_ns = saturating_nanos(duration); + tracing::error!(cache.name = cache_name, cache.event = event, cache.duration_ns = duration_ns); + } + #[cfg(not(any(feature = "logs", test)))] + { + let _ = (cache_name, event, duration); + } + } - /// Records a cache hit (key found and not expired). - #[inline] - pub(crate) fn cache_hit(&self, cache_name: CacheName, duration: Duration) { - self.inner.debug(cache_name, attributes::EVENT_HIT, duration); + pub(crate) fn record_hit(&self, tier_name: CacheName, duration: Duration, fallback: bool) { + self.record_debug_with_duration(tier_name, attributes::EVENT_HIT, duration); + self.emit_tier_event(Self::current_request_id(), tier_name, attributes::EVENT_HIT, duration, fallback); } - /// Records a cache miss (key not found). - #[inline] - pub(crate) fn cache_miss(&self, cache_name: CacheName, duration: Duration) { - self.inner.debug(cache_name, attributes::EVENT_MISS, duration); + pub(crate) fn record_miss(&self, tier_name: CacheName, duration: Duration, fallback: bool) { + self.record_debug_with_duration(tier_name, attributes::EVENT_MISS, duration); + self.emit_tier_event(Self::current_request_id(), tier_name, attributes::EVENT_MISS, duration, fallback); } - /// Records a cache entry that was found but expired. - #[inline] - pub(crate) fn cache_expired(&self, cache_name: CacheName, duration: Duration) { - self.inner.info(cache_name, attributes::EVENT_EXPIRED, duration); + pub(crate) fn record_expired(&self, tier_name: CacheName, duration: Duration, fallback: bool) { + self.record_info_with_duration(tier_name, attributes::EVENT_EXPIRED, duration); + self.emit_tier_event(Self::current_request_id(), tier_name, attributes::EVENT_EXPIRED, duration, fallback); } - /// Records an error during a get operation. - #[inline] - pub(crate) fn get_error(&self, cache_name: CacheName, duration: Duration) { - self.inner.error(cache_name, attributes::EVENT_GET_ERROR, duration); + pub(crate) fn record_get_error(&self, tier_name: CacheName, duration: Duration, fallback: bool) { + self.record_error_with_duration(tier_name, attributes::EVENT_GET_ERROR, duration); + self.emit_tier_event( + Self::current_request_id(), + tier_name, + attributes::EVENT_GET_ERROR, + duration, + fallback, + ); } - /// Records a fallback tier lookup. - #[inline] - pub(crate) fn cache_fallback(&self, cache_name: CacheName, duration: Duration) { - self.inner.info(cache_name, attributes::EVENT_FALLBACK, duration); + pub(crate) fn record_inserted(&self, tier_name: CacheName, duration: Duration, fallback: bool) { + self.record_info_with_duration(tier_name, attributes::EVENT_INSERTED, duration); + self.emit_tier_event( + Self::current_request_id(), + tier_name, + attributes::EVENT_INSERTED, + duration, + fallback, + ); } - // -- Refresh -- + pub(crate) fn record_insert_error(&self, tier_name: CacheName, duration: Duration, fallback: bool) { + self.record_error_with_duration(tier_name, attributes::EVENT_INSERT_ERROR, duration); + self.emit_tier_event( + Self::current_request_id(), + tier_name, + attributes::EVENT_INSERT_ERROR, + duration, + fallback, + ); + } - /// Records a successful background refresh from fallback. - #[inline] - pub(crate) fn refresh_hit(&self, cache_name: CacheName, duration: Duration) { - self.inner.debug(cache_name, attributes::EVENT_REFRESH_HIT, duration); + pub(crate) fn record_invalidated(&self, tier_name: CacheName, duration: Duration, fallback: bool) { + self.record_info_with_duration(tier_name, attributes::EVENT_INVALIDATED, duration); + self.emit_tier_event( + Self::current_request_id(), + tier_name, + attributes::EVENT_INVALIDATED, + duration, + fallback, + ); } - /// Records a background refresh miss (fallback had no data or returned error). - #[inline] - pub(crate) fn refresh_miss(&self, cache_name: CacheName, duration: Duration) { - self.inner.info(cache_name, attributes::EVENT_REFRESH_MISS, duration); + pub(crate) fn record_invalidate_error(&self, tier_name: CacheName, duration: Duration, fallback: bool) { + self.record_error_with_duration(tier_name, attributes::EVENT_INVALIDATE_ERROR, duration); + self.emit_tier_event( + Self::current_request_id(), + tier_name, + attributes::EVENT_INVALIDATE_ERROR, + duration, + fallback, + ); } - // -- Insert -- + pub(crate) fn record_cleared(&self, tier_name: CacheName, duration: Duration, fallback: bool) { + self.record_debug_with_duration(tier_name, attributes::EVENT_CLEARED, duration); + self.emit_tier_event(Self::current_request_id(), tier_name, attributes::EVENT_CLEARED, duration, fallback); + } - /// Records a successful cache insert. - #[inline] - pub(crate) fn cache_inserted(&self, cache_name: CacheName, duration: Duration) { - self.inner.info(cache_name, attributes::EVENT_INSERTED, duration); + pub(crate) fn record_clear_error(&self, tier_name: CacheName, duration: Duration, fallback: bool) { + self.record_error_with_duration(tier_name, attributes::EVENT_CLEAR_ERROR, duration); + self.emit_tier_event( + Self::current_request_id(), + tier_name, + attributes::EVENT_CLEAR_ERROR, + duration, + fallback, + ); } - /// Records that an entry was evicted from the cache because it reached - /// the configured capacity limit. - #[cfg(any(feature = "memory", test))] - #[inline] - pub(crate) fn cache_eviction(&self, cache_name: CacheName, duration: Duration) { - self.inner.info(cache_name, attributes::EVENT_EVICTION, duration); + /// Records a successful background refresh from the fallback tier. + pub(crate) fn record_refresh_hit(&self, cache_name: CacheName, duration: Duration) { + self.record_debug_with_duration(cache_name, attributes::EVENT_REFRESH_HIT, duration); + self.emit_tier_event( + Self::current_request_id(), + cache_name, + attributes::EVENT_REFRESH_HIT, + duration, + true, + ); } - /// Records a cache insert that was rejected by the insert policy. - #[inline] - pub(crate) fn insert_rejected(&self, cache_name: CacheName, duration: Duration) { - self.inner.info(cache_name, attributes::EVENT_INSERT_REJECTED, duration); + /// Records a background refresh that found no data in the fallback tier. + pub(crate) fn record_refresh_miss(&self, cache_name: CacheName, duration: Duration) { + self.record_info_with_duration(cache_name, attributes::EVENT_REFRESH_MISS, duration); + self.emit_tier_event( + Self::current_request_id(), + cache_name, + attributes::EVENT_REFRESH_MISS, + duration, + true, + ); } - /// Records an error during an insert operation. - #[inline] - pub(crate) fn insert_error(&self, cache_name: CacheName, duration: Duration) { - self.inner.error(cache_name, attributes::EVENT_INSERT_ERROR, duration); + pub(crate) fn record_insert_rejected(&self, tier_name: CacheName, fallback: bool) { + #[cfg(any(feature = "logs", test))] + if self.logging_enabled { + tracing::info!(cache.name = tier_name, cache.event = attributes::EVENT_INSERT_REJECTED); + } + self.emit_tier_event( + Self::current_request_id(), + tier_name, + attributes::EVENT_INSERT_REJECTED, + Duration::ZERO, + fallback, + ); } - // -- Invalidate -- + /// Records that an entry was evicted from the cache due to capacity limits. + /// + /// When moka evicts during an `insert()`, the eviction listener runs + /// synchronously on the inserting thread, so the thread-local request ID + /// is still set. This allows correlating capacity evictions with the + /// insert that caused them. Background maintenance evictions will have + /// a request ID of 0. + #[cfg(any(feature = "memory", test))] + pub(crate) fn record_eviction(&self, cache_name: CacheName) { + #[cfg(any(feature = "logs", test))] + if self.logging_enabled { + tracing::info!(cache.name = cache_name, cache.event = attributes::EVENT_EVICTION); + } - /// Records a successful cache invalidation. - #[inline] - pub(crate) fn cache_invalidated(&self, cache_name: CacheName, duration: Duration) { - self.inner.info(cache_name, attributes::EVENT_INVALIDATED, duration); + self.emit_tier_event( + Self::current_request_id(), + cache_name, + attributes::EVENT_EVICTION, + Duration::ZERO, + false, + ); } - /// Records an error during an invalidate operation. - #[inline] - pub(crate) fn invalidate_error(&self, cache_name: CacheName, duration: Duration) { - self.inner.error(cache_name, attributes::EVENT_INVALIDATE_ERROR, duration); + /// Records that an entry expired in the background (moka eviction listener). + /// + /// Unlike [`record_expired`](Self::record_expired), this fires from a + /// background thread with no parent operation context, so it emits a standalone event. + /// Like [`record_eviction`](Self::record_eviction), the request ID is + /// read from the thread-local (non-zero when triggered synchronously + /// during a cache operation). + #[cfg(feature = "memory")] + pub(crate) fn record_background_expired(&self, cache_name: CacheName) { + #[cfg(any(feature = "logs", test))] + if self.logging_enabled { + tracing::info!(cache.name = cache_name, cache.event = attributes::EVENT_EXPIRED); + } + + self.emit_tier_event( + Self::current_request_id(), + cache_name, + attributes::EVENT_EXPIRED, + Duration::ZERO, + false, + ); } - // -- Clear -- + pub(crate) fn complete_operation( + &self, + request_id: RequestId, + cache_name: CacheName, + operation: &'static str, + duration: Duration, + coalesced: bool, + ) { + #[cfg(any(feature = "logs", test))] + if self.logging_enabled { + let duration_ns = saturating_nanos(duration); + tracing::debug!( + cache.name = cache_name, + cache.operation = operation, + cache.duration_ns = duration_ns, + cache.coalesced = coalesced + ); + } - /// Records a successful cache clear. - #[inline] - pub(crate) fn cache_cleared(&self, cache_name: CacheName, duration: Duration) { - self.inner.debug(cache_name, attributes::EVENT_CLEARED, duration); + if let Some(handler) = &self.handler { + handler.on_operation_complete(&CacheOperationEvent { + request_id, + cache_name, + operation, + duration, + coalesced, + }); + } } +} - /// Records an error during a clear operation. - #[inline] - pub(crate) fn clear_error(&self, cache_name: CacheName, duration: Duration) { - self.inner.error(cache_name, attributes::EVENT_CLEAR_ERROR, duration); +#[cfg(any(feature = "logs", test))] +impl CacheTelemetry { + #[cfg(test)] + #[must_use] + pub(crate) fn with_logging() -> Self { + Self::new().enable_logging() + } + + #[must_use] + pub(crate) fn enable_logging(mut self) -> Self { + self.logging_enabled = true; + self } } #[cfg(test)] mod tests { + use std::sync::Mutex; + use testing_aids::LogCapture; + use tracing_subscriber::layer::SubscriberExt; use super::*; + fn subscriber(capture: &LogCapture) -> impl tracing::Subscriber { + tracing_subscriber::registry().with(tracing_subscriber::fmt::layer().with_writer(capture.clone()).with_ansi(false)) + } + #[cfg_attr(miri, ignore)] #[test] fn logs_emit_contains_all_fields_and_values() { let capture = LogCapture::new(); - let _guard = tracing::subscriber::set_default(capture.subscriber()); - + let _guard = tracing::subscriber::set_default(subscriber(&capture)); let telemetry = CacheTelemetry::with_logging(); - telemetry.invalidate_error("my_test_cache", Duration::from_nanos(12345)); - // Verify field names match public constants + let request_id = next_request_id(); + futures::executor::block_on(async { + async { + telemetry.record_hit("my_test_cache", Duration::from_nanos(12345), false); + telemetry.complete_operation(request_id, "my_test_cache", "cache.get", Duration::from_nanos(12345), true); + } + .with_request_id(request_id) + .await; + }); + capture.assert_contains(attributes::FIELD_NAME); capture.assert_contains(attributes::FIELD_EVENT); capture.assert_contains(attributes::FIELD_DURATION_NS); - - // Verify values + capture.assert_contains(attributes::FIELD_OPERATION); + capture.assert_contains(attributes::FIELD_COALESCED); capture.assert_contains("my_test_cache"); - capture.assert_contains(attributes::EVENT_INVALIDATE_ERROR); + capture.assert_contains(attributes::EVENT_HIT); + capture.assert_contains("cache.get"); capture.assert_contains("12345"); + capture.assert_contains("true"); } #[cfg_attr(miri, ignore)] @@ -245,22 +456,34 @@ mod tests { fn logs_emit_at_correct_severity_levels() { let telemetry = CacheTelemetry::with_logging(); - // Error level let capture = LogCapture::new(); - let _guard = tracing::subscriber::set_default(capture.subscriber()); - telemetry.get_error("cache", Duration::ZERO); + let _guard = tracing::subscriber::set_default(subscriber(&capture)); + let request_id = next_request_id(); + futures::executor::block_on(async { + async { telemetry.record_get_error("cache", Duration::ZERO, false) } + .with_request_id(request_id) + .await; + }); capture.assert_contains("ERROR"); - // Info level let capture = LogCapture::new(); - let _guard = tracing::subscriber::set_default(capture.subscriber()); - telemetry.cache_expired("cache", Duration::ZERO); + let _guard = tracing::subscriber::set_default(subscriber(&capture)); + let request_id = next_request_id(); + futures::executor::block_on(async { + async { telemetry.record_expired("cache", Duration::ZERO, false) } + .with_request_id(request_id) + .await; + }); capture.assert_contains("INFO"); - // Debug level let capture = LogCapture::new(); - let _guard = tracing::subscriber::set_default(capture.subscriber()); - telemetry.cache_hit("cache", Duration::ZERO); + let _guard = tracing::subscriber::set_default(subscriber(&capture)); + let request_id = next_request_id(); + futures::executor::block_on(async { + async { telemetry.record_hit("cache", Duration::ZERO, false) } + .with_request_id(request_id) + .await; + }); capture.assert_contains("DEBUG"); } @@ -268,42 +491,368 @@ mod tests { #[test] fn telemetry_disabled_emits_nothing() { let telemetry = CacheTelemetry::new(); - let capture = LogCapture::new(); - let _guard = tracing::subscriber::set_default(capture.subscriber()); + let _guard = tracing::subscriber::set_default(subscriber(&capture)); - telemetry.cache_hit("cache", Duration::from_secs(1)); + let request_id = next_request_id(); + futures::executor::block_on(async { + async { telemetry.record_hit("cache", Duration::from_secs(1), false) } + .with_request_id(request_id) + .await; + }); assert!(capture.output().is_empty()); } - /// Asserts that a telemetry helper emits the expected event string. + #[test] + fn logging_enabled_without_subscriber_is_noop() { + // logging_enabled=true but no tracing subscriber. + // No panic means tracing events degrade to a no-op cleanly. + let telemetry = CacheTelemetry::with_logging(); + let request_id = next_request_id(); + futures::executor::block_on( + async { + telemetry.record_hit("c", Duration::ZERO, false); + telemetry.record_get_error("c", Duration::ZERO, false); + telemetry.record_insert_rejected("c", false); + telemetry.complete_operation(request_id, "c", "cache.get", Duration::ZERO, true); + } + .with_request_id(request_id), + ); + // No panic = all paths handled gracefully without a subscriber. + } + #[cfg_attr(miri, ignore)] - fn assert_emits(f: impl FnOnce(&CacheTelemetry), expected: &str) { + fn assert_emits(expected: &str, f: impl FnOnce(&CacheTelemetry, RequestId)) { let capture = LogCapture::new(); - let _guard = tracing::subscriber::set_default(capture.subscriber()); + let _guard = tracing::subscriber::set_default(subscriber(&capture)); let telemetry = CacheTelemetry::with_logging(); - f(&telemetry); + let request_id = next_request_id(); + f(&telemetry, request_id); capture.assert_contains(expected); } #[cfg_attr(miri, ignore)] #[test] fn every_helper_emits_its_event() { - assert_emits(|t| t.cache_hit("c", Duration::ZERO), attributes::EVENT_HIT); - assert_emits(|t| t.cache_miss("c", Duration::ZERO), attributes::EVENT_MISS); - assert_emits(|t| t.cache_expired("c", Duration::ZERO), attributes::EVENT_EXPIRED); - assert_emits(|t| t.get_error("c", Duration::ZERO), attributes::EVENT_GET_ERROR); - assert_emits(|t| t.cache_fallback("c", Duration::ZERO), attributes::EVENT_FALLBACK); - assert_emits(|t| t.refresh_hit("c", Duration::ZERO), attributes::EVENT_REFRESH_HIT); - assert_emits(|t| t.refresh_miss("c", Duration::ZERO), attributes::EVENT_REFRESH_MISS); - assert_emits(|t| t.cache_inserted("c", Duration::ZERO), attributes::EVENT_INSERTED); - assert_emits(|t| t.cache_eviction("c", Duration::ZERO), attributes::EVENT_EVICTION); - assert_emits(|t| t.insert_rejected("c", Duration::ZERO), attributes::EVENT_INSERT_REJECTED); - assert_emits(|t| t.insert_error("c", Duration::ZERO), attributes::EVENT_INSERT_ERROR); - assert_emits(|t| t.cache_invalidated("c", Duration::ZERO), attributes::EVENT_INVALIDATED); - assert_emits(|t| t.invalidate_error("c", Duration::ZERO), attributes::EVENT_INVALIDATE_ERROR); - assert_emits(|t| t.cache_cleared("c", Duration::ZERO), attributes::EVENT_CLEARED); - assert_emits(|t| t.clear_error("c", Duration::ZERO), attributes::EVENT_CLEAR_ERROR); + assert_emits(attributes::EVENT_HIT, |t, request_id| { + futures::executor::block_on(async { + async { t.record_hit("c", Duration::ZERO, false) }.with_request_id(request_id).await; + }); + }); + assert_emits(attributes::EVENT_MISS, |t, request_id| { + futures::executor::block_on(async { + async { t.record_miss("c", Duration::ZERO, false) } + .with_request_id(request_id) + .await; + }); + }); + assert_emits(attributes::EVENT_EXPIRED, |t, request_id| { + futures::executor::block_on(async { + async { t.record_expired("c", Duration::ZERO, false) } + .with_request_id(request_id) + .await; + }); + }); + assert_emits(attributes::EVENT_GET_ERROR, |t, request_id| { + futures::executor::block_on(async { + async { t.record_get_error("c", Duration::ZERO, false) } + .with_request_id(request_id) + .await; + }); + }); + assert_emits(attributes::EVENT_REFRESH_HIT, |t, request_id| { + futures::executor::block_on(async { + async { t.record_refresh_hit("c", Duration::ZERO) } + .with_request_id(request_id) + .await; + }); + }); + assert_emits(attributes::EVENT_REFRESH_MISS, |t, request_id| { + futures::executor::block_on(async { + async { t.record_refresh_miss("c", Duration::ZERO) } + .with_request_id(request_id) + .await; + }); + }); + assert_emits(attributes::EVENT_INSERTED, |t, request_id| { + futures::executor::block_on(async { + async { t.record_inserted("c", Duration::ZERO, false) } + .with_request_id(request_id) + .await; + }); + }); + assert_emits(attributes::EVENT_INSERT_REJECTED, |t, request_id| { + futures::executor::block_on(async { + async { t.record_insert_rejected("c", false) }.with_request_id(request_id).await; + }); + }); + assert_emits(attributes::EVENT_INSERT_ERROR, |t, request_id| { + futures::executor::block_on(async { + async { t.record_insert_error("c", Duration::ZERO, false) } + .with_request_id(request_id) + .await; + }); + }); + assert_emits(attributes::EVENT_INVALIDATED, |t, request_id| { + futures::executor::block_on(async { + async { t.record_invalidated("c", Duration::ZERO, false) } + .with_request_id(request_id) + .await; + }); + }); + assert_emits(attributes::EVENT_INVALIDATE_ERROR, |t, request_id| { + futures::executor::block_on(async { + async { t.record_invalidate_error("c", Duration::ZERO, false) } + .with_request_id(request_id) + .await; + }); + }); + assert_emits(attributes::EVENT_CLEARED, |t, request_id| { + futures::executor::block_on(async { + async { t.record_cleared("c", Duration::ZERO, false) } + .with_request_id(request_id) + .await; + }); + }); + assert_emits(attributes::EVENT_CLEAR_ERROR, |t, request_id| { + futures::executor::block_on(async { + async { t.record_clear_error("c", Duration::ZERO, false) } + .with_request_id(request_id) + .await; + }); + }); + assert_emits(attributes::EVENT_EVICTION, |t, request_id| { + futures::executor::block_on(async { + async { t.record_eviction("c") }.with_request_id(request_id).await; + }); + }); + } + + #[test] + fn handler_receives_tier_and_operation_events_without_logging() { + type EventRecord = Vec<(RequestId, String, String, u128, bool)>; + + #[derive(Clone)] + struct RecordingHandler { + tier_events: Arc>, + operation_events: Arc>, + } + + impl CacheEventHandler for RecordingHandler { + fn on_tier_event(&self, event: &CacheTierEvent<'_>) { + self.tier_events.lock().expect("test handler mutex should not be poisoned").push(( + event.request_id, + event.tier_name.to_string(), + event.outcome.to_string(), + event.duration.as_nanos(), + event.fallback, + )); + } + + fn on_operation_complete(&self, event: &CacheOperationEvent<'_>) { + self.operation_events + .lock() + .expect("test handler mutex should not be poisoned") + .push(( + event.request_id, + event.cache_name.to_string(), + event.operation.to_string(), + event.duration.as_nanos(), + event.coalesced, + )); + } + } + + let tier_events = Arc::new(Mutex::new(Vec::new())); + let operation_events = Arc::new(Mutex::new(Vec::new())); + let telemetry = CacheTelemetry::new().with_handler(Arc::new(RecordingHandler { + tier_events: Arc::clone(&tier_events), + operation_events: Arc::clone(&operation_events), + })); + + let request_id = next_request_id(); + futures::executor::block_on( + async { + telemetry.record_hit("l2", Duration::from_nanos(7), true); + telemetry.complete_operation(request_id, "cache", "cache.get", Duration::from_nanos(11), true); + } + .with_request_id(request_id), + ); + + assert_eq!( + *tier_events.lock().expect("test handler mutex should not be poisoned"), + vec![(request_id, "l2".to_string(), attributes::EVENT_HIT.to_string(), 7, true)] + ); + assert_eq!( + *operation_events.lock().expect("test handler mutex should not be poisoned"), + vec![(request_id, "cache".to_string(), "cache.get".to_string(), 11, true)] + ); + } + + #[test] + fn next_request_id_returns_unique_incrementing_values() { + let a = next_request_id(); + let b = next_request_id(); + let c = next_request_id(); + assert!(b > a, "request IDs must increment: got {a} then {b}"); + assert!(c > b, "request IDs must increment: got {b} then {c}"); + } + + #[test] + fn with_request_id_resets_thread_local_after_completion() { + let request_id = next_request_id(); + futures::executor::block_on( + async { + assert_eq!( + CacheTelemetry::current_request_id(), + request_id, + "request_id should be set during poll" + ); + } + .with_request_id(request_id), + ); + assert_eq!( + CacheTelemetry::current_request_id(), + 0, + "request_id should be reset to 0 after WithRequestId completes" + ); + } + + #[test] + fn nested_with_request_id_restores_outer_id() { + use std::task::{Context, Poll, Waker}; + + let outer_id = next_request_id(); + let inner_id = next_request_id(); + + let waker = Waker::noop(); + + // Poll outer WithRequestId, which sets outer_id + let mut outer = std::pin::pin!( + async { + assert_eq!(CacheTelemetry::current_request_id(), outer_id); + + // Poll inner WithRequestId — sets inner_id, should restore outer_id on completion + let mut inner = std::pin::pin!( + async { + assert_eq!(CacheTelemetry::current_request_id(), inner_id); + } + .with_request_id(inner_id) + ); + let mut inner_cx = Context::from_waker(waker); + assert!(matches!(inner.as_mut().poll(&mut inner_cx), Poll::Ready(()))); + + // After inner completes, outer_id should be restored + assert_eq!( + CacheTelemetry::current_request_id(), + outer_id, + "outer request_id should be restored after nested WithRequestId" + ); + } + .with_request_id(outer_id) + ); + let mut outer_cx = Context::from_waker(waker); + assert!(matches!(outer.as_mut().poll(&mut outer_cx), Poll::Ready(()))); + + // After outer completes, should be reset to 0 + assert_eq!(CacheTelemetry::current_request_id(), 0); + } + + #[test] + fn eviction_handler_receives_request_id_from_calling_thread() { + type TierRecord = (RequestId, String, String); + type OpRecord = (RequestId, String, String); + + struct EvictionRecorder { + tier_events: Arc>>, + operation_events: Arc>>, + } + impl CacheEventHandler for EvictionRecorder { + fn on_tier_event(&self, event: &CacheTierEvent<'_>) { + self.tier_events.lock().expect("test mutex should not be poisoned").push(( + event.request_id, + event.tier_name.to_string(), + event.outcome.to_string(), + )); + } + fn on_operation_complete(&self, event: &CacheOperationEvent<'_>) { + self.operation_events.lock().expect("test mutex should not be poisoned").push(( + event.request_id, + event.cache_name.to_string(), + event.operation.to_string(), + )); + } + } + + let tier_events = Arc::new(Mutex::new(Vec::new())); + let operation_events = Arc::new(Mutex::new(Vec::new())); + let telemetry = CacheTelemetry::new().with_handler(Arc::new(EvictionRecorder { + tier_events: Arc::clone(&tier_events), + operation_events: Arc::clone(&operation_events), + })); + + let request_id = next_request_id(); + futures::executor::block_on( + async { + telemetry.record_eviction("my_cache"); + telemetry.complete_operation(request_id, "my_cache", "cache.insert", Duration::ZERO, false); + } + .with_request_id(request_id), + ); + + let tiers = tier_events.lock().expect("test mutex should not be poisoned"); + assert_eq!(tiers.len(), 1, "expected exactly one eviction tier event"); + assert_eq!(tiers[0].0, request_id, "eviction should carry the inserting thread's request_id"); + assert_eq!(tiers[0].2, attributes::EVENT_EVICTION); + + let ops = operation_events.lock().expect("test mutex should not be poisoned"); + assert_eq!(ops.len(), 1, "expected one operation complete event"); + assert_eq!(ops[0].0, request_id); + assert_eq!(ops[0].2, "cache.insert"); + } + + #[test] + fn eviction_without_request_context_has_zero_id() { + type TierRecord = (RequestId, String); + type OpRecord = (RequestId, String); + + struct IdRecorder { + tier_events: Arc>>, + operation_events: Arc>>, + } + impl CacheEventHandler for IdRecorder { + fn on_tier_event(&self, event: &CacheTierEvent<'_>) { + self.tier_events + .lock() + .expect("test mutex should not be poisoned") + .push((event.request_id, event.outcome.to_string())); + } + fn on_operation_complete(&self, event: &CacheOperationEvent<'_>) { + self.operation_events + .lock() + .expect("test mutex should not be poisoned") + .push((event.request_id, event.operation.to_string())); + } + } + + let tier_events = Arc::new(Mutex::new(Vec::new())); + let operation_events = Arc::new(Mutex::new(Vec::new())); + let telemetry = CacheTelemetry::new().with_handler(Arc::new(IdRecorder { + tier_events: Arc::clone(&tier_events), + operation_events: Arc::clone(&operation_events), + })); + + // No WithRequestId wrapper — simulates background maintenance thread + telemetry.record_eviction("bg_cache"); + telemetry.complete_operation(0, "bg_cache", "background", Duration::ZERO, false); + + let tiers = tier_events.lock().expect("test mutex should not be poisoned"); + assert_eq!(tiers.len(), 1); + assert_eq!(tiers[0].0, 0, "background eviction should have request_id 0"); + + let ops = operation_events.lock().expect("test mutex should not be poisoned"); + assert_eq!(ops.len(), 1); + assert_eq!(ops[0].0, 0); } } diff --git a/crates/cachet/src/telemetry/ext.rs b/crates/cachet/src/telemetry/ext.rs deleted file mode 100644 index 1f1c18bc6..000000000 --- a/crates/cachet/src/telemetry/ext.rs +++ /dev/null @@ -1,130 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT License. - -//! Extension traits for telemetry recording. - -use std::pin::Pin; -use std::task::{Context, Poll}; -use std::time::Duration; - -use pin_project_lite::pin_project; -use tick::{Clock, Stopwatch}; - -/// Result of a timed async operation. -#[derive(Debug, Clone, Copy)] -pub struct TimedResult { - /// The result of the operation. - pub result: R, - /// The duration of the operation. - pub duration: Duration, -} - -pin_project! { - /// A future that times the inner future's execution. - #[must_use = "futures do nothing unless polled"] - pub struct Timed { - #[pin] - inner: F, - watch: Stopwatch, - } -} - -impl Future for Timed { - type Output = TimedResult; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - match this.inner.poll(cx) { - Poll::Ready(result) => Poll::Ready(TimedResult { - result, - duration: this.watch.elapsed(), - }), - Poll::Pending => Poll::Pending, - } - } -} - -/// Extension trait for timing async operations. -pub trait ClockExt { - /// Times an async operation and returns both the result and elapsed duration. - fn timed_async(&self, f: F) -> Timed - where - F: Future; -} - -impl ClockExt for Clock { - fn timed_async(&self, f: F) -> Timed - where - F: Future, - { - Timed { - inner: f, - watch: self.stopwatch(), - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - fn block_on(f: F) -> F::Output { - futures::executor::block_on(f) - } - - #[test] - fn clock_ext_timed_async_measures_duration() { - block_on(async { - let control = tick::ClockControl::new(); - let clock = control.to_clock(); - - let timed = clock - .timed_async(async { - control.advance(Duration::from_millis(100)); - 42 - }) - .await; - - assert_eq!(timed.result, 42); - assert_eq!(timed.duration, Duration::from_millis(100)); - }); - } - - #[test] - fn clock_ext_timed_async_handles_pending() { - use std::pin::Pin; - use std::sync::Arc; - use std::sync::atomic::{AtomicBool, Ordering}; - use std::task::{Context, Poll}; - - /// A future that returns Pending on the first poll, then Ready on the second. - struct YieldOnce { - yielded: Arc, - } - - impl std::future::Future for YieldOnce { - type Output = i32; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - if self.yielded.swap(true, Ordering::SeqCst) { - Poll::Ready(99) - } else { - cx.waker().wake_by_ref(); - Poll::Pending - } - } - } - - block_on(async { - let control = tick::ClockControl::new(); - let clock = control.to_clock(); - - let timed = clock - .timed_async(YieldOnce { - yielded: Arc::new(AtomicBool::new(false)), - }) - .await; - - assert_eq!(timed.result, 99); - }); - } -} diff --git a/crates/cachet/src/telemetry/handler.rs b/crates/cachet/src/telemetry/handler.rs new file mode 100644 index 000000000..6cedb1774 --- /dev/null +++ b/crates/cachet/src/telemetry/handler.rs @@ -0,0 +1,72 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +use std::time::Duration; + +/// Unique identifier for a cache operation, used to correlate tier events +/// with their parent operation. Generated from a process-wide atomic counter. +pub type RequestId = u64; + +/// Data from a per-tier cache operation. +#[non_exhaustive] +#[derive(Debug, Clone)] +pub struct CacheTierEvent<'a> { + /// Identifies which top-level operation this tier event belongs to. + pub request_id: RequestId, + /// Name of the cache tier (for example, "L1" or "L2"). + pub tier_name: &'a str, + /// Outcome event name (e.g., `attributes::EVENT_HIT`). + pub outcome: &'a str, + /// How long the tier operation took. + pub duration: Duration, + /// Whether this tier was consulted as a fallback. + pub fallback: bool, +} + +/// Data from a completed top-level cache operation. +#[non_exhaustive] +#[derive(Debug, Clone)] +pub struct CacheOperationEvent<'a> { + /// Identifies this operation. Matches `request_id` on associated tier events. + pub request_id: RequestId, + /// Name of the cache. + pub cache_name: &'a str, + /// The operation name (e.g., "cache.get", "cache.insert"). + pub operation: &'a str, + /// Total duration of the operation. + pub duration: Duration, + /// Whether this operation ran with stampede protection enabled. + pub coalesced: bool, +} + +/// Trait for consuming cachet telemetry events. +/// +/// Implement this trait to receive structured callbacks for cache operations. +/// Register via [`CacheBuilder::event_handler`](crate::CacheBuilder::event_handler). +/// +/// # Example +/// +/// ```ignore +/// use cachet::telemetry::handler::{CacheEventHandler, CacheOperationEvent, CacheTierEvent}; +/// +/// struct MyHandler; +/// +/// impl CacheEventHandler for MyHandler { +/// fn on_tier_event(&self, event: &CacheTierEvent<'_>) { +/// println!("tier {} = {} ({}ns)", event.tier_name, event.outcome, event.duration.as_nanos()); +/// } +/// +/// fn on_operation_complete(&self, event: &CacheOperationEvent<'_>) { +/// println!("op {} took {}ns", event.operation, event.duration.as_nanos()); +/// } +/// } +/// ``` +pub trait CacheEventHandler: Send + Sync { + /// Called for each per-tier sub-operation. + /// + /// May be called multiple times per top-level operation (once per tier). + fn on_tier_event(&self, event: &CacheTierEvent<'_>); + + /// Called once when the top-level cache operation completes. + fn on_operation_complete(&self, event: &CacheOperationEvent<'_>); +} diff --git a/crates/cachet/src/telemetry/mod.rs b/crates/cachet/src/telemetry/mod.rs index 6848ee95f..d2048e577 100644 --- a/crates/cachet/src/telemetry/mod.rs +++ b/crates/cachet/src/telemetry/mod.rs @@ -4,16 +4,18 @@ //! Cache telemetry integration. //! //! This module provides telemetry recording for cache operations via the -//! `tracing` crate. Enable structured logging through the cache builder's -//! `enable_logs()` method. +//! `tracing` crate and the [`handler`] callback API. Enable structured logging +//! through the cache builder's `enable_logs()` method. //! //! Consumers can subscribe to cache events using a custom -//! `tracing_subscriber::Layer` and the public constants in [`attributes`]. +//! `tracing_subscriber::Layer` and the public constants in [`attributes`], or +//! register a [`handler::CacheEventHandler`] with the cache builder. //! See the `telemetry_subscriber` example for a complete demonstration. pub mod attributes; pub(crate) mod cache; -pub(crate) mod ext; +/// Callback-based telemetry handlers. +pub mod handler; #[doc(inline)] pub use cache::CacheTelemetry; diff --git a/crates/cachet/src/wrapper.rs b/crates/cachet/src/wrapper.rs index 60c1f27c1..6317fcef3 100644 --- a/crates/cachet/src/wrapper.rs +++ b/crates/cachet/src/wrapper.rs @@ -15,7 +15,6 @@ use tick::Clock; use crate::cache::CacheName; use crate::telemetry::CacheTelemetry; -use crate::telemetry::ext::ClockExt; use crate::{CacheEntry, Error, InsertPolicy}; /// Wraps a cache tier with telemetry and TTL expiration. @@ -51,6 +50,7 @@ pub struct CacheWrapper { pub(crate) ttl: Option, pub(crate) telemetry: CacheTelemetry, pub(crate) policy: InsertPolicy, + pub(crate) fallback: bool, _phantom: PhantomData<(K, V)>, } @@ -62,6 +62,7 @@ impl CacheWrapper { ttl: Option, telemetry: CacheTelemetry, policy: InsertPolicy, + fallback: bool, ) -> Self { Self { name, @@ -70,6 +71,7 @@ impl CacheWrapper { ttl, telemetry, policy, + fallback, _phantom: PhantomData, } } @@ -107,14 +109,14 @@ where fn handle_get_result(&self, value: Option>, duration: Duration) -> Option> { if let Some(entry) = value { if self.is_expired(&entry) { - self.telemetry.cache_expired(self.name, duration); + self.telemetry.record_expired(self.name, duration, self.fallback); None } else { - self.telemetry.cache_hit(self.name, duration); + self.telemetry.record_hit(self.name, duration, self.fallback); Some(entry) } } else { - self.telemetry.cache_miss(self.name, duration); + self.telemetry.record_miss(self.name, duration, self.fallback); None } } @@ -127,11 +129,11 @@ where CT: CacheTier + Send + Sync, { async fn get(&self, key: &K) -> Result>, Error> { - let timed = self.clock.timed_async(self.inner.get(key)).await; - match timed.result { - Ok(value) => Ok(self.handle_get_result(value, timed.duration)), + let watch = self.clock.stopwatch(); + match self.inner.get(key).await { + Ok(value) => Ok(self.handle_get_result(value, watch.elapsed())), Err(e) => { - self.telemetry.get_error(self.name, timed.duration); + self.telemetry.record_get_error(self.name, watch.elapsed(), self.fallback); Err(e) } } @@ -140,46 +142,37 @@ where async fn insert(&self, key: K, mut entry: CacheEntry) -> Result<(), Error> { entry.ensure_cached_at(self.clock.system_time()); if !self.policy.should_insert(&entry) { - self.telemetry.insert_rejected(self.name, Duration::default()); + self.telemetry.record_insert_rejected(self.name, self.fallback); return Ok(()); } - let timed = self.clock.timed_async(self.inner.insert(key, entry)).await; - match &timed.result { - Ok(()) => { - self.telemetry.cache_inserted(self.name, timed.duration); - } - Err(_) => { - self.telemetry.insert_error(self.name, timed.duration); - } + let watch = self.clock.stopwatch(); + let result = self.inner.insert(key, entry).await; + match &result { + Ok(()) => self.telemetry.record_inserted(self.name, watch.elapsed(), self.fallback), + Err(_) => self.telemetry.record_insert_error(self.name, watch.elapsed(), self.fallback), } - timed.result + result } async fn invalidate(&self, key: &K) -> Result<(), Error> { - let timed = self.clock.timed_async(self.inner.invalidate(key)).await; - match &timed.result { - Ok(()) => { - self.telemetry.cache_invalidated(self.name, timed.duration); - } - Err(_) => { - self.telemetry.invalidate_error(self.name, timed.duration); - } + let watch = self.clock.stopwatch(); + let result = self.inner.invalidate(key).await; + match &result { + Ok(()) => self.telemetry.record_invalidated(self.name, watch.elapsed(), self.fallback), + Err(_) => self.telemetry.record_invalidate_error(self.name, watch.elapsed(), self.fallback), } - timed.result + result } async fn clear(&self) -> Result<(), Error> { - let timed = self.clock.timed_async(self.inner.clear()).await; - match &timed.result { - Ok(()) => { - self.telemetry.cache_cleared(self.name, timed.duration); - } - Err(_) => { - self.telemetry.clear_error(self.name, timed.duration); - } + let watch = self.clock.stopwatch(); + let result = self.inner.clear().await; + match &result { + Ok(()) => self.telemetry.record_cleared(self.name, watch.elapsed(), self.fallback), + Err(_) => self.telemetry.record_clear_error(self.name, watch.elapsed(), self.fallback), } - timed.result + result } async fn len(&self) -> Result { @@ -198,7 +191,8 @@ mod tests { let clock = Clock::new_frozen(); let inner = MockCache::::new(); let telemetry = CacheTelemetry::new(); - let wrapper: CacheWrapper = CacheWrapper::new("test", inner, clock, None, telemetry, InsertPolicy::default()); + let wrapper: CacheWrapper = + CacheWrapper::new("test", inner, clock, None, telemetry, InsertPolicy::default(), false); // Entry without TTL should not be expired let entry = CacheEntry::new(42); @@ -217,6 +211,7 @@ mod tests { Some(Duration::from_mins(1)), telemetry, InsertPolicy::default(), + false, ); // Entry without cached_at should be expired if TTL is configured (treat as expired to be safe) @@ -233,8 +228,15 @@ mod tests { let telemetry = CacheTelemetry::new(); let tier_ttl = Duration::from_mins(1); let entry_ttl = Duration::from_secs(30); - let wrapper: CacheWrapper = - CacheWrapper::new("test", inner, clock.clone(), Some(tier_ttl), telemetry, InsertPolicy::default()); + let wrapper: CacheWrapper = CacheWrapper::new( + "test", + inner, + clock.clone(), + Some(tier_ttl), + telemetry, + InsertPolicy::default(), + false, + ); let entry = CacheEntry::expires_at(42, entry_ttl, clock.system_time()); @@ -251,7 +253,8 @@ mod tests { let inner = MockCache::::new(); let inner_check = inner.clone(); let telemetry = CacheTelemetry::new(); - let wrapper: CacheWrapper = CacheWrapper::new("test", inner, clock, None, telemetry, InsertPolicy::default()); + let wrapper: CacheWrapper = + CacheWrapper::new("test", inner, clock, None, telemetry, InsertPolicy::default(), false); let entry = CacheEntry::new(42); wrapper.insert("key".to_string(), entry).await.unwrap(); @@ -269,7 +272,7 @@ mod tests { let telemetry = CacheTelemetry::new(); let tier_ttl = Duration::from_mins(1); let wrapper: CacheWrapper = - CacheWrapper::new("test", inner, clock, Some(tier_ttl), telemetry, InsertPolicy::default()); + CacheWrapper::new("test", inner, clock, Some(tier_ttl), telemetry, InsertPolicy::default(), false); let entry = CacheEntry::new(42); wrapper.insert("key".to_string(), entry).await.unwrap(); @@ -290,6 +293,7 @@ mod tests { Some(Duration::from_mins(1)), telemetry, InsertPolicy::default(), + false, ); // Entry with cached_at in the future simulates clock going backward @@ -304,7 +308,7 @@ mod tests { let telemetry = CacheTelemetry::new(); let ttl = Duration::from_mins(1); let wrapper: CacheWrapper = - CacheWrapper::new("test", inner, clock.clone(), Some(ttl), telemetry, InsertPolicy::default()); + CacheWrapper::new("test", inner, clock.clone(), Some(ttl), telemetry, InsertPolicy::default(), false); // Entry cached exactly TTL ago → elapsed == ttl → should NOT be expired (uses >) let entry = CacheEntry::expires_at(42, ttl, clock.system_time() - ttl); @@ -316,7 +320,8 @@ mod tests { let clock = Clock::new_frozen(); let inner = MockCache::::new(); let telemetry = CacheTelemetry::new(); - let wrapper: CacheWrapper = CacheWrapper::new("mock_test", inner, clock, None, telemetry, InsertPolicy::default()); + let wrapper: CacheWrapper = + CacheWrapper::new("mock_test", inner, clock, None, telemetry, InsertPolicy::default(), false); assert_eq!(wrapper.name(), "mock_test"); } @@ -325,7 +330,8 @@ mod tests { let clock = Clock::new_frozen(); let inner = MockCache::::new(); let telemetry = CacheTelemetry::new(); - let wrapper: CacheWrapper = CacheWrapper::new("test", inner, clock, None, telemetry, InsertPolicy::default()); + let wrapper: CacheWrapper = + CacheWrapper::new("test", inner, clock, None, telemetry, InsertPolicy::default(), false); let result = wrapper.handle_get_result(None, Duration::from_secs(0)); assert!(result.is_none()); } @@ -342,6 +348,7 @@ mod tests { Some(Duration::from_mins(1)), telemetry, InsertPolicy::default(), + false, ); // Entry without cached_at → considered expired let entry = CacheEntry::new(42); @@ -354,7 +361,8 @@ mod tests { let clock = Clock::new_frozen(); let inner = MockCache::::new(); let telemetry = CacheTelemetry::new(); - let wrapper: CacheWrapper = CacheWrapper::new("test", inner, clock, None, telemetry, InsertPolicy::default()); + let wrapper: CacheWrapper = + CacheWrapper::new("test", inner, clock, None, telemetry, InsertPolicy::default(), false); let entry = CacheEntry::new(42); let result = wrapper.handle_get_result(Some(entry), Duration::from_secs(0)); assert!(result.is_some()); @@ -366,7 +374,8 @@ mod tests { let clock = Clock::new_frozen(); let inner = MockCache::::new(); let telemetry = CacheTelemetry::new(); - let wrapper: CacheWrapper = CacheWrapper::new("test", inner, clock, None, telemetry, InsertPolicy::default()); + let wrapper: CacheWrapper = + CacheWrapper::new("test", inner, clock, None, telemetry, InsertPolicy::default(), false); // get miss assert!(wrapper.get(&"key".to_string()).await.unwrap().is_none()); @@ -392,7 +401,8 @@ mod tests { let clock = Clock::new_frozen(); let inner = MockCache::::new(); let telemetry = CacheTelemetry::new(); - let wrapper: CacheWrapper = CacheWrapper::new("test", inner, clock, None, telemetry, InsertPolicy::default()); + let wrapper: CacheWrapper = + CacheWrapper::new("test", inner, clock, None, telemetry, InsertPolicy::default(), false); assert_eq!(wrapper.len().await.expect("len should return Ok"), 0); wrapper.insert("key".to_string(), CacheEntry::new(1)).await.unwrap(); assert_eq!(wrapper.len().await.expect("len should return Ok"), 1); @@ -405,7 +415,8 @@ mod tests { let inner = MockCache::::new(); inner.fail_when(|op| matches!(op, cachet_tier::CacheOp::Get(_))); let telemetry = CacheTelemetry::new(); - let wrapper: CacheWrapper = CacheWrapper::new("test", inner, clock, None, telemetry, InsertPolicy::default()); + let wrapper: CacheWrapper = + CacheWrapper::new("test", inner, clock, None, telemetry, InsertPolicy::default(), false); let result = wrapper.get(&"key".to_string()).await; result.unwrap_err(); } @@ -417,7 +428,8 @@ mod tests { let inner = MockCache::::new(); inner.fail_when(|op| matches!(op, cachet_tier::CacheOp::Insert { .. })); let telemetry = CacheTelemetry::new(); - let wrapper: CacheWrapper = CacheWrapper::new("test", inner, clock, None, telemetry, InsertPolicy::default()); + let wrapper: CacheWrapper = + CacheWrapper::new("test", inner, clock, None, telemetry, InsertPolicy::default(), false); let result = wrapper.insert("key".to_string(), CacheEntry::new(1)).await; result.unwrap_err(); } @@ -429,7 +441,8 @@ mod tests { let inner = MockCache::::new(); inner.fail_when(|op| matches!(op, cachet_tier::CacheOp::Invalidate(_))); let telemetry = CacheTelemetry::new(); - let wrapper: CacheWrapper = CacheWrapper::new("test", inner, clock, None, telemetry, InsertPolicy::default()); + let wrapper: CacheWrapper = + CacheWrapper::new("test", inner, clock, None, telemetry, InsertPolicy::default(), false); let result = wrapper.invalidate(&"key".to_string()).await; result.unwrap_err(); } @@ -441,7 +454,8 @@ mod tests { let inner = MockCache::::new(); inner.fail_when(|op| matches!(op, cachet_tier::CacheOp::Clear)); let telemetry = CacheTelemetry::new(); - let wrapper: CacheWrapper = CacheWrapper::new("test", inner, clock, None, telemetry, InsertPolicy::default()); + let wrapper: CacheWrapper = + CacheWrapper::new("test", inner, clock, None, telemetry, InsertPolicy::default(), false); let result = wrapper.clear().await; result.unwrap_err(); } diff --git a/crates/cachet/tests/cache.rs b/crates/cachet/tests/cache.rs index 22adda07b..9606ad39c 100644 --- a/crates/cachet/tests/cache.rs +++ b/crates/cachet/tests/cache.rs @@ -911,3 +911,48 @@ mod service_tests { assert!(cache.get(&"key".to_string()).await.unwrap().is_none()); } } + +#[cfg(feature = "logs")] +#[cfg_attr(miri, ignore)] +#[tokio::test] +async fn get_or_insert_with_logging_emits_operation_and_events() { + let capture = testing_aids::LogCapture::new(); + let _guard = tracing::subscriber::set_default(capture.subscriber()); + + let clock = Clock::new_frozen(); + let cache = Cache::builder::(clock).memory().enable_logs().build(); + + let _ = cache.get_or_insert(&"k".to_string(), || async { 1 }).await; + capture.assert_contains("cache.get_or_insert"); + capture.assert_contains("cache.miss"); +} + +#[cfg(feature = "logs")] +#[cfg_attr(miri, ignore)] +#[tokio::test] +async fn try_get_or_insert_with_logging_emits_operation_and_events() { + let capture = testing_aids::LogCapture::new(); + let _guard = tracing::subscriber::set_default(capture.subscriber()); + + let clock = Clock::new_frozen(); + let cache = Cache::builder::(clock).memory().enable_logs().build(); + + let _ = cache.try_get_or_insert(&"k".to_string(), || async { Ok::<_, Error>(1) }).await; + capture.assert_contains("cache.try_get_or_insert"); + capture.assert_contains("cache.miss"); +} + +#[cfg(feature = "logs")] +#[cfg_attr(miri, ignore)] +#[tokio::test] +async fn optionally_get_or_insert_with_logging_emits_operation_and_events() { + let capture = testing_aids::LogCapture::new(); + let _guard = tracing::subscriber::set_default(capture.subscriber()); + + let clock = Clock::new_frozen(); + let cache = Cache::builder::(clock).memory().enable_logs().build(); + + let _ = cache.optionally_get_or_insert(&"k".to_string(), || async { Some(1) }).await; + capture.assert_contains("cache.optionally_get_or_insert"); + capture.assert_contains("cache.miss"); +}