Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ rust-version.workspace = true
workspace = true

[dependencies]
tokio = { workspace = true, optional = true }
tokio.workspace = true
async-task = { version = "4.4", default-features = false, optional = true }
spin = { version = "0.9", default-features = false, features = ["mutex", "spin_mutex"], optional = true }
libc = { version = "0.2", optional = true }
Expand All @@ -20,5 +20,5 @@ futures.workspace = true

[features]
default = ["tokio"]
tokio = ["dep:tokio"]
tokio = []
simulation = ["dep:async-task", "dep:spin", "dep:libc"]
84 changes: 56 additions & 28 deletions crates/runtime/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
#[cfg(all(feature = "tokio", feature = "simulation"))]

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
#[cfg(all(feature = "tokio", feature = "simulation"))]
#[cfg(or(all(feature = "tokio", feature = "simulation"), not(any(feature = "tokio", feature = "simulation")))]

Not that this is much clearer, feel free to ignore 😅

compile_error!(
"spacetimedb-runtime requires exactly one runtime backend: enable either `tokio` or `simulation`, not both"
);

#[cfg(not(any(feature = "tokio", feature = "simulation")))]
compile_error!("spacetimedb-runtime requires exactly one runtime backend: enable either `tokio` or `simulation`");

#[cfg(feature = "simulation")]
extern crate alloc;

Expand All @@ -15,8 +23,19 @@ pub mod sim;
#[cfg(feature = "simulation")]
pub mod sim_std;

#[cfg(feature = "tokio")]
pub type TokioHandle = tokio::runtime::Handle;
pub type TokioRuntime = tokio::runtime::Runtime;
pub type TokioRuntimeBuilder = tokio::runtime::Builder;

// We intentionally re-export `tokio::sync` even when the simulation backend is
// selected. Async and non-blocking synchronization operations are
// executor-agnostic, so driving them from the deterministic simulation runtime
// remains deterministic.
//
// Callers must avoid APIs that block or park OS threads on their own, such as

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there any other known besides blocking_send? We should at least be documenting those very loudly to avoid surprises down the line.

// `blocking_send`, because those semantics are outside the simulation runtime's
// deterministic scheduler.
pub use tokio::sync;

#[derive(Clone)]
pub enum Handle {
Expand Down Expand Up @@ -74,15 +93,22 @@ enum JoinErrorInner {
Simulation(sim::JoinError),
}

#[cfg(feature = "tokio")]
impl From<tokio::task::AbortHandle> for AbortHandle {
fn from(handle: tokio::task::AbortHandle) -> Self {
Self {
inner: AbortHandleInner::Tokio(handle),
}
}
}

impl AbortHandle {
pub fn abort(&self) {
match &self.inner {
#[cfg(feature = "tokio")]
AbortHandleInner::Tokio(handle) => handle.abort(),
#[cfg(feature = "simulation")]
AbortHandleInner::Simulation(handle) => handle.abort(),
#[cfg(not(any(feature = "tokio", feature = "simulation")))]
_ => unreachable!("runtime abort handle has no enabled backend"),
}
}
}
Expand All @@ -100,16 +126,10 @@ impl JoinErrorInner {

impl fmt::Display for JoinError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
#[cfg(not(any(feature = "tokio", feature = "simulation")))]
let _ = f;
#[cfg(any(feature = "tokio", feature = "simulation"))]
return self.inner.fmt(f);
#[cfg(not(any(feature = "tokio", feature = "simulation")))]
unreachable!("runtime join error has no enabled backend")
self.inner.fmt(f)
}
}

#[cfg(any(feature = "tokio", feature = "simulation"))]
impl std::error::Error for JoinError {}

impl<T> JoinHandleInner<T> {
Expand Down Expand Up @@ -160,8 +180,6 @@ impl<T> Future for JoinHandle<T> {
type Output = Result<T, JoinError>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
#[cfg(not(any(feature = "tokio", feature = "simulation")))]
let _ = cx;
match self.inner.poll_result(cx) {
Poll::Ready(Ok(output)) => {
self.inner = JoinHandleInner::Detached(PhantomData);
Expand Down Expand Up @@ -197,17 +215,30 @@ impl fmt::Display for RuntimeTimeout {
}
}

#[cfg(any(feature = "tokio", feature = "simulation"))]
impl std::error::Error for RuntimeTimeout {}

#[cfg(feature = "tokio")]
impl Handle {
pub fn tokio(handle: TokioHandle) -> Self {
Self::Tokio(handle)
#[cfg(feature = "tokio")]
{
Self::Tokio(handle)
}
#[cfg(not(feature = "tokio"))]
{
let _ = handle;
panic!("spacetimedb-runtime tokio handle requested without the `tokio` backend enabled")
}
}

pub fn tokio_current() -> Self {
Self::tokio(TokioHandle::current())
#[cfg(feature = "tokio")]
{
Self::tokio(TokioHandle::current())
}
#[cfg(not(feature = "tokio"))]
{
panic!("spacetimedb-runtime current tokio handle requested without the `tokio` backend enabled")
}
}
}

Expand All @@ -220,8 +251,6 @@ impl Handle {

impl Handle {
pub fn spawn<T: Send + 'static>(&self, future: impl Future<Output = T> + Send + 'static) -> JoinHandle<T> {
#[cfg(not(any(feature = "tokio", feature = "simulation")))]
let _ = future;
match self {
#[cfg(feature = "tokio")]
Self::Tokio(handle) => JoinHandle {
Expand All @@ -231,8 +260,6 @@ impl Handle {
Self::Simulation(handle) => JoinHandle {
inner: JoinHandleInner::Simulation(handle.spawn_on(sim::NodeId::MAIN, future)),
},
#[cfg(not(any(feature = "tokio", feature = "simulation")))]
_ => unreachable!("runtime dispatch has no enabled backend"),
}
}

Expand All @@ -241,8 +268,6 @@ impl Handle {
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
#[cfg(not(any(feature = "tokio", feature = "simulation")))]
let _ = &f;
match self {
#[cfg(feature = "tokio")]
Self::Tokio(_) => tokio::task::spawn_blocking(f)
Expand All @@ -261,8 +286,6 @@ impl Handle {
.spawn_on(sim::NodeId::MAIN, async move { f() })
.await
.expect("simulation spawn_blocking task should not be cancelled"),
#[cfg(not(any(feature = "tokio", feature = "simulation")))]
_ => unreachable!("runtime dispatch has no enabled backend"),
}
}

Expand All @@ -271,17 +294,22 @@ impl Handle {
timeout_after: Duration,
future: impl Future<Output = T>,
) -> Result<T, RuntimeTimeout> {
#[cfg(not(any(feature = "tokio", feature = "simulation")))]
let _ = (timeout_after, future);
match self {
#[cfg(feature = "tokio")]
Self::Tokio(_) => tokio::time::timeout(timeout_after, future)
.await
.map_err(|_| RuntimeTimeout),
#[cfg(feature = "simulation")]
Self::Simulation(handle) => handle.timeout(timeout_after, future).await.map_err(|_| RuntimeTimeout),
#[cfg(not(any(feature = "tokio", feature = "simulation")))]
_ => unreachable!("runtime dispatch has no enabled backend"),
}
}

pub async fn sleep(&self, duration: Duration) {
match self {
#[cfg(feature = "tokio")]
Self::Tokio(_) => tokio::time::sleep(duration).await,
#[cfg(feature = "simulation")]
Self::Simulation(handle) => handle.sleep(duration).await,
}
}
}
Expand Down
Loading