Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/fuzzy-ravens-listen.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@effect/cluster": patch
---

Fix entity defect restarts losing active durable requests during replay.
23 changes: 20 additions & 3 deletions packages/cluster/src/internal/entityManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ export const make = Effect.fnUntraced(function*<

const activeRequests: EntityState["activeRequests"] = new Map()
let defectRequestIds: Array<bigint> = []
let isRestartingAfterDefect = false
const withDefectRestartLock = Effect.unsafeMakeSemaphore(1).withPermits(1)

// the server is stored in a ref, so if there is a defect, we can
// swap the server without losing the active requests
Expand Down Expand Up @@ -192,7 +194,11 @@ export const make = Effect.fnUntraced(function*<
Exit.isInterrupted(response.exit) &&
(isShuttingDown || Uninterruptible.forServer(request.rpc.annotations))
) {
if (!isShuttingDown) {
if (isRestartingAfterDefect && isShuttingDown) {
// Closing the old server during a defect restart interrupts active handlers.
// Keep durable requests registered so the new server can replay them below.
return Effect.void
} else if (!isShuttingDown) {
return server.write(0, {
...request.message.envelope,
id: RequestId(request.message.envelope.requestId),
Expand Down Expand Up @@ -279,7 +285,9 @@ export const make = Effect.fnUntraced(function*<

if (defectRequestIds.length > 0) {
for (const id of defectRequestIds) {
const { lastSentChunk, message } = activeRequests.get(id)!
const request = activeRequests.get(id)
if (!request) continue
const { lastSentChunk, message } = request
yield* server.write(0, {
...message.envelope,
id: RequestId(message.envelope.requestId),
Expand All @@ -298,9 +306,16 @@ export const make = Effect.fnUntraced(function*<
)

/**
 * Handles a defect reported for this entity by restarting it.
 *
 * The restart is run through `withDefectRestartLock`, so only one
 * restart executes at a time. `Effect.suspend` defers the call to
 * `restartOnDefect` until the wrapped effect is actually run, rather than
 * when `onDefect` is called.
 *
 * If the guarded restart fails with a cause, that cause is passed back
 * into `onDefect`, so another restart is attempted.
 *
 * @param cause - the cause of the defect that triggered the restart
 * @returns an effect that performs the guarded restart
 */
function onDefect(cause: Cause.Cause<never>): Effect.Effect<void> {
const guardedRestart = withDefectRestartLock(
Effect.suspend(() => restartOnDefect(cause))
)
// The recovery wraps the locked effect, so the retry runs outside of it.
return Effect.catchAllCause(guardedRestart, onDefect)
}

function restartOnDefect(cause: Cause.Cause<never>): Effect.Effect<void> {
if (!activeServers.has(address.entityId)) {
return endLatch.open
}
isRestartingAfterDefect = true
const effect = writeRef.unsafeRebuild()
defectRequestIds = Array.from(activeRequests.keys())
return Effect.logError("Defect in entity, restarting", cause).pipe(
Expand All @@ -311,7 +326,9 @@ export const make = Effect.fnUntraced(function*<
address,
runner: options.runnerAddress
}),
Effect.catchAllCause(onDefect)
Effect.ensuring(Effect.sync(() => {
isRestartingAfterDefect = false
}))
)
}

Expand Down
18 changes: 18 additions & 0 deletions packages/cluster/test/Sharding.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,24 @@ describe.concurrent("Sharding", () => {
expect(result).toEqual(new User({ id: 123, name: "User 123" }))
expect(state.layerBuilds.current).toEqual(2)
}).pipe(Effect.provide(TestSharding)))

// Regression test: triggering a defect while another request is still
// in flight on the same entity must still yield a successful response.
it.scoped("restart on defect with another active request", () =>
Effect.gen(function*() {
yield* TestClock.adjust(1)
const state = yield* TestEntityState
const makeClient = yield* TestEntity.client
const client = makeClient("1")

// Fork a request so it stays active while the defect is triggered below.
// NOTE(review): presumably NeverFork never completes on its own — confirm against TestEntity.
const fiber = yield* client.NeverFork().pipe(Effect.fork)
yield* TestClock.adjust(1)

// Arm the defect trigger, then send a second request to the same entity ("1").
MutableRef.set(state.defectTrigger, true)
const result = yield* client.GetUser({ id: 123 })
// The second request must still resolve with the expected user.
expect(result).toEqual(new User({ id: 123, name: "User 123" }))
// The build counter must read 2 — presumably the initial build plus one rebuild after the defect; confirm.
expect(state.layerBuilds.current).toEqual(2)

// Interrupt the forked request before the test ends.
yield* Fiber.interrupt(fiber)
}).pipe(Effect.provide(TestSharding)))
})

const TestShardingConfig = ShardingConfig.layer({
Expand Down
Loading