diff --git a/dotnet/agent-framework-dotnet.slnx b/dotnet/agent-framework-dotnet.slnx index 246b3e7e7b..1cee9a6ae6 100644 --- a/dotnet/agent-framework-dotnet.slnx +++ b/dotnet/agent-framework-dotnet.slnx @@ -55,10 +55,12 @@ + + @@ -475,4 +477,4 @@ - \ No newline at end of file + diff --git a/dotnet/samples/Durable/Workflow/AzureFunctions/03_WorkflowHITL/03_WorkflowHITL.csproj b/dotnet/samples/Durable/Workflow/AzureFunctions/03_WorkflowHITL/03_WorkflowHITL.csproj new file mode 100644 index 0000000000..c569deacd0 --- /dev/null +++ b/dotnet/samples/Durable/Workflow/AzureFunctions/03_WorkflowHITL/03_WorkflowHITL.csproj @@ -0,0 +1,43 @@ + + + net10.0 + v4 + Exe + enable + enable + + WorkflowHITLFunctions + WorkflowHITLFunctions + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/dotnet/samples/Durable/Workflow/AzureFunctions/03_WorkflowHITL/Executors.cs b/dotnet/samples/Durable/Workflow/AzureFunctions/03_WorkflowHITL/Executors.cs new file mode 100644 index 0000000000..c299ee2cd5 --- /dev/null +++ b/dotnet/samples/Durable/Workflow/AzureFunctions/03_WorkflowHITL/Executors.cs @@ -0,0 +1,63 @@ +// Copyright (c) Microsoft. All rights reserved. + +using Microsoft.Agents.AI.Workflows; + +namespace WorkflowHITLFunctions; + +/// Expense approval request passed to the RequestPort. +public record ApprovalRequest(string ExpenseId, decimal Amount, string EmployeeName); + +/// Approval response received from the RequestPort. +public record ApprovalResponse(bool Approved, string? Comments); + +/// Looks up expense details and creates an approval request. +internal sealed class CreateApprovalRequest() : Executor("RetrieveRequest") +{ + public override ValueTask HandleAsync( + string message, + IWorkflowContext context, + CancellationToken cancellationToken = default) + { + // In a real scenario, this would look up expense details from a database + return new ValueTask(new ApprovalRequest(message, 1500.00m, "Jerry")); + } +} + +/// Prepares the approval request for finance review after manager approval. +internal sealed class PrepareFinanceReview() : Executor("PrepareFinanceReview") +{ + public override ValueTask HandleAsync( + ApprovalResponse message, + IWorkflowContext context, + CancellationToken cancellationToken = default) + { + if (!message.Approved) + { + throw new InvalidOperationException("Cannot proceed to finance review — manager denied the expense."); + } + + // In a real scenario, this would retrieve the original expense details + return new ValueTask(new ApprovalRequest("EXP-2025-001", 1500.00m, "Jerry")); + } +} + +/// Processes the expense reimbursement based on the parallel approval responses. +internal sealed class ExpenseReimburse() : Executor("Reimburse") +{ + public override async ValueTask HandleAsync( + ApprovalResponse[] message, + IWorkflowContext context, + CancellationToken cancellationToken = default) + { + // Check that all parallel approvals passed + ApprovalResponse? denied = Array.Find(message, r => !r.Approved); + if (denied is not null) + { + return $"Expense reimbursement denied. Comments: {denied.Comments}"; + } + + // Simulate payment processing + await Task.Delay(1000, cancellationToken); + return $"Expense reimbursed at {DateTime.UtcNow:O}"; + } +} diff --git a/dotnet/samples/Durable/Workflow/AzureFunctions/03_WorkflowHITL/Program.cs b/dotnet/samples/Durable/Workflow/AzureFunctions/03_WorkflowHITL/Program.cs new file mode 100644 index 0000000000..65f4b7acba --- /dev/null +++ b/dotnet/samples/Durable/Workflow/AzureFunctions/03_WorkflowHITL/Program.cs @@ -0,0 +1,51 @@ +// Copyright (c) Microsoft. All rights reserved. + +// This sample demonstrates a Human-in-the-Loop (HITL) workflow hosted in Azure Functions. +// +// ┌──────────────────────┐ ┌────────────────┐ ┌─────────────────────┐ ┌────────────────────┐ +// │ CreateApprovalRequest│──►│ManagerApproval │──►│PrepareFinanceReview │──┬►│ BudgetApproval │──┐ +// └──────────────────────┘ │ (RequestPort) │ └─────────────────────┘ │ │ (RequestPort) │ │ +// └────────────────┘ │ └────────────────────┘ │ ┌─────────────────┐ +// │ ├─►│ExpenseReimburse │ +// │ ┌────────────────────┐ │ └─────────────────┘ +// └►│ComplianceApproval │──┘ +// │ (RequestPort) │ +// └────────────────────┘ +// +// The workflow pauses at three RequestPorts — one for the manager, then two in parallel for finance. +// After manager approval, BudgetApproval and ComplianceApproval run concurrently via fan-out/fan-in. +// The framework auto-generates three HTTP endpoints for each workflow: +// POST /api/workflows/{name}/run - Start the workflow +// GET /api/workflows/{name}/status/{id} - Check status and pending approvals +// POST /api/workflows/{name}/respond/{id} - Send approval response to resume + +using Microsoft.Agents.AI.Hosting.AzureFunctions; +using Microsoft.Agents.AI.Workflows; +using Microsoft.Azure.Functions.Worker.Builder; +using Microsoft.Extensions.Hosting; +using WorkflowHITLFunctions; + +// Define executors and RequestPorts for the three HITL pause points +CreateApprovalRequest createRequest = new(); +RequestPort managerApproval = RequestPort.Create("ManagerApproval"); +PrepareFinanceReview prepareFinanceReview = new(); +RequestPort budgetApproval = RequestPort.Create("BudgetApproval"); +RequestPort complianceApproval = RequestPort.Create("ComplianceApproval"); +ExpenseReimburse reimburse = new(); + +// Build the workflow: CreateApprovalRequest -> ManagerApproval -> PrepareFinanceReview -> [BudgetApproval AND ComplianceApproval] -> ExpenseReimburse +Workflow expenseApproval = new WorkflowBuilder(createRequest) + .WithName("ExpenseReimbursement") + .WithDescription("Expense reimbursement with manager and parallel finance approvals") + .AddEdge(createRequest, managerApproval) + .AddEdge(managerApproval, prepareFinanceReview) + .AddFanOutEdge(prepareFinanceReview, [budgetApproval, complianceApproval]) + .AddFanInEdge([budgetApproval, complianceApproval], reimburse) + .Build(); + +using IHost app = FunctionsApplication + .CreateBuilder(args) + .ConfigureFunctionsWebApplication() + .ConfigureDurableWorkflows(workflows => workflows.AddWorkflow(expenseApproval, exposeStatusEndpoint: true)) + .Build(); +app.Run(); diff --git a/dotnet/samples/Durable/Workflow/AzureFunctions/03_WorkflowHITL/README.md b/dotnet/samples/Durable/Workflow/AzureFunctions/03_WorkflowHITL/README.md new file mode 100644 index 0000000000..c7634a09d6 --- /dev/null +++ b/dotnet/samples/Durable/Workflow/AzureFunctions/03_WorkflowHITL/README.md @@ -0,0 +1,196 @@ +# Human-in-the-Loop (HITL) Workflow — Azure Functions + +This sample demonstrates a durable workflow with Human-in-the-Loop support hosted in Azure Functions. The workflow pauses at three `RequestPort` nodes — one sequential manager approval, then two parallel finance approvals (budget and compliance) via fan-out/fan-in. Approval responses are sent via HTTP endpoints. + +## Key Concepts Demonstrated + +- Using multiple `RequestPort` nodes for sequential and parallel human-in-the-loop interactions in a durable workflow +- Fan-out/fan-in pattern for parallel approval steps +- Auto-generated HTTP endpoints for running workflows, checking status, and sending HITL responses +- Pausing orchestrations via `WaitForExternalEvent` and resuming via `RaiseEventAsync` +- Viewing inputs the workflow is waiting for via the status endpoint + +## Workflow + +This sample implements the following workflow: + +``` +┌──────────────────────┐ ┌────────────────┐ ┌─────────────────────┐ ┌────────────────────┐ +│ CreateApprovalRequest│──►│ManagerApproval │──►│PrepareFinanceReview │──┬►│ BudgetApproval │──┐ +└──────────────────────┘ │ (RequestPort) │ └─────────────────────┘ │ │ (RequestPort) │ │ + └────────────────┘ │ └────────────────────┘ │ ┌─────────────────┐ + │ ├─►│ExpenseReimburse │ + │ ┌────────────────────┐ │ └─────────────────┘ + └►│ComplianceApproval │──┘ + │ (RequestPort) │ + └────────────────────┘ +``` + +## HTTP Endpoints + +The framework auto-generates these endpoints for workflows with `RequestPort` nodes: + +| Method | Endpoint | Description | +|--------|----------|-------------| +| POST | `/api/workflows/ExpenseReimbursement/run` | Start the workflow | +| GET | `/api/workflows/ExpenseReimbursement/status/{runId}` | Check status and inputs the workflow is waiting for | +| POST | `/api/workflows/ExpenseReimbursement/respond/{runId}` | Send approval response to resume | + +## Environment Setup + +See the [README.md](../../README.md) file in the parent directory for information on how to configure the environment, including how to install and run the Durable Task Scheduler. + +## Running the Sample + +With the environment setup and function app running, you can test the sample by sending HTTP requests to the workflow endpoints. + +You can use the `demo.http` file to trigger the workflow, or a command line tool like `curl` as shown below: + +### Step 1: Start the Workflow + +Bash (Linux/macOS/WSL): + +```bash +curl -X POST http://localhost:7071/api/workflows/ExpenseReimbursement/run \ + -H "Content-Type: text/plain" -d "EXP-2025-001" +``` + +PowerShell: + +```powershell +Invoke-RestMethod -Method Post ` + -Uri http://localhost:7071/api/workflows/ExpenseReimbursement/run ` + -ContentType text/plain ` + -Body "EXP-2025-001" +``` + +The response will confirm the workflow orchestration has started: + +```text +Workflow orchestration started for ExpenseReimbursement. Orchestration runId: abc123def456 +``` + +> **Tip:** You can provide a custom run ID by appending a `runId` query parameter: +> +> ```bash +> curl -X POST "http://localhost:7071/api/workflows/ExpenseReimbursement/run?runId=expense-001" \ +> -H "Content-Type: text/plain" -d "EXP-2025-001" +> ``` +> +> If not provided, a unique run ID is auto-generated. + +### Step 2: Check Workflow Status + +The workflow pauses at the `ManagerApproval` RequestPort. Query the status endpoint to see what input it is waiting for: + +```bash +curl http://localhost:7071/api/workflows/ExpenseReimbursement/status/{runId} +``` + +```json +{ + "runId": "{runId}", + "status": "Running", + "waitingForInput": [ + { "eventName": "ManagerApproval", "input": { "ExpenseId": "EXP-2025-001", "Amount": 1500.00, "EmployeeName": "Jerry" } } + ] +} +``` + +> **Tip:** You can also verify this in the DTS dashboard at `http://localhost:8082`. Find the orchestration by its `runId` and you will see it is in a "Running" state, paused at a `WaitForExternalEvent` call for the `ManagerApproval` event. + +### Step 3: Send Manager Approval Response + +```bash +curl -X POST http://localhost:7071/api/workflows/ExpenseReimbursement/respond/{runId} \ + -H "Content-Type: application/json" \ + -d '{"eventName": "ManagerApproval", "response": {"Approved": true, "Comments": "Approved by manager."}}' +``` + +```json +{ + "message": "Response sent to workflow.", + "runId": "{runId}", + "eventName": "ManagerApproval", + "validated": true +} +``` + +### Step 4: Check Workflow Status Again + +The workflow now pauses at both the `BudgetApproval` and `ComplianceApproval` RequestPorts in parallel: + +```bash +curl http://localhost:7071/api/workflows/ExpenseReimbursement/status/{runId} +``` + +```json +{ + "runId": "{runId}", + "status": "Running", + "waitingForInput": [ + { "eventName": "BudgetApproval", "input": { "ExpenseId": "EXP-2025-001", "Amount": 1500.00, "EmployeeName": "Jerry" } }, + { "eventName": "ComplianceApproval", "input": { "ExpenseId": "EXP-2025-001", "Amount": 1500.00, "EmployeeName": "Jerry" } } + ] +} +``` + +### Step 5a: Send Budget Approval Response + +```bash +curl -X POST http://localhost:7071/api/workflows/ExpenseReimbursement/respond/{runId} \ + -H "Content-Type: application/json" \ + -d '{"eventName": "BudgetApproval", "response": {"Approved": true, "Comments": "Budget approved."}}' +``` + +```json +{ + "message": "Response sent to workflow.", + "runId": "{runId}", + "eventName": "BudgetApproval", + "validated": true +} +``` + +### Step 5b: Send Compliance Approval Response + +```bash +curl -X POST http://localhost:7071/api/workflows/ExpenseReimbursement/respond/{runId} \ + -H "Content-Type: application/json" \ + -d '{"eventName": "ComplianceApproval", "response": {"Approved": true, "Comments": "Compliance approved."}}' +``` + +```json +{ + "message": "Response sent to workflow.", + "runId": "{runId}", + "eventName": "ComplianceApproval", + "validated": true +} +``` + +### Step 6: Check Final Status + +After all approvals, the workflow completes and the expense is reimbursed: + +```bash +curl http://localhost:7071/api/workflows/ExpenseReimbursement/status/{runId} +``` + +```json +{ + "runId": "{runId}", + "status": "Completed", + "waitingForInput": null +} +``` + +### Viewing Workflows in the DTS Dashboard + +After running a workflow, you can navigate to the Durable Task Scheduler (DTS) dashboard to visualize the orchestration and inspect its execution history. + +If you are using the DTS emulator, the dashboard is available at `http://localhost:8082`. + +1. Open the dashboard and look for the orchestration instance matching the `runId` returned in Step 1 (e.g., `abc123def456` or your custom ID like `expense-001`). +2. Click into the instance to see the execution timeline, which shows each executor activity and the `WaitForExternalEvent` pauses where the workflow waited for human input — including the two parallel finance approvals. +3. Expand individual activity steps to inspect inputs and outputs — for example, the `ManagerApproval`, `BudgetApproval`, and `ComplianceApproval` external events will show the approval request sent and the response received. diff --git a/dotnet/samples/Durable/Workflow/AzureFunctions/03_WorkflowHITL/demo.http b/dotnet/samples/Durable/Workflow/AzureFunctions/03_WorkflowHITL/demo.http new file mode 100644 index 0000000000..5e2993ac1c --- /dev/null +++ b/dotnet/samples/Durable/Workflow/AzureFunctions/03_WorkflowHITL/demo.http @@ -0,0 +1,53 @@ +# Default endpoint address for local testing +@authority=http://localhost:7071 + +### Step 1: Start the expense reimbursement workflow +POST {{authority}}/api/workflows/ExpenseReimbursement/run +Content-Type: text/plain + +EXP-2025-001 + +### Step 1 (alternative): Start the workflow with a custom run ID +POST {{authority}}/api/workflows/ExpenseReimbursement/run?runId=expense-001 +Content-Type: text/plain + +EXP-2025-001 + +### Step 2: Check workflow status (replace {runId} with actual run ID from Step 1) +GET {{authority}}/api/workflows/ExpenseReimbursement/status/{runId} + +### Step 3: Send manager approval (replace {runId} with actual run ID from Step 1) +POST {{authority}}/api/workflows/ExpenseReimbursement/respond/{runId} +Content-Type: application/json + +{"eventName": "ManagerApproval", "response": {"Approved": true, "Comments": "Approved by manager."}} + +### Step 3 (alternative): Deny the expense at manager level +POST {{authority}}/api/workflows/ExpenseReimbursement/respond/{runId} +Content-Type: application/json + +{"eventName": "ManagerApproval", "response": {"Approved": false, "Comments": "Insufficient documentation. Please resubmit."}} + +### Step 4: Check workflow status after manager approval (now waiting for parallel finance approvals) +GET {{authority}}/api/workflows/ExpenseReimbursement/status/{runId} + +### Step 5a: Send budget approval (replace {runId} with actual run ID from Step 1) +POST {{authority}}/api/workflows/ExpenseReimbursement/respond/{runId} +Content-Type: application/json + +{"eventName": "BudgetApproval", "response": {"Approved": true, "Comments": "Budget approved."}} + +### Step 5b: Send compliance approval (replace {runId} with actual run ID from Step 1) +POST {{authority}}/api/workflows/ExpenseReimbursement/respond/{runId} +Content-Type: application/json + +{"eventName": "ComplianceApproval", "response": {"Approved": true, "Comments": "Compliance approved."}} + +### Step 5b (alternative): Deny the expense at compliance level +POST {{authority}}/api/workflows/ExpenseReimbursement/respond/{runId} +Content-Type: application/json + +{"eventName": "ComplianceApproval", "response": {"Approved": false, "Comments": "Compliance requirements not met."}} + +### Step 6: Check final workflow status after all approvals +GET {{authority}}/api/workflows/ExpenseReimbursement/status/{runId} diff --git a/dotnet/samples/Durable/Workflow/AzureFunctions/03_WorkflowHITL/host.json b/dotnet/samples/Durable/Workflow/AzureFunctions/03_WorkflowHITL/host.json new file mode 100644 index 0000000000..9384a0a583 --- /dev/null +++ b/dotnet/samples/Durable/Workflow/AzureFunctions/03_WorkflowHITL/host.json @@ -0,0 +1,20 @@ +{ + "version": "2.0", + "logging": { + "logLevel": { + "Microsoft.Agents.AI.DurableTask": "Information", + "Microsoft.Agents.AI.Hosting.AzureFunctions": "Information", + "DurableTask": "Information", + "Microsoft.DurableTask": "Information" + } + }, + "extensions": { + "durableTask": { + "hubName": "default", + "storageProvider": { + "type": "AzureManaged", + "connectionStringName": "DURABLE_TASK_SCHEDULER_CONNECTION_STRING" + } + } + } +} diff --git a/dotnet/samples/Durable/Workflow/ConsoleApps/08_WorkflowHITL/08_WorkflowHITL.csproj b/dotnet/samples/Durable/Workflow/ConsoleApps/08_WorkflowHITL/08_WorkflowHITL.csproj new file mode 100644 index 0000000000..a9103b6e48 --- /dev/null +++ b/dotnet/samples/Durable/Workflow/ConsoleApps/08_WorkflowHITL/08_WorkflowHITL.csproj @@ -0,0 +1,28 @@ + + + net10.0 + Exe + enable + enable + WorkflowHITL + WorkflowHITL + + + + + + + + + + + + + + + diff --git a/dotnet/samples/Durable/Workflow/ConsoleApps/08_WorkflowHITL/Executors.cs b/dotnet/samples/Durable/Workflow/ConsoleApps/08_WorkflowHITL/Executors.cs new file mode 100644 index 0000000000..db1a873260 --- /dev/null +++ b/dotnet/samples/Durable/Workflow/ConsoleApps/08_WorkflowHITL/Executors.cs @@ -0,0 +1,81 @@ +// Copyright (c) Microsoft. All rights reserved. + +using Microsoft.Agents.AI.Workflows; + +namespace WorkflowHITL; + +/// +/// Represents an expense approval request. +/// +/// The unique identifier of the expense. +/// The amount of the expense. +/// The name of the employee submitting the expense. +public record ApprovalRequest(string ExpenseId, decimal Amount, string EmployeeName); + +/// +/// Represents the response to an approval request. +/// +/// Whether the expense was approved. +/// Optional comments from the approver. +public record ApprovalResponse(bool Approved, string? Comments); + +/// +/// Retrieves expense details and creates an approval request. +/// +internal sealed class CreateApprovalRequest() : Executor("RetrieveRequest") +{ + /// + public override ValueTask HandleAsync( + string message, + IWorkflowContext context, + CancellationToken cancellationToken = default) + { + // In a real scenario, this would look up expense details from a database + return new ValueTask(new ApprovalRequest(message, 1500.00m, "Jerry")); + } +} + +/// +/// Prepares the approval request for finance review after manager approval. +/// +internal sealed class PrepareFinanceReview() : Executor("PrepareFinanceReview") +{ + /// + public override ValueTask HandleAsync( + ApprovalResponse message, + IWorkflowContext context, + CancellationToken cancellationToken = default) + { + if (!message.Approved) + { + throw new InvalidOperationException("Cannot proceed to finance review — manager denied the expense."); + } + + // In a real scenario, this would retrieve the original expense details + return new ValueTask(new ApprovalRequest("EXP-2025-001", 1500.00m, "Jerry")); + } +} + +/// +/// Processes the expense reimbursement based on the parallel approval responses from budget and compliance. +/// +internal sealed class ExpenseReimburse() : Executor("Reimburse") +{ + /// + public override async ValueTask HandleAsync( + ApprovalResponse[] message, + IWorkflowContext context, + CancellationToken cancellationToken = default) + { + // Check that all parallel approvals passed + ApprovalResponse? denied = Array.Find(message, r => !r.Approved); + if (denied is not null) + { + return $"Expense reimbursement denied. Comments: {denied.Comments}"; + } + + // Simulate payment processing + await Task.Delay(1000, cancellationToken); + return $"Expense reimbursed at {DateTime.UtcNow:O}"; + } +} diff --git a/dotnet/samples/Durable/Workflow/ConsoleApps/08_WorkflowHITL/Program.cs b/dotnet/samples/Durable/Workflow/ConsoleApps/08_WorkflowHITL/Program.cs new file mode 100644 index 0000000000..3b73b2d9b0 --- /dev/null +++ b/dotnet/samples/Durable/Workflow/ConsoleApps/08_WorkflowHITL/Program.cs @@ -0,0 +1,98 @@ +// Copyright (c) Microsoft. All rights reserved. + +// This sample demonstrates a Human-in-the-Loop (HITL) workflow using Durable Tasks. +// +// ┌──────────────────────┐ ┌────────────────┐ ┌─────────────────────┐ ┌────────────────────┐ +// │ CreateApprovalRequest│──►│ManagerApproval │──►│PrepareFinanceReview │──┬►│ BudgetApproval │──┐ +// └──────────────────────┘ │ (RequestPort) │ └─────────────────────┘ │ │ (RequestPort) │ │ +// └────────────────┘ │ └────────────────────┘ │ ┌─────────────────┐ +// │ ├─►│ExpenseReimburse │ +// │ ┌────────────────────┐ │ └─────────────────┘ +// └►│ComplianceApproval │──┘ +// │ (RequestPort) │ +// └────────────────────┘ +// +// The workflow pauses at three RequestPorts — one for the manager, then two in parallel for finance. +// After manager approval, BudgetApproval and ComplianceApproval run concurrently via fan-out/fan-in. + +using Microsoft.Agents.AI.DurableTask; +using Microsoft.Agents.AI.DurableTask.Workflows; +using Microsoft.Agents.AI.Workflows; +using Microsoft.DurableTask.Client.AzureManaged; +using Microsoft.DurableTask.Worker.AzureManaged; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using WorkflowHITL; + +string dtsConnectionString = Environment.GetEnvironmentVariable("DURABLE_TASK_SCHEDULER_CONNECTION_STRING") + ?? "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None"; + +// Define executors and RequestPorts for the three HITL pause points +CreateApprovalRequest createRequest = new(); +RequestPort managerApproval = RequestPort.Create("ManagerApproval"); +PrepareFinanceReview prepareFinanceReview = new(); +RequestPort budgetApproval = RequestPort.Create("BudgetApproval"); +RequestPort complianceApproval = RequestPort.Create("ComplianceApproval"); +ExpenseReimburse reimburse = new(); + +// Build the workflow: CreateApprovalRequest -> ManagerApproval -> PrepareFinanceReview -> [BudgetApproval AND ComplianceApproval] -> ExpenseReimburse +Workflow expenseApproval = new WorkflowBuilder(createRequest) + .WithName("ExpenseReimbursement") + .WithDescription("Expense reimbursement with manager and parallel finance approvals") + .AddEdge(createRequest, managerApproval) + .AddEdge(managerApproval, prepareFinanceReview) + .AddFanOutEdge(prepareFinanceReview, [budgetApproval, complianceApproval]) + .AddFanInEdge([budgetApproval, complianceApproval], reimburse) + .Build(); + +IHost host = Host.CreateDefaultBuilder(args) + .ConfigureLogging(logging => logging.SetMinimumLevel(LogLevel.Warning)) + .ConfigureServices(services => + { + services.ConfigureDurableWorkflows( + options => options.AddWorkflow(expenseApproval), + workerBuilder: builder => builder.UseDurableTaskScheduler(dtsConnectionString), + clientBuilder: builder => builder.UseDurableTaskScheduler(dtsConnectionString)); + }) + .Build(); + +await host.StartAsync(); + +IWorkflowClient workflowClient = host.Services.GetRequiredService(); + +// Start the workflow with streaming to observe events including HITL pauses +string expenseId = "EXP-2025-001"; +Console.WriteLine($"Starting expense reimbursement workflow for expense: {expenseId}"); +IStreamingWorkflowRun run = await workflowClient.StreamAsync(expenseApproval, expenseId); +Console.WriteLine($"Workflow started with instance ID: {run.RunId}\n"); + +// Watch for workflow events — handle HITL requests as they arrive +await foreach (WorkflowEvent evt in run.WatchStreamAsync()) +{ + switch (evt) + { + case DurableWorkflowWaitingForInputEvent requestEvent: + Console.WriteLine($"Workflow paused at RequestPort: {requestEvent.RequestPort.Id}"); + Console.WriteLine($" Input: {requestEvent.Input}"); + + // In a real scenario, this would involve human interaction (UI, email, Teams, etc.) + ApprovalRequest? request = requestEvent.GetInputAs(); + Console.WriteLine($" Approval for: {request?.EmployeeName}, Amount: {request?.Amount:C}"); + + ApprovalResponse approvalResponse = new(Approved: true, Comments: "Approved by manager."); + await run.SendResponseAsync(requestEvent, approvalResponse); + Console.WriteLine($" Response sent: Approved={approvalResponse.Approved}\n"); + break; + + case DurableWorkflowCompletedEvent completedEvent: + Console.WriteLine($"Workflow completed: {completedEvent.Result}"); + break; + + case DurableWorkflowFailedEvent failedEvent: + Console.WriteLine($"Workflow failed: {failedEvent.ErrorMessage}"); + break; + } +} + +await host.StopAsync(); diff --git a/dotnet/samples/Durable/Workflow/ConsoleApps/08_WorkflowHITL/README.md b/dotnet/samples/Durable/Workflow/ConsoleApps/08_WorkflowHITL/README.md new file mode 100644 index 0000000000..01a62da53d --- /dev/null +++ b/dotnet/samples/Durable/Workflow/ConsoleApps/08_WorkflowHITL/README.md @@ -0,0 +1,106 @@ +# Workflow Human-in-the-Loop (HITL) Sample + +This sample demonstrates a **Human-in-the-Loop** pattern in durable workflows using `RequestPort`. The workflow pauses execution at a manager approval point, then fans out to two parallel finance approval points — budget and compliance — before resuming. + +## Key Concepts Demonstrated + +- Using `RequestPort` to define external input points in a workflow +- Sequential and parallel HITL pause points in a single workflow using fan-out/fan-in +- Streaming workflow events with `IStreamingWorkflowRun` +- Handling `DurableWorkflowWaitingForInputEvent` to detect HITL pauses +- Using `SendResponseAsync` to provide responses and resume the workflow +- **Durability**: The workflow survives process restarts while waiting for human input + +## Workflow + +This sample implements the following workflow: + +``` +┌──────────────────────┐ ┌────────────────┐ ┌─────────────────────┐ ┌────────────────────┐ +│ CreateApprovalRequest│──►│ManagerApproval │──►│PrepareFinanceReview │──┬►│ BudgetApproval │──┐ +└──────────────────────┘ │ (RequestPort) │ └─────────────────────┘ │ │ (RequestPort) │ │ + └────────────────┘ │ └────────────────────┘ │ ┌─────────────────┐ + │ ├─►│ExpenseReimburse │ + │ ┌────────────────────┐ │ └─────────────────┘ + └►│ComplianceApproval │──┘ + │ (RequestPort) │ + └────────────────────┘ +``` + +| Step | Description | +|------|-------------| +| CreateApprovalRequest | Retrieves expense details and creates an approval request | +| ManagerApproval (RequestPort) | **PAUSES** the workflow and waits for manager approval | +| PrepareFinanceReview | Prepares the request for finance review after manager approval | +| BudgetApproval (RequestPort) | **PAUSES** the workflow and waits for budget approval (parallel) | +| ComplianceApproval (RequestPort) | **PAUSES** the workflow and waits for compliance approval (parallel) | +| ExpenseReimburse | Processes the reimbursement after all approvals pass | + +## How It Works + +A `RequestPort` defines a typed external input point in the workflow: + +```csharp +RequestPort managerApproval = + RequestPort.Create("ManagerApproval"); +``` + +Use `WatchStreamAsync` to observe events. When the workflow reaches a `RequestPort`, a `DurableWorkflowWaitingForInputEvent` is emitted. Call `SendResponseAsync` to provide the response and resume the workflow: + +```csharp +await foreach (WorkflowEvent evt in run.WatchStreamAsync()) +{ + switch (evt) + { + case DurableWorkflowWaitingForInputEvent requestEvent: + ApprovalRequest? request = requestEvent.GetInputAs(); + await run.SendResponseAsync(requestEvent, new ApprovalResponse(Approved: true, Comments: "Approved.")); + break; + } +} +``` + +## Environment Setup + +See the [README.md](../README.md) file in the parent directory for information on configuring the environment, including how to install and run the Durable Task Scheduler. + +## Running the Sample + +```bash +cd dotnet/samples/Durable/Workflow/ConsoleApps/08_WorkflowHITL +dotnet run --framework net10.0 +``` + +### Sample Output + +```text +Starting expense reimbursement workflow for expense: EXP-2025-001 +Workflow started with instance ID: abc123... + +Workflow paused at RequestPort: ManagerApproval + Input: {"expenseId":"EXP-2025-001","amount":1500.00,"employeeName":"Jerry"} + Approval for: Jerry, Amount: $1,500.00 + Response sent: Approved=True + +Workflow paused at RequestPort: BudgetApproval + Input: {"expenseId":"EXP-2025-001","amount":1500.00,"employeeName":"Jerry"} + Approval for: Jerry, Amount: $1,500.00 + Response sent: Approved=True + +Workflow paused at RequestPort: ComplianceApproval + Input: {"expenseId":"EXP-2025-001","amount":1500.00,"employeeName":"Jerry"} + Approval for: Jerry, Amount: $1,500.00 + Response sent: Approved=True + +Workflow completed: Expense reimbursed at 2025-01-23T17:30:00.0000000Z +``` + +### Viewing Workflows in the DTS Dashboard + +After running the sample, you can navigate to the Durable Task Scheduler (DTS) dashboard to visualize the completed orchestration and inspect its execution history. + +If you are using the DTS emulator, the dashboard is available at `http://localhost:8082`. + +1. Open the dashboard and look for the orchestration instance matching the instance ID logged in the console output (e.g., `abc123...`). +2. Click into the instance to see the execution timeline, which shows each executor activity and the `WaitForExternalEvent` pauses where the workflow waited for human input — including the two parallel finance approvals. +3. Expand individual activity steps to inspect inputs and outputs — for example, the `ManagerApproval`, `BudgetApproval`, and `ComplianceApproval` external events will show the approval request sent and the response received. diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/DurableOptions.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/DurableOptions.cs index 3d48249d31..136eb1fb3d 100644 --- a/dotnet/src/Microsoft.Agents.AI.DurableTask/DurableOptions.cs +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/DurableOptions.cs @@ -1,4 +1,4 @@ -// Copyright (c) Microsoft. All rights reserved. +// Copyright (c) Microsoft. All rights reserved. using System.Diagnostics; using Microsoft.Agents.AI.DurableTask.Workflows; @@ -9,7 +9,7 @@ namespace Microsoft.Agents.AI.DurableTask; /// Provides configuration options for durable agents and workflows. /// [DebuggerDisplay("Workflows = {Workflows.Workflows.Count}, Agents = {Agents.AgentCount}")] -public sealed class DurableOptions +public class DurableOptions { /// /// Initializes a new instance of the class. diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Logs.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Logs.cs index 43e2031a6f..fdf09749b5 100644 --- a/dotnet/src/Microsoft.Agents.AI.DurableTask/Logs.cs +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/Logs.cs @@ -1,4 +1,4 @@ -// Copyright (c) Microsoft. All rights reserved. +// Copyright (c) Microsoft. All rights reserved. using Microsoft.Agents.AI.DurableTask; using Microsoft.Extensions.AI; @@ -211,4 +211,20 @@ public static partial void LogEdgeRoutingMessage( this ILogger logger, string source, string sink); + + [LoggerMessage( + EventId = 112, + Level = LogLevel.Information, + Message = "Workflow waiting for external input at RequestPort '{RequestPortId}'")] + public static partial void LogWaitingForExternalEvent( + this ILogger logger, + string requestPortId); + + [LoggerMessage( + EventId = 113, + Level = LogLevel.Information, + Message = "Received external event for RequestPort '{RequestPortId}'")] + public static partial void LogReceivedExternalEvent( + this ILogger logger, + string requestPortId); } diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/ServiceCollectionExtensions.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/ServiceCollectionExtensions.cs index 2175cf5bb9..b1002d0851 100644 --- a/dotnet/src/Microsoft.Agents.AI.DurableTask/ServiceCollectionExtensions.cs +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/ServiceCollectionExtensions.cs @@ -1,4 +1,4 @@ -// Copyright (c) Microsoft. All rights reserved. +// Copyright (c) Microsoft. All rights reserved. using Microsoft.Agents.AI.DurableTask.Workflows; using Microsoft.Agents.AI.Workflows; @@ -329,12 +329,14 @@ private static WorkflowRegistrationInfo BuildWorkflowRegistration( /// /// Returns for bindings that should be registered as Durable Task activities. - /// (Durable Entities) and (sub-orchestrations) - /// use specialized dispatch and are excluded. + /// (Durable Entities), (sub-orchestrations), + /// and (human-in-the-loop via external events) use specialized dispatch + /// and are excluded. /// private static bool IsActivityBinding(ExecutorBinding binding) => binding is not AIAgentBinding - and not SubworkflowBinding; + and not SubworkflowBinding + and not RequestPortBinding; private static async Task RunWorkflowOrchestrationAsync( TaskOrchestrationContext context, diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableActivityExecutor.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableActivityExecutor.cs index 526a0f00d4..e3ad1980e7 100644 --- a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableActivityExecutor.cs +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableActivityExecutor.cs @@ -111,18 +111,38 @@ private static string SerializeResult(object? result) } } - private static object DeserializeInput(string input, Type targetType) + internal static object DeserializeInput(string input, Type targetType) { if (targetType == typeof(string)) { return input; } + // Fan-in aggregation serializes results as a JSON array of strings (e.g., ["{...}", "{...}"]). + // When the target type is a non-string array, deserialize each element individually. + if (targetType.IsArray && targetType != typeof(string[])) + { + Type elementType = targetType.GetElementType()!; + string[]? stringArray = JsonSerializer.Deserialize(input, DurableSerialization.Options); + if (stringArray is not null) + { + Array result = Array.CreateInstance(elementType, stringArray.Length); + for (int i = 0; i < stringArray.Length; i++) + { + object element = JsonSerializer.Deserialize(stringArray[i], elementType, DurableSerialization.Options) + ?? throw new InvalidOperationException($"Failed to deserialize element {i} to type '{elementType.Name}'."); + result.SetValue(element, i); + } + + return result; + } + } + return JsonSerializer.Deserialize(input, targetType, DurableSerialization.Options) ?? throw new InvalidOperationException($"Failed to deserialize input to type '{targetType.Name}'."); } - private static Type ResolveInputType(string? inputTypeName, ISet supportedTypes) + internal static Type ResolveInputType(string? inputTypeName, ISet supportedTypes) { if (string.IsNullOrEmpty(inputTypeName)) { @@ -141,10 +161,13 @@ private static Type ResolveInputType(string? inputTypeName, ISet supported Type? loadedType = Type.GetType(inputTypeName); - // Fall back if type is string but executor doesn't support string - if (loadedType == typeof(string) && !supportedTypes.Contains(typeof(string))) + // Fall back if type is string or string[] but executor doesn't support it + if (loadedType is not null && !supportedTypes.Contains(loadedType)) { - return supportedTypes.FirstOrDefault() ?? typeof(string); + if (loadedType == typeof(string) || loadedType == typeof(string[])) + { + return supportedTypes.FirstOrDefault() ?? typeof(string); + } } return loadedType ?? supportedTypes.FirstOrDefault() ?? typeof(string); diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableExecutorDispatcher.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableExecutorDispatcher.cs index 6f69b923b6..afb7a774fb 100644 --- a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableExecutorDispatcher.cs +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableExecutorDispatcher.cs @@ -1,4 +1,4 @@ -// Copyright (c) Microsoft. All rights reserved. +// Copyright (c) Microsoft. All rights reserved. // ConfigureAwait Usage in Orchestration Code: // This file uses ConfigureAwait(true) because it runs within orchestration context. @@ -8,31 +8,34 @@ // Using ConfigureAwait(false) here could cause non-deterministic behavior during replay. using System.Text.Json; +using Microsoft.Agents.AI.Workflows; using Microsoft.DurableTask; using Microsoft.Extensions.Logging; namespace Microsoft.Agents.AI.DurableTask.Workflows; /// -/// Dispatches workflow executors to activities, AI agents, or sub-orchestrations. +/// Dispatches workflow executors to activities, AI agents, sub-orchestrations, or external events (human-in-the-loop). /// /// /// Called during the dispatch phase of each superstep by /// DurableWorkflowRunner.DispatchExecutorsInParallelAsync. For each executor that has /// pending input, this dispatcher determines whether the executor is an AI agent (stateful, -/// backed by Durable Entities), a sub-workflow (dispatched as a sub-orchestration), or a -/// regular activity, and invokes the appropriate Durable Task API. +/// backed by Durable Entities), a request port (human-in-the-loop, backed by external events), +/// a sub-workflow (dispatched as a sub-orchestration), or a regular activity, and invokes the +/// appropriate Durable Task API. /// The serialised string result is returned to the runner for the routing phase. /// internal static class DurableExecutorDispatcher { /// - /// Dispatches an executor based on its type (activity, AI agent, or sub-workflow). + /// Dispatches an executor based on its type (activity, AI agent, request port, or sub-workflow). /// /// The task orchestration context. /// Information about the executor to dispatch. /// The message envelope containing input and type information. /// The shared state dictionary to pass to the executor. + /// The live workflow status used to publish events and pending request port state. /// The logger for tracing. /// The result from the executor. internal static async Task DispatchAsync( @@ -40,10 +43,16 @@ internal static async Task DispatchAsync( WorkflowExecutorInfo executorInfo, DurableMessageEnvelope envelope, Dictionary sharedState, + DurableWorkflowLiveStatus liveStatus, ILogger logger) { logger.LogDispatchingExecutor(executorInfo.ExecutorId, executorInfo.IsAgenticExecutor); + if (executorInfo.IsRequestPortExecutor) + { + return await ExecuteRequestPortAsync(context, executorInfo, envelope.Message, liveStatus, logger).ConfigureAwait(true); + } + if (executorInfo.IsAgenticExecutor) { return await ExecuteAgentAsync(context, executorInfo, logger, envelope.Message).ConfigureAwait(true); @@ -79,6 +88,47 @@ private static async Task ExecuteActivityAsync( return await context.CallActivityAsync(activityName, serializedInput).ConfigureAwait(true); } + /// + /// Executes a request port executor by waiting for an external event (human-in-the-loop). + /// + /// + /// When the workflow reaches a executor, the orchestration publishes + /// the pending request to and waits for an external actor + /// (e.g., a UI or API) to raise the corresponding event via + /// . + /// Multiple RequestPorts may be dispatched in parallel during a fan-out superstep. + /// Each adds its pending request to . + /// The wait has no built-in timeout; for time-limited approvals, callers can combine + /// context.CreateTimer with Task.WhenAny in a wrapper executor. + /// + private static async Task ExecuteRequestPortAsync( + TaskOrchestrationContext context, + WorkflowExecutorInfo executorInfo, + string input, + DurableWorkflowLiveStatus liveStatus, + ILogger logger) + { + RequestPort requestPort = executorInfo.RequestPort!; + string eventName = requestPort.Id; + + logger.LogWaitingForExternalEvent(eventName); + + // Publish pending request so external clients can discover what input is needed + liveStatus.PendingEvents.Add(new PendingRequestPortStatus(EventName: eventName, Input: input)); + context.SetCustomStatus(liveStatus); + + // Wait until the external actor raises the event + string response = await context.WaitForExternalEvent(eventName).ConfigureAwait(true); + + // Remove this pending request after receiving the response + liveStatus.PendingEvents.RemoveAll(p => p.EventName == eventName); + context.SetCustomStatus(liveStatus.Events.Count > 0 || liveStatus.PendingEvents.Count > 0 ? liveStatus : null); + + logger.LogReceivedExternalEvent(eventName); + + return response; + } + /// /// Executes an AI agent executor through Durable Entities. /// diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableStreamingWorkflowRun.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableStreamingWorkflowRun.cs index a7ed7b11ce..6cacf871e0 100644 --- a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableStreamingWorkflowRun.cs +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableStreamingWorkflowRun.cs @@ -14,15 +14,24 @@ namespace Microsoft.Agents.AI.DurableTask.Workflows; /// Represents a durable workflow run that supports streaming workflow events as they occur. /// /// +/// /// Events are detected by monitoring the orchestration's custom status at regular intervals. /// When executors emit events via or /// , they are written to the orchestration's /// custom status and picked up by this streaming run. +/// +/// +/// When the workflow reaches a executor, a +/// is yielded containing the request data. The caller should then call +/// +/// to provide the response and resume the workflow. +/// /// [DebuggerDisplay("{WorkflowName} ({RunId})")] internal sealed class DurableStreamingWorkflowRun : IStreamingWorkflowRun { private readonly DurableTaskClient _client; + private readonly Dictionary _requestPorts; /// /// Initializes a new instance of the class. @@ -35,6 +44,7 @@ internal DurableStreamingWorkflowRun(DurableTaskClient client, string instanceId this._client = client; this.RunId = instanceId; this.WorkflowName = workflow.Name ?? string.Empty; + this._requestPorts = ExtractRequestPorts(workflow); } /// @@ -92,9 +102,12 @@ private async IAsyncEnumerable WatchStreamAsync( TimeSpan maxInterval = TimeSpan.FromSeconds(2); TimeSpan currentInterval = minInterval; - // Track how many events we've already read from custom status + // Track how many events we've already read from the durable workflow status int lastReadEventIndex = 0; + // Track which pending events we've already yielded to avoid duplicates + HashSet yieldedPendingEvents = []; + while (!cancellationToken.IsCancellationRequested) { // Poll with getInputsAndOutputs: true because SerializedCustomStatus @@ -111,26 +124,54 @@ private async IAsyncEnumerable WatchStreamAsync( bool hasNewEvents = false; - // Always drain any unread events from custom status before checking terminal states. + // Always drain any unread events from the durable workflow status before checking terminal states. // The orchestration may complete before the next poll, so events would be lost if we // check terminal status first. if (metadata.SerializedCustomStatus is not null) { - if (TryParseCustomStatus(metadata.SerializedCustomStatus, out DurableWorkflowCustomStatus customStatus)) + if (DurableWorkflowLiveStatus.TryParse(metadata.SerializedCustomStatus, out DurableWorkflowLiveStatus liveStatus)) { - (List events, lastReadEventIndex) = DrainNewEvents(customStatus.Events, lastReadEventIndex); + (List events, lastReadEventIndex) = DrainNewEvents(liveStatus.Events, lastReadEventIndex); foreach (WorkflowEvent evt in events) { hasNewEvents = true; yield return evt; } + + // Yield a DurableWorkflowWaitingForInputEvent for each new pending request port + foreach (PendingRequestPortStatus pending in liveStatus.PendingEvents) + { + if (yieldedPendingEvents.Add(pending.EventName)) + { + if (!this._requestPorts.TryGetValue(pending.EventName, out RequestPort? matchingPort)) + { + // RequestPort may not exist in the current workflow definition (e.g., during rolling deployments). + continue; + } + + hasNewEvents = true; + yield return new DurableWorkflowWaitingForInputEvent( + pending.Input, + matchingPort); + } + } + + // Sync tracking with current pending events so re-used RequestPort names can be yielded again + if (liveStatus.PendingEvents.Count == 0) + { + yieldedPendingEvents.Clear(); + } + else + { + yieldedPendingEvents.IntersectWith(liveStatus.PendingEvents.Select(p => p.EventName)); + } } } - // Check terminal states after draining events from custom status + // Check terminal states after draining events from the durable workflow status if (metadata.RuntimeStatus == OrchestrationRuntimeStatus.Completed) { - // The framework clears custom status on completion, so events may be in + // The framework clears the durable workflow status on completion, so events may be in // SerializedOutput as a DurableWorkflowResult wrapper. if (TryParseWorkflowResult(metadata.SerializedOutput, out DurableWorkflowResult? outputResult)) { @@ -183,6 +224,28 @@ private async IAsyncEnumerable WatchStreamAsync( } } + /// + /// Sends a response to a to resume the workflow. + /// + /// The type of the response data. + /// The request event to respond to. + /// The response data to send. + /// A cancellation token to observe. + /// A representing the asynchronous operation. + [UnconditionalSuppressMessage("AOT", "IL3050", Justification = "Serializing workflow types provided by the caller.")] + [UnconditionalSuppressMessage("Trimming", "IL2026", Justification = "Serializing workflow types provided by the caller.")] + public async ValueTask SendResponseAsync(DurableWorkflowWaitingForInputEvent requestEvent, TResponse response, CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(requestEvent); + + string serializedResponse = JsonSerializer.Serialize(response, DurableSerialization.Options); + await this._client.RaiseEventAsync( + this.RunId, + requestEvent.RequestPort.Id, + serializedResponse, + cancellationToken).ConfigureAwait(false); + } + /// /// Waits for the workflow to complete and returns the result. /// @@ -242,22 +305,6 @@ private static (List Events, int UpdatedIndex) DrainNewEvents(Lis return (events, lastReadIndex); } - [UnconditionalSuppressMessage("AOT", "IL3050", Justification = "Deserializing workflow custom status.")] - [UnconditionalSuppressMessage("Trimming", "IL2026", Justification = "Deserializing workflow custom status.")] - private static bool TryParseCustomStatus(string serializedStatus, out DurableWorkflowCustomStatus result) - { - try - { - result = JsonSerializer.Deserialize(serializedStatus, DurableWorkflowJsonContext.Default.DurableWorkflowCustomStatus)!; - return result is not null; - } - catch (JsonException) - { - result = default!; - return false; - } - } - /// /// Attempts to parse the orchestration output as a wrapper. /// @@ -395,4 +442,11 @@ private static bool TryParseWorkflowResult(string? serializedOutput, [NotNullWhe return dataElement.ValueKind == JsonValueKind.Null ? null : dataElement.Clone(); } + + private static Dictionary ExtractRequestPorts(Workflow workflow) + { + return WorkflowAnalyzer.GetExecutorsFromWorkflowInOrder(workflow) + .Where(e => e.RequestPort is not null) + .ToDictionary(e => e.RequestPort!.Id, e => e.RequestPort!); + } } diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowCustomStatus.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowCustomStatus.cs deleted file mode 100644 index f6d403e861..0000000000 --- a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowCustomStatus.cs +++ /dev/null @@ -1,22 +0,0 @@ -// Copyright (c) Microsoft. All rights reserved. - -namespace Microsoft.Agents.AI.DurableTask.Workflows; - -/// -/// Represents the custom status written by the orchestration for streaming consumption. -/// -/// -/// The Durable Task framework exposes SerializedCustomStatus on orchestration metadata, -/// which is the only orchestration state readable by external clients while the orchestration -/// is still running. The orchestrator writes this object via SetCustomStatus after each -/// superstep so that can poll for new events. -/// On orchestration completion the framework clears custom status, so events are also -/// embedded in the output via . -/// -internal sealed class DurableWorkflowCustomStatus -{ - /// - /// Gets or sets the serialized workflow events emitted so far. - /// - public List Events { get; set; } = []; -} diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowJsonContext.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowJsonContext.cs index 9058c41e0a..e68ec842a8 100644 --- a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowJsonContext.cs +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowJsonContext.cs @@ -1,4 +1,4 @@ -// Copyright (c) Microsoft. All rights reserved. +// Copyright (c) Microsoft. All rights reserved. using System.Text.Json.Serialization; @@ -16,7 +16,7 @@ namespace Microsoft.Agents.AI.DurableTask.Workflows; /// : Activity input wrapper with state /// : Executor output wrapper with results, events, and state updates /// : Serialized payload wrapper with type info (events and messages) -/// : Custom status for streaming consumption +/// : Live status payload (streaming events and pending request ports) /// /// /// Note: User-defined executor input/output types still use reflection-based serialization @@ -31,8 +31,10 @@ namespace Microsoft.Agents.AI.DurableTask.Workflows; [JsonSerializable(typeof(DurableExecutorOutput))] [JsonSerializable(typeof(TypedPayload))] [JsonSerializable(typeof(List))] -[JsonSerializable(typeof(DurableWorkflowCustomStatus))] +[JsonSerializable(typeof(DurableWorkflowLiveStatus))] [JsonSerializable(typeof(DurableWorkflowResult))] +[JsonSerializable(typeof(PendingRequestPortStatus))] +[JsonSerializable(typeof(List))] [JsonSerializable(typeof(List))] [JsonSerializable(typeof(Dictionary))] [JsonSerializable(typeof(Dictionary))] diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowLiveStatus.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowLiveStatus.cs new file mode 100644 index 0000000000..1568755800 --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowLiveStatus.cs @@ -0,0 +1,59 @@ +// Copyright (c) Microsoft. All rights reserved. + +using Microsoft.Agents.AI.Workflows; + +namespace Microsoft.Agents.AI.DurableTask.Workflows; + +/// +/// Live status payload written to the orchestration via SetCustomStatus. +/// +/// +/// +/// This is the only orchestration state readable by external clients while the workflow +/// is still running. It is written after each superstep so that +/// can poll for new events. +/// On completion the framework clears it, so events are also +/// embedded in the output via . +/// +/// +/// When the workflow is paused at one or more nodes, +/// contains the request data for each. +/// +/// +internal sealed class DurableWorkflowLiveStatus +{ + /// + /// Gets or sets the pending request ports the workflow is waiting on. Empty when no input is needed. + /// + public List PendingEvents { get; set; } = []; + + /// + /// Gets or sets the serialized workflow events emitted so far. + /// + public List Events { get; set; } = []; + + /// + /// Attempts to deserialize a serialized custom status string into a . + /// + [System.Diagnostics.CodeAnalysis.UnconditionalSuppressMessage("AOT", "IL3050", Justification = "Deserializing durable workflow status.")] + [System.Diagnostics.CodeAnalysis.UnconditionalSuppressMessage("Trimming", "IL2026", Justification = "Deserializing durable workflow status.")] + internal static bool TryParse(string? serializedStatus, out DurableWorkflowLiveStatus result) + { + if (serializedStatus is null) + { + result = default!; + return false; + } + + try + { + result = System.Text.Json.JsonSerializer.Deserialize(serializedStatus, DurableSerialization.Options)!; + return result is not null; + } + catch (System.Text.Json.JsonException) + { + result = default!; + return false; + } + } +} diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowOptions.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowOptions.cs index eb9ee92758..52cb54b20a 100644 --- a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowOptions.cs +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowOptions.cs @@ -1,4 +1,4 @@ -// Copyright (c) Microsoft. All rights reserved. +// Copyright (c) Microsoft. All rights reserved. using System.Diagnostics; using Microsoft.Agents.AI.Workflows; @@ -12,7 +12,6 @@ namespace Microsoft.Agents.AI.DurableTask.Workflows; public sealed class DurableWorkflowOptions { private readonly Dictionary _workflows = new(StringComparer.OrdinalIgnoreCase); - private readonly DurableOptions? _parentOptions; /// /// Initializes a new instance of the class. @@ -20,9 +19,14 @@ public sealed class DurableWorkflowOptions /// Optional parent options container for accessing related configuration. internal DurableWorkflowOptions(DurableOptions? parentOptions = null) { - this._parentOptions = parentOptions; + this.ParentOptions = parentOptions; } + /// + /// Gets the parent container, if available. + /// + internal DurableOptions? ParentOptions { get; } + /// /// Gets the collection of workflows available in the current context, keyed by their unique names. /// @@ -77,7 +81,7 @@ public void AddWorkflows(params Workflow[] workflows) /// private void RegisterWorkflowExecutors(Workflow workflow) { - DurableAgentsOptions? agentOptions = this._parentOptions?.Agents; + DurableAgentsOptions? agentOptions = this.ParentOptions?.Agents; foreach ((string executorId, ExecutorBinding binding) in workflow.ReflectExecutors()) { diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowRunner.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowRunner.cs index 8836a4973a..942d4ecc9a 100644 --- a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowRunner.cs +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowRunner.cs @@ -1,4 +1,4 @@ -// Copyright (c) Microsoft. All rights reserved. +// Copyright (c) Microsoft. All rights reserved. // ConfigureAwait Usage in Orchestration Code: // This file uses ConfigureAwait(true) because it runs within orchestration context. @@ -173,7 +173,7 @@ private static async Task RunSuperstepLoopAsync( logger.LogSuperstepExecutors(superstep, string.Join(", ", executorInputs.Select(e => e.ExecutorId))); } - string[] results = await DispatchExecutorsInParallelAsync(context, executorInputs, state.SharedState, logger).ConfigureAwait(true); + string[] results = await DispatchExecutorsInParallelAsync(context, executorInputs, state, logger).ConfigureAwait(true); haltRequested = ProcessSuperstepResults(executorInputs, results, state, context, logger); @@ -193,7 +193,7 @@ private static async Task RunSuperstepLoopAsync( // Publish final events for live streaming (skip during replay) if (!context.IsReplaying) { - PublishEventsToCustomStatus(context, state); + PublishEventsToLiveStatus(context, state); } string finalResult = GetFinalResult(state.LastResults); @@ -226,11 +226,11 @@ private static int CountRemainingExecutors(Dictionary DispatchExecutorsInParallelAsync( TaskOrchestrationContext context, List executorInputs, - Dictionary sharedState, + SuperstepState state, ILogger logger) { Task[] dispatchTasks = executorInputs - .Select(input => DurableExecutorDispatcher.DispatchAsync(context, input.Info, input.Envelope, sharedState, logger)) + .Select(input => DurableExecutorDispatcher.DispatchAsync(context, input.Info, input.Envelope, state.SharedState, state.LiveStatus, logger)) .ToArray(); return await Task.WhenAll(dispatchTasks).ConfigureAwait(true); @@ -273,9 +273,14 @@ public SuperstepState(Workflow workflow, DurableEdgeMap edgeMap) public Dictionary SharedState { get; } = []; /// - /// Accumulated workflow events for custom status (streaming consumption). + /// Accumulated workflow events for the durable workflow status (streaming consumption). /// public List AccumulatedEvents { get; } = []; + + /// + /// Workflow status published via SetCustomStatus so external clients can poll for streaming events and pending HITL requests. + /// + public DurableWorkflowLiveStatus LiveStatus { get; } = new(); } /// @@ -378,7 +383,7 @@ private static bool ProcessSuperstepResults( // Merge state updates from activity into shared state MergeStateUpdates(state, resultInfo.StateUpdates, resultInfo.ClearedScopes); - // Accumulate events for custom status (streaming) + // Accumulate events for the durable workflow status (streaming) state.AccumulatedEvents.AddRange(resultInfo.Events); // Check for halt request @@ -387,7 +392,7 @@ private static bool ProcessSuperstepResults( // Publish events for live streaming (skip during replay) if (!context.IsReplaying) { - PublishEventsToCustomStatus(context, state); + PublishEventsToLiveStatus(context, state); } RouteOutputToSuccessors(executorId, resultInfo.Result, resultInfo.SentMessages, state, logger); @@ -464,24 +469,23 @@ private static void ApplyClearedScopes(Dictionary shared, List - /// Publishes accumulated workflow events to the orchestration's custom status, + /// Publishes accumulated workflow events to the durable workflow's custom status, /// making them available to for live streaming. /// /// - /// Custom status is the only orchestration metadata readable by external clients while + /// Custom status is the only orchestration state readable by external clients while /// the orchestration is still running. It is cleared by the framework on completion, /// so events are also included in for final retrieval. /// - private static void PublishEventsToCustomStatus(TaskOrchestrationContext context, SuperstepState state) + private static void PublishEventsToLiveStatus( + TaskOrchestrationContext context, + SuperstepState state) { - DurableWorkflowCustomStatus customStatus = new() - { - Events = state.AccumulatedEvents - }; + state.LiveStatus.Events = state.AccumulatedEvents; // Pass the object directly — the framework's DataConverter handles serialization. // Pre-serializing would cause double-serialization (string wrapped in JSON quotes). - context.SetCustomStatus(customStatus); + context.SetCustomStatus(state.LiveStatus); } /// diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowWaitingForInputEvent.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowWaitingForInputEvent.cs new file mode 100644 index 0000000000..86532852c8 --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowWaitingForInputEvent.cs @@ -0,0 +1,42 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System.Diagnostics; +using System.Diagnostics.CodeAnalysis; +using System.Text.Json; +using Microsoft.Agents.AI.Workflows; + +namespace Microsoft.Agents.AI.DurableTask.Workflows; + +/// +/// Event raised when the durable workflow is waiting for external input at a . +/// +/// The serialized input data that was passed to the RequestPort. +/// The request port definition. +[DebuggerDisplay("RequestPort = {RequestPort.Id}")] +public sealed class DurableWorkflowWaitingForInputEvent( + string Input, + RequestPort RequestPort) : WorkflowEvent +{ + /// + /// Gets the serialized input data that was passed to the RequestPort. + /// + public string Input { get; } = Input; + + /// + /// Gets the request port definition. + /// + public RequestPort RequestPort { get; } = RequestPort; + + /// + /// Attempts to deserialize the input data to the specified type. + /// + /// The type to deserialize to. + /// The deserialized input. + /// Thrown when the input cannot be deserialized to the specified type. + [UnconditionalSuppressMessage("AOT", "IL3050", Justification = "Deserializing workflow types provided by the caller.")] + [UnconditionalSuppressMessage("Trimming", "IL2026", Justification = "Deserializing workflow types provided by the caller.")] + public T? GetInputAs() + { + return JsonSerializer.Deserialize(this.Input, DurableSerialization.Options); + } +} diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/IStreamingWorkflowRun.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/IStreamingWorkflowRun.cs index e34e9b39d1..79771e8bd2 100644 --- a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/IStreamingWorkflowRun.cs +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/IStreamingWorkflowRun.cs @@ -1,4 +1,4 @@ -// Copyright (c) Microsoft. All rights reserved. +// Copyright (c) Microsoft. All rights reserved. using Microsoft.Agents.AI.Workflows; @@ -39,4 +39,17 @@ public interface IStreamingWorkflowRun /// workflow state changes. /// IAsyncEnumerable WatchStreamAsync(CancellationToken cancellationToken = default); + + /// + /// Sends a response to a to resume the workflow. + /// + /// The type of the response data. + /// The request event to respond to. + /// The response data to send. + /// A cancellation token to observe. + /// A representing the asynchronous operation. + ValueTask SendResponseAsync( + DurableWorkflowWaitingForInputEvent requestEvent, + TResponse response, + CancellationToken cancellationToken = default); } diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/PendingRequestPortStatus.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/PendingRequestPortStatus.cs new file mode 100644 index 0000000000..d75ea1427d --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/PendingRequestPortStatus.cs @@ -0,0 +1,12 @@ +// Copyright (c) Microsoft. All rights reserved. + +namespace Microsoft.Agents.AI.DurableTask.Workflows; + +/// +/// Represents a RequestPort the workflow is paused at, waiting for a response. +/// +/// The RequestPort ID identifying which input is needed. +/// The serialized request data passed to the RequestPort. +internal sealed record PendingRequestPortStatus( + string EventName, + string Input); diff --git a/dotnet/src/Microsoft.Agents.AI.Hosting.AzureFunctions/BuiltInFunctionExecutor.cs b/dotnet/src/Microsoft.Agents.AI.Hosting.AzureFunctions/BuiltInFunctionExecutor.cs index 07db07a1e7..25200c30bb 100644 --- a/dotnet/src/Microsoft.Agents.AI.Hosting.AzureFunctions/BuiltInFunctionExecutor.cs +++ b/dotnet/src/Microsoft.Agents.AI.Hosting.AzureFunctions/BuiltInFunctionExecutor.cs @@ -1,4 +1,4 @@ -// Copyright (c) Microsoft. All rights reserved. +// Copyright (c) Microsoft. All rights reserved. using Microsoft.Azure.Functions.Worker; using Microsoft.Azure.Functions.Worker.Context.Features; @@ -85,6 +85,34 @@ public async ValueTask ExecuteAsync(FunctionContext context) return; } + if (context.FunctionDefinition.EntryPoint == BuiltInFunctions.GetWorkflowStatusHttpFunctionEntryPoint) + { + if (httpRequestData == null) + { + throw new InvalidOperationException($"HTTP request data binding is missing for the invocation {context.InvocationId}."); + } + + context.GetInvocationResult().Value = await BuiltInFunctions.GetWorkflowStatusAsync( + httpRequestData, + durableTaskClient, + context); + return; + } + + if (context.FunctionDefinition.EntryPoint == BuiltInFunctions.RespondToWorkflowHttpFunctionEntryPoint) + { + if (httpRequestData == null) + { + throw new InvalidOperationException($"HTTP request data binding is missing for the invocation {context.InvocationId}."); + } + + context.GetInvocationResult().Value = await BuiltInFunctions.RespondToWorkflowAsync( + httpRequestData, + durableTaskClient, + context); + return; + } + if (context.FunctionDefinition.EntryPoint == BuiltInFunctions.InvokeWorkflowActivityFunctionEntryPoint) { if (encodedEntityRequest is null) diff --git a/dotnet/src/Microsoft.Agents.AI.Hosting.AzureFunctions/BuiltInFunctions.cs b/dotnet/src/Microsoft.Agents.AI.Hosting.AzureFunctions/BuiltInFunctions.cs index 97c6bbcaeb..6dc1ab2244 100644 --- a/dotnet/src/Microsoft.Agents.AI.Hosting.AzureFunctions/BuiltInFunctions.cs +++ b/dotnet/src/Microsoft.Agents.AI.Hosting.AzureFunctions/BuiltInFunctions.cs @@ -1,6 +1,7 @@ // Copyright (c) Microsoft. All rights reserved. using System.Net; +using System.Text.Json; using System.Text.Json.Serialization; using Microsoft.Agents.AI.DurableTask; using Microsoft.Agents.AI.DurableTask.Workflows; @@ -26,6 +27,8 @@ internal static class BuiltInFunctions internal static readonly string RunWorkflowOrchestrationHttpFunctionEntryPoint = $"{typeof(BuiltInFunctions).FullName!}.{nameof(RunWorkflowOrchestrationHttpTriggerAsync)}"; internal static readonly string RunWorkflowOrchestrationFunctionEntryPoint = $"{typeof(BuiltInFunctions).FullName!}.{nameof(RunWorkflowOrchestration)}"; internal static readonly string InvokeWorkflowActivityFunctionEntryPoint = $"{typeof(BuiltInFunctions).FullName!}.{nameof(InvokeWorkflowActivityAsync)}"; + internal static readonly string GetWorkflowStatusHttpFunctionEntryPoint = $"{typeof(BuiltInFunctions).FullName!}.{nameof(GetWorkflowStatusAsync)}"; + internal static readonly string RespondToWorkflowHttpFunctionEntryPoint = $"{typeof(BuiltInFunctions).FullName!}.{nameof(RespondToWorkflowAsync)}"; #pragma warning disable IL3000 // Avoid accessing Assembly file path when publishing as a single file - Azure Functions does not use single-file publishing internal static readonly string ScriptFile = Path.GetFileName(typeof(BuiltInFunctions).Assembly.Location); @@ -63,6 +66,122 @@ public static async Task RunWorkflowOrchestrationHttpTriggerAs return response; } + /// + /// Returns the workflow status including any pending HITL requests. + /// The run ID is extracted from the route parameter {runId}. + /// + public static async Task GetWorkflowStatusAsync( + [HttpTrigger] HttpRequestData req, + [DurableClient] DurableTaskClient client, + FunctionContext context) + { + string? runId = context.BindingContext.BindingData.TryGetValue("runId", out object? value) ? value?.ToString() : null; + if (string.IsNullOrEmpty(runId)) + { + return await CreateErrorResponseAsync(req, context, HttpStatusCode.BadRequest, "Run ID is required."); + } + + OrchestrationMetadata? metadata = await client.GetInstanceAsync(runId, getInputsAndOutputs: true); + if (metadata is null) + { + return await CreateErrorResponseAsync(req, context, HttpStatusCode.NotFound, $"Workflow run '{runId}' not found."); + } + + // Parse HITL inputs the workflow is waiting for from the durable workflow status + List? waitingForInput = null; + if (DurableWorkflowLiveStatus.TryParse(metadata.SerializedCustomStatus, out DurableWorkflowLiveStatus liveStatus) + && liveStatus.PendingEvents.Count > 0) + { + waitingForInput = liveStatus.PendingEvents; + } + + HttpResponseData response = req.CreateResponse(HttpStatusCode.OK); + await response.WriteAsJsonAsync(new + { + runId, + status = metadata.RuntimeStatus.ToString(), + waitingForInput = waitingForInput?.Select(p => new { eventName = p.EventName, input = JsonDocument.Parse(p.Input).RootElement }) + }); + return response; + } + + /// + /// Sends a response to a pending RequestPort, resuming the workflow. + /// Expects a JSON body: { "eventName": "...", "response": { ... } }. + /// + public static async Task RespondToWorkflowAsync( + [HttpTrigger] HttpRequestData req, + [DurableClient] DurableTaskClient client, + FunctionContext context) + { + string? runId = context.BindingContext.BindingData.TryGetValue("runId", out object? value) ? value?.ToString() : null; + if (string.IsNullOrEmpty(runId)) + { + return await CreateErrorResponseAsync(req, context, HttpStatusCode.BadRequest, "Run ID is required."); + } + + WorkflowRespondRequest? request; + try + { + request = await req.ReadFromJsonAsync(context.CancellationToken); + } + catch (JsonException) + { + return await CreateErrorResponseAsync(req, context, HttpStatusCode.BadRequest, "Request body is not valid JSON."); + } + + if (request is null || string.IsNullOrEmpty(request.EventName) + || request.Response.ValueKind == JsonValueKind.Undefined) + { + return await CreateErrorResponseAsync(req, context, HttpStatusCode.BadRequest, "Body must contain a non-empty 'eventName' and a 'response' property."); + } + + // Verify the orchestration exists and is in a valid state + OrchestrationMetadata? metadata = await client.GetInstanceAsync(runId, getInputsAndOutputs: true); + if (metadata is null) + { + return await CreateErrorResponseAsync(req, context, HttpStatusCode.NotFound, $"Workflow run '{runId}' not found."); + } + + if (metadata.RuntimeStatus is OrchestrationRuntimeStatus.Completed + or OrchestrationRuntimeStatus.Failed + or OrchestrationRuntimeStatus.Terminated) + { + return await CreateErrorResponseAsync(req, context, HttpStatusCode.BadRequest, + $"Workflow run '{runId}' is in terminal state '{metadata.RuntimeStatus}'."); + } + + // Verify the workflow is waiting for the specified event. + // If status can't be parsed (e.g., not yet set during early execution), allow the event through — + // Durable Task safely queues it until the orchestration reaches WaitForExternalEvent. + bool eventValidated = false; + if (DurableWorkflowLiveStatus.TryParse(metadata.SerializedCustomStatus, out DurableWorkflowLiveStatus liveStatus)) + { + if (!liveStatus.PendingEvents.Exists(p => string.Equals(p.EventName, request.EventName, StringComparison.Ordinal))) + { + return await CreateErrorResponseAsync(req, context, HttpStatusCode.BadRequest, + $"Workflow is not waiting for event '{request.EventName}'."); + } + + eventValidated = true; + } + + // Raise the external event to unblock the orchestration's WaitForExternalEvent call + await client.RaiseEventAsync(runId, request.EventName, request.Response.GetRawText()); + + HttpResponseData response = req.CreateResponse(HttpStatusCode.Accepted); + await response.WriteAsJsonAsync(new + { + message = eventValidated + ? "Response sent to workflow." + : "Response sent to workflow. Event could not be validated against pending requests.", + runId, + eventName = request.EventName, + validated = eventValidated, + }); + return response; + } + /// /// Executes a workflow activity by looking up the registered executor and delegating to it. /// The executor name is derived from the activity function name via . @@ -413,6 +532,15 @@ private sealed record AgentRunAcceptedResponse( [property: JsonPropertyName("status")] int Status, [property: JsonPropertyName("thread_id")] string ThreadId); + /// + /// Represents a request to respond to a pending RequestPort in a workflow. + /// + /// The name of the event to raise (the RequestPort ID). + /// The response payload to send to the workflow. + private sealed record WorkflowRespondRequest( + [property: JsonPropertyName("eventName")] string? EventName, + [property: JsonPropertyName("response")] JsonElement Response); + /// /// A service provider that combines the original service provider with an additional DurableTaskClient instance. /// diff --git a/dotnet/src/Microsoft.Agents.AI.Hosting.AzureFunctions/FunctionMetadataFactory.cs b/dotnet/src/Microsoft.Agents.AI.Hosting.AzureFunctions/FunctionMetadataFactory.cs index e28b02a3b7..f50f3a03be 100644 --- a/dotnet/src/Microsoft.Agents.AI.Hosting.AzureFunctions/FunctionMetadataFactory.cs +++ b/dotnet/src/Microsoft.Agents.AI.Hosting.AzureFunctions/FunctionMetadataFactory.cs @@ -1,4 +1,4 @@ -// Copyright (c) Microsoft. All rights reserved. +// Copyright (c) Microsoft. All rights reserved. using Microsoft.Agents.AI.DurableTask; using Microsoft.Azure.Functions.Worker.Core.FunctionMetadata; @@ -38,8 +38,9 @@ internal static DefaultFunctionMetadata CreateEntityTrigger(string name) /// The base name used to derive the HTTP function name. /// The HTTP route for the trigger. /// The entry point method for the HTTP trigger. + /// The allowed HTTP methods as a JSON array fragment (e.g., "\"get\""). Defaults to POST. /// A configured for an HTTP trigger. - internal static DefaultFunctionMetadata CreateHttpTrigger(string name, string route, string entryPoint) + internal static DefaultFunctionMetadata CreateHttpTrigger(string name, string route, string entryPoint, string methods = "\"post\"") { return new DefaultFunctionMetadata() { @@ -47,7 +48,7 @@ internal static DefaultFunctionMetadata CreateHttpTrigger(string name, string ro Language = "dotnet-isolated", RawBindings = [ - $"{{\"name\":\"req\",\"type\":\"httpTrigger\",\"direction\":\"In\",\"authLevel\":\"function\",\"methods\": [\"post\"],\"route\":\"{route}\"}}", + $"{{\"name\":\"req\",\"type\":\"httpTrigger\",\"direction\":\"In\",\"authLevel\":\"function\",\"methods\": [{methods}],\"route\":\"{route}\"}}", "{\"name\":\"$return\",\"type\":\"http\",\"direction\":\"Out\"}", "{\"name\":\"client\",\"type\":\"durableClient\",\"direction\":\"In\"}" ], diff --git a/dotnet/src/Microsoft.Agents.AI.Hosting.AzureFunctions/FunctionsApplicationBuilderExtensions.cs b/dotnet/src/Microsoft.Agents.AI.Hosting.AzureFunctions/FunctionsApplicationBuilderExtensions.cs index 87e390ac4a..9cf22b27f7 100644 --- a/dotnet/src/Microsoft.Agents.AI.Hosting.AzureFunctions/FunctionsApplicationBuilderExtensions.cs +++ b/dotnet/src/Microsoft.Agents.AI.Hosting.AzureFunctions/FunctionsApplicationBuilderExtensions.cs @@ -1,4 +1,4 @@ -// Copyright (c) Microsoft. All rights reserved. +// Copyright (c) Microsoft. All rights reserved. using Microsoft.Agents.AI.DurableTask; using Microsoft.Agents.AI.DurableTask.Workflows; @@ -62,10 +62,10 @@ public static FunctionsApplicationBuilder ConfigureDurableOptions( ArgumentNullException.ThrowIfNull(builder); ArgumentNullException.ThrowIfNull(configure); - builder.Services.ConfigureDurableOptions(configure); + // Ensure FunctionsDurableOptions is registered BEFORE the core extension creates a plain DurableOptions + FunctionsDurableOptions sharedOptions = GetOrCreateSharedOptions(builder.Services); - // Read the shared options to check if workflows were added - DurableOptions sharedOptions = GetOrCreateSharedOptions(builder.Services); + builder.Services.ConfigureDurableOptions(configure); if (sharedOptions.Workflows.Workflows.Count > 0) { @@ -105,7 +105,9 @@ private static void EnsureMiddlewareRegistered(FunctionsApplicationBuilder build string.Equals(context.FunctionDefinition.EntryPoint, BuiltInFunctions.RunAgentEntityFunctionEntryPoint, StringComparison.Ordinal) || string.Equals(context.FunctionDefinition.EntryPoint, BuiltInFunctions.RunWorkflowOrchestrationHttpFunctionEntryPoint, StringComparison.Ordinal) || string.Equals(context.FunctionDefinition.EntryPoint, BuiltInFunctions.RunWorkflowOrchestrationFunctionEntryPoint, StringComparison.Ordinal) || - string.Equals(context.FunctionDefinition.EntryPoint, BuiltInFunctions.InvokeWorkflowActivityFunctionEntryPoint, StringComparison.Ordinal) + string.Equals(context.FunctionDefinition.EntryPoint, BuiltInFunctions.InvokeWorkflowActivityFunctionEntryPoint, StringComparison.Ordinal) || + string.Equals(context.FunctionDefinition.EntryPoint, BuiltInFunctions.GetWorkflowStatusHttpFunctionEntryPoint, StringComparison.Ordinal) || + string.Equals(context.FunctionDefinition.EntryPoint, BuiltInFunctions.RespondToWorkflowHttpFunctionEntryPoint, StringComparison.Ordinal) ); builder.Services.TryAddSingleton(); } @@ -113,17 +115,18 @@ private static void EnsureMiddlewareRegistered(FunctionsApplicationBuilder build /// /// Gets or creates a shared instance from the service collection. /// - private static DurableOptions GetOrCreateSharedOptions(IServiceCollection services) + private static FunctionsDurableOptions GetOrCreateSharedOptions(IServiceCollection services) { ServiceDescriptor? existingDescriptor = services.FirstOrDefault( d => d.ServiceType == typeof(DurableOptions) && d.ImplementationInstance is not null); - if (existingDescriptor?.ImplementationInstance is DurableOptions existing) + if (existingDescriptor?.ImplementationInstance is FunctionsDurableOptions existing) { return existing; } - DurableOptions options = new(); + FunctionsDurableOptions options = new(); + services.AddSingleton(options); services.AddSingleton(options); return options; } diff --git a/dotnet/src/Microsoft.Agents.AI.Hosting.AzureFunctions/FunctionsDurableOptions.cs b/dotnet/src/Microsoft.Agents.AI.Hosting.AzureFunctions/FunctionsDurableOptions.cs new file mode 100644 index 0000000000..1d67166aaa --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.Hosting.AzureFunctions/FunctionsDurableOptions.cs @@ -0,0 +1,29 @@ +// Copyright (c) Microsoft. All rights reserved. + +using Microsoft.Agents.AI.DurableTask; + +namespace Microsoft.Agents.AI.Hosting.AzureFunctions; + +/// +/// Provides Azure Functions–specific configuration for durable workflows. +/// +internal sealed class FunctionsDurableOptions : DurableOptions +{ + private readonly HashSet _statusEndpointWorkflows = new(StringComparer.OrdinalIgnoreCase); + + /// + /// Enables the status HTTP endpoint for the specified workflow. + /// + internal void EnableStatusEndpoint(string workflowName) + { + this._statusEndpointWorkflows.Add(workflowName); + } + + /// + /// Returns whether the status endpoint is enabled for the specified workflow. + /// + internal bool IsStatusEndpointEnabled(string workflowName) + { + return this._statusEndpointWorkflows.Contains(workflowName); + } +} diff --git a/dotnet/src/Microsoft.Agents.AI.Hosting.AzureFunctions/Workflows/DurableWorkflowOptionsExtensions.cs b/dotnet/src/Microsoft.Agents.AI.Hosting.AzureFunctions/Workflows/DurableWorkflowOptionsExtensions.cs new file mode 100644 index 0000000000..de822cfa45 --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.Hosting.AzureFunctions/Workflows/DurableWorkflowOptionsExtensions.cs @@ -0,0 +1,30 @@ +// Copyright (c) Microsoft. All rights reserved. + +using Microsoft.Agents.AI.DurableTask.Workflows; +using Microsoft.Agents.AI.Workflows; + +namespace Microsoft.Agents.AI.Hosting.AzureFunctions; + +/// +/// Extension methods for to configure Azure Functions HTTP trigger options. +/// +public static class DurableWorkflowOptionsExtensions +{ + /// + /// Adds a workflow and optionally exposes a status HTTP endpoint for querying pending HITL requests. + /// + /// The workflow options to add the workflow to. + /// The workflow instance to add. + /// If , a GET endpoint is generated at workflows/{name}/status/{runId}. + public static void AddWorkflow(this DurableWorkflowOptions options, Workflow workflow, bool exposeStatusEndpoint) + { + ArgumentNullException.ThrowIfNull(options); + + options.AddWorkflow(workflow); + + if (exposeStatusEndpoint && options.ParentOptions is FunctionsDurableOptions functionsOptions) + { + functionsOptions.EnableStatusEndpoint(workflow.Name!); + } + } +} diff --git a/dotnet/src/Microsoft.Agents.AI.Hosting.AzureFunctions/Workflows/DurableWorkflowsFunctionMetadataTransformer.cs b/dotnet/src/Microsoft.Agents.AI.Hosting.AzureFunctions/Workflows/DurableWorkflowsFunctionMetadataTransformer.cs index d1dd71061b..cada02d387 100644 --- a/dotnet/src/Microsoft.Agents.AI.Hosting.AzureFunctions/Workflows/DurableWorkflowsFunctionMetadataTransformer.cs +++ b/dotnet/src/Microsoft.Agents.AI.Hosting.AzureFunctions/Workflows/DurableWorkflowsFunctionMetadataTransformer.cs @@ -25,18 +25,20 @@ namespace Microsoft.Agents.AI.Hosting.AzureFunctions; internal sealed class DurableWorkflowsFunctionMetadataTransformer : IFunctionMetadataTransformer { private readonly ILogger _logger; - private readonly DurableWorkflowOptions _options; + private readonly FunctionsDurableOptions _options; /// /// Initializes a new instance of the class. /// /// The logger instance for diagnostic output. /// The durable options containing workflow configurations. - public DurableWorkflowsFunctionMetadataTransformer(ILogger logger, DurableOptions durableOptions) + public DurableWorkflowsFunctionMetadataTransformer( + ILogger logger, + FunctionsDurableOptions durableOptions) { this._logger = logger ?? throw new ArgumentNullException(nameof(logger)); ArgumentNullException.ThrowIfNull(durableOptions); - this._options = durableOptions.Workflows; + this._options = durableOptions; } /// @@ -51,7 +53,7 @@ public void Transform(IList original) // Track registered function names to avoid duplicates when workflows share executors. HashSet registeredFunctions = []; - foreach (var workflow in this._options.Workflows) + foreach (var workflow in this._options.Workflows.Workflows) { string httpFunctionName = $"{BuiltInFunctions.HttpPrefix}{workflow.Key}"; @@ -80,12 +82,42 @@ public void Transform(IList original) BuiltInFunctions.RunWorkflowOrchestrationHttpFunctionEntryPoint)); } + // Register a status endpoint if opted in via AddWorkflow(exposeStatusEndpoint: true). + if (this._options.IsStatusEndpointEnabled(workflow.Key)) + { + string statusFunctionName = $"{BuiltInFunctions.HttpPrefix}{workflow.Key}-status"; + if (registeredFunctions.Add(statusFunctionName)) + { + this._logger.LogRegisteringWorkflowTrigger(workflow.Key, statusFunctionName, "http-status"); + original.Add(FunctionMetadataFactory.CreateHttpTrigger( + $"{workflow.Key}-status", + $"workflows/{workflow.Key}/status/{{runId}}", + BuiltInFunctions.GetWorkflowStatusHttpFunctionEntryPoint, + methods: "\"get\"")); + } + } + + // Register a respond endpoint when the workflow contains RequestPort nodes. + bool hasRequestPorts = workflow.Value.ReflectExecutors().Values.Any(b => b is RequestPortBinding); + if (hasRequestPorts) + { + string respondFunctionName = $"{BuiltInFunctions.HttpPrefix}{workflow.Key}-respond"; + if (registeredFunctions.Add(respondFunctionName)) + { + this._logger.LogRegisteringWorkflowTrigger(workflow.Key, respondFunctionName, "http-respond"); + original.Add(FunctionMetadataFactory.CreateHttpTrigger( + $"{workflow.Key}-respond", + $"workflows/{workflow.Key}/respond/{{runId}}", + BuiltInFunctions.RespondToWorkflowHttpFunctionEntryPoint)); + } + } + // Register activity or entity functions for each executor in the workflow. // ReflectExecutors() returns all executors across the graph; no need to manually traverse edges. foreach (KeyValuePair entry in workflow.Value.ReflectExecutors()) { - // Sub-workflow bindings are handled as separate orchestrations, not activities. - if (entry.Value is SubworkflowBinding) + // Sub-workflow and RequestPort bindings use specialized dispatch, not activities. + if (entry.Value is SubworkflowBinding or RequestPortBinding) { continue; } diff --git a/dotnet/tests/Microsoft.Agents.AI.DurableTask.IntegrationTests/WorkflowConsoleAppSamplesValidation.cs b/dotnet/tests/Microsoft.Agents.AI.DurableTask.IntegrationTests/WorkflowConsoleAppSamplesValidation.cs index 97e2a1ef13..83158803dc 100644 --- a/dotnet/tests/Microsoft.Agents.AI.DurableTask.IntegrationTests/WorkflowConsoleAppSamplesValidation.cs +++ b/dotnet/tests/Microsoft.Agents.AI.DurableTask.IntegrationTests/WorkflowConsoleAppSamplesValidation.cs @@ -451,6 +451,59 @@ await this.RunSampleTestAsync(samplePath, async (process, logs) => }); } + [Fact] + public async Task WorkflowHITLSampleValidationAsync() + { + using CancellationTokenSource testTimeoutCts = this.CreateTestTimeoutCts(); + string samplePath = Path.Combine(s_samplesPath, "08_WorkflowHITL"); + + await this.RunSampleTestAsync(samplePath, (process, logs) => + { + bool foundStarted = false; + bool foundManagerApprovalPause = false; + bool foundManagerApprovalInput = false; + bool foundManagerResponseSent = false; + bool foundBudgetApprovalPause = false; + bool foundBudgetResponseSent = false; + bool foundComplianceApprovalPause = false; + bool foundComplianceResponseSent = false; + bool foundWorkflowCompleted = false; + + string? line; + while ((line = this.ReadLogLine(logs, testTimeoutCts.Token)) != null) + { + foundStarted |= line.Contains("Starting expense reimbursement workflow", StringComparison.Ordinal); + foundManagerApprovalPause |= line.Contains("Workflow paused at RequestPort: ManagerApproval", StringComparison.Ordinal); + foundManagerApprovalInput |= line.Contains("Approval for: Jerry", StringComparison.Ordinal); + foundManagerResponseSent |= line.Contains("Response sent: Approved=True", StringComparison.Ordinal) && foundManagerApprovalPause && !foundBudgetApprovalPause && !foundComplianceApprovalPause; + foundBudgetApprovalPause |= line.Contains("Workflow paused at RequestPort: BudgetApproval", StringComparison.Ordinal); + foundBudgetResponseSent |= line.Contains("Response sent: Approved=True", StringComparison.Ordinal) && foundBudgetApprovalPause; + foundComplianceApprovalPause |= line.Contains("Workflow paused at RequestPort: ComplianceApproval", StringComparison.Ordinal); + foundComplianceResponseSent |= line.Contains("Response sent: Approved=True", StringComparison.Ordinal) && foundComplianceApprovalPause; + + if (line.Contains("Workflow completed: Expense reimbursed at", StringComparison.Ordinal)) + { + foundWorkflowCompleted = true; + break; + } + + this.AssertNoError(line); + } + + Assert.True(foundStarted, "Workflow start message not found."); + Assert.True(foundManagerApprovalPause, "Manager approval pause not found."); + Assert.True(foundManagerApprovalInput, "Manager approval input (Jerry) not found."); + Assert.True(foundManagerResponseSent, "Manager approval response not sent."); + Assert.True(foundBudgetApprovalPause, "Budget approval pause not found."); + Assert.True(foundBudgetResponseSent, "Budget approval response not sent."); + Assert.True(foundComplianceApprovalPause, "Compliance approval pause not found."); + Assert.True(foundComplianceResponseSent, "Compliance approval response not sent."); + Assert.True(foundWorkflowCompleted, "Workflow did not complete successfully."); + + return Task.CompletedTask; + }); + } + [Fact] public async Task WorkflowAndAgentsSampleValidationAsync() { diff --git a/dotnet/tests/Microsoft.Agents.AI.DurableTask.UnitTests/Workflows/DurableActivityExecutorTests.cs b/dotnet/tests/Microsoft.Agents.AI.DurableTask.UnitTests/Workflows/DurableActivityExecutorTests.cs new file mode 100644 index 0000000000..6b817fe84b --- /dev/null +++ b/dotnet/tests/Microsoft.Agents.AI.DurableTask.UnitTests/Workflows/DurableActivityExecutorTests.cs @@ -0,0 +1,235 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System.Text.Json; +using Microsoft.Agents.AI.DurableTask.Workflows; + +namespace Microsoft.Agents.AI.DurableTask.UnitTests.Workflows; + +public sealed class DurableActivityExecutorTests +{ + private static readonly JsonSerializerOptions s_camelCaseOptions = new() + { + PropertyNamingPolicy = JsonNamingPolicy.CamelCase, + PropertyNameCaseInsensitive = true + }; + + #region DeserializeInput + + [Fact] + public void DeserializeInput_StringType_ReturnsInputAsIs() + { + // Arrange + const string Input = "hello world"; + + // Act + object result = DurableActivityExecutor.DeserializeInput(Input, typeof(string)); + + // Assert + Assert.Equal("hello world", result); + } + + [Fact] + public void DeserializeInput_SimpleObject_DeserializesCorrectly() + { + // Arrange + string input = JsonSerializer.Serialize(new TestRecord("EXP-001", 100.50m), s_camelCaseOptions); + + // Act + object result = DurableActivityExecutor.DeserializeInput(input, typeof(TestRecord)); + + // Assert + TestRecord record = Assert.IsType(result); + Assert.Equal("EXP-001", record.Id); + Assert.Equal(100.50m, record.Amount); + } + + [Fact] + public void DeserializeInput_StringArray_DeserializesDirectly() + { + // Arrange + string input = JsonSerializer.Serialize((string[])["a", "b", "c"]); + + // Act + object result = DurableActivityExecutor.DeserializeInput(input, typeof(string[])); + + // Assert + string[] array = Assert.IsType(result); + Assert.Equal(["a", "b", "c"], array); + } + + [Fact] + public void DeserializeInput_TypedArrayFromFanIn_DeserializesEachElement() + { + // Arrange — fan-in produces a JSON array of serialized strings + TestRecord r1 = new("EXP-001", 100m); + TestRecord r2 = new("EXP-002", 200m); + string[] serializedElements = + [ + JsonSerializer.Serialize(r1, s_camelCaseOptions), + JsonSerializer.Serialize(r2, s_camelCaseOptions) + ]; + string input = JsonSerializer.Serialize(serializedElements); + + // Act + object result = DurableActivityExecutor.DeserializeInput(input, typeof(TestRecord[])); + + // Assert + TestRecord[] records = Assert.IsType(result); + Assert.Equal(2, records.Length); + Assert.Equal("EXP-001", records[0].Id); + Assert.Equal(100m, records[0].Amount); + Assert.Equal("EXP-002", records[1].Id); + Assert.Equal(200m, records[1].Amount); + } + + [Fact] + public void DeserializeInput_TypedArrayWithSingleElement_DeserializesCorrectly() + { + // Arrange + TestRecord r1 = new("EXP-001", 50m); + string[] serializedElements = [JsonSerializer.Serialize(r1, s_camelCaseOptions)]; + string input = JsonSerializer.Serialize(serializedElements); + + // Act + object result = DurableActivityExecutor.DeserializeInput(input, typeof(TestRecord[])); + + // Assert + TestRecord[] records = Assert.IsType(result); + Assert.Single(records); + Assert.Equal("EXP-001", records[0].Id); + } + + [Fact] + public void DeserializeInput_TypedArrayWithNullElement_ThrowsInvalidOperationException() + { + // Arrange — one element is "null" + string input = JsonSerializer.Serialize((string[])["null"]); + + // Act & Assert + Assert.Throws( + () => DurableActivityExecutor.DeserializeInput(input, typeof(TestRecord[]))); + } + + [Fact] + public void DeserializeInput_InvalidJson_ThrowsJsonException() + { + // Arrange + const string Input = "not valid json"; + + // Act & Assert + Assert.ThrowsAny( + () => DurableActivityExecutor.DeserializeInput(Input, typeof(TestRecord))); + } + + #endregion + + #region ResolveInputType + + [Fact] + public void ResolveInputType_NullTypeName_ReturnsFirstSupportedType() + { + // Arrange + HashSet supportedTypes = [typeof(TestRecord), typeof(string)]; + + // Act + Type result = DurableActivityExecutor.ResolveInputType(null, supportedTypes); + + // Assert + Assert.Equal(typeof(TestRecord), result); + } + + [Fact] + public void ResolveInputType_EmptyTypeName_ReturnsFirstSupportedType() + { + // Arrange + HashSet supportedTypes = [typeof(TestRecord)]; + + // Act + Type result = DurableActivityExecutor.ResolveInputType(string.Empty, supportedTypes); + + // Assert + Assert.Equal(typeof(TestRecord), result); + } + + [Fact] + public void ResolveInputType_EmptySupportedTypes_DefaultsToString() + { + // Arrange + HashSet supportedTypes = []; + + // Act + Type result = DurableActivityExecutor.ResolveInputType(null, supportedTypes); + + // Assert + Assert.Equal(typeof(string), result); + } + + [Fact] + public void ResolveInputType_MatchesByFullName() + { + // Arrange + HashSet supportedTypes = [typeof(TestRecord)]; + + // Act + Type result = DurableActivityExecutor.ResolveInputType(typeof(TestRecord).FullName, supportedTypes); + + // Assert + Assert.Equal(typeof(TestRecord), result); + } + + [Fact] + public void ResolveInputType_MatchesByName() + { + // Arrange + HashSet supportedTypes = [typeof(TestRecord)]; + + // Act + Type result = DurableActivityExecutor.ResolveInputType("TestRecord", supportedTypes); + + // Assert + Assert.Equal(typeof(TestRecord), result); + } + + [Fact] + public void ResolveInputType_StringArrayFallsBackToSupportedType() + { + // Arrange — fan-in sends string[] but executor expects TestRecord[] + HashSet supportedTypes = [typeof(TestRecord[])]; + + // Act + Type result = DurableActivityExecutor.ResolveInputType(typeof(string[]).FullName, supportedTypes); + + // Assert + Assert.Equal(typeof(TestRecord[]), result); + } + + [Fact] + public void ResolveInputType_StringFallsBackToSupportedType() + { + // Arrange — executor doesn't support string + HashSet supportedTypes = [typeof(TestRecord)]; + + // Act + Type result = DurableActivityExecutor.ResolveInputType(typeof(string).FullName, supportedTypes); + + // Assert + Assert.Equal(typeof(TestRecord), result); + } + + [Fact] + public void ResolveInputType_StringArrayRetainedWhenSupported() + { + // Arrange — executor explicitly supports string[] + HashSet supportedTypes = [typeof(string[])]; + + // Act + Type result = DurableActivityExecutor.ResolveInputType(typeof(string[]).FullName, supportedTypes); + + // Assert + Assert.Equal(typeof(string[]), result); + } + + #endregion + + private sealed record TestRecord(string Id, decimal Amount); +} diff --git a/dotnet/tests/Microsoft.Agents.AI.DurableTask.UnitTests/Workflows/DurableStreamingWorkflowRunTests.cs b/dotnet/tests/Microsoft.Agents.AI.DurableTask.UnitTests/Workflows/DurableStreamingWorkflowRunTests.cs index c4b9037c94..4f07167942 100644 --- a/dotnet/tests/Microsoft.Agents.AI.DurableTask.UnitTests/Workflows/DurableStreamingWorkflowRunTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.DurableTask.UnitTests/Workflows/DurableStreamingWorkflowRunTests.cs @@ -36,8 +36,28 @@ private static OrchestrationMetadata CreateMetadata( private static string SerializeCustomStatus(List events) { - DurableWorkflowCustomStatus status = new() { Events = events }; - return JsonSerializer.Serialize(status, DurableWorkflowJsonContext.Default.DurableWorkflowCustomStatus); + DurableWorkflowLiveStatus status = new() { Events = events }; + return JsonSerializer.Serialize(status, DurableSerialization.Options); + } + + private static string SerializeCustomStatusWithPendingEvents( + List events, + List pendingEvents) + { + DurableWorkflowLiveStatus status = new() { Events = events, PendingEvents = pendingEvents }; + return JsonSerializer.Serialize(status, DurableSerialization.Options); + } + + private static Workflow CreateTestWorkflowWithRequestPort(string requestPortId) + { + FunctionExecutor start = new("start", (_, _, _) => default); + RequestPort requestPort = RequestPort.Create(requestPortId); + FunctionExecutor end = new("end", (_, _, _) => default); + return new WorkflowBuilder(start) + .WithName(WorkflowTestName) + .AddEdge(start, requestPort) + .AddEdge(requestPort, end) + .Build(); } private static string SerializeWorkflowResult(string? result, List events) @@ -486,6 +506,127 @@ public async Task WatchStreamAsync_Cancellation_EndsGracefullyAsync() Assert.Empty(events); } + [Fact] + public async Task WatchStreamAsync_PendingRequestPort_YieldsWaitingForInputEventAsync() + { + // Arrange + string customStatus = SerializeCustomStatusWithPendingEvents( + [], + [new PendingRequestPortStatus("ApprovalPort", """{"amount":100}""")]); + string serializedOutput = SerializeWorkflowResult("approved", []); + + int callCount = 0; + Mock mockClient = new("test"); + mockClient.Setup(c => c.GetInstanceAsync(InstanceId, true, It.IsAny())) + .ReturnsAsync(() => + { + callCount++; + return callCount == 1 + ? CreateMetadata(OrchestrationRuntimeStatus.Running, serializedCustomStatus: customStatus) + : CreateMetadata(OrchestrationRuntimeStatus.Completed, serializedOutput: serializedOutput); + }); + + Workflow workflow = CreateTestWorkflowWithRequestPort("ApprovalPort"); + DurableStreamingWorkflowRun run = new(mockClient.Object, InstanceId, workflow); + + // Act + List events = []; + await foreach (WorkflowEvent evt in run.WatchStreamAsync()) + { + events.Add(evt); + } + + // Assert + Assert.Equal(2, events.Count); + DurableWorkflowWaitingForInputEvent waitingEvent = Assert.IsType(events[0]); + Assert.Equal("ApprovalPort", waitingEvent.RequestPort.Id); + Assert.Contains("amount", waitingEvent.Input); + DurableWorkflowCompletedEvent completedEvent = Assert.IsType(events[1]); + Assert.Equal("approved", completedEvent.Result); + } + + [Fact] + public async Task WatchStreamAsync_PendingRequestPort_DoesNotDuplicateOnSubsequentPollsAsync() + { + // Arrange — same pending event across 2 polls, then completion + string customStatus = SerializeCustomStatusWithPendingEvents( + [], + [new PendingRequestPortStatus("ApprovalPort", """{"amount":100}""")]); + string serializedOutput = SerializeWorkflowResult("done", []); + + int callCount = 0; + Mock mockClient = new("test"); + mockClient.Setup(c => c.GetInstanceAsync(InstanceId, true, It.IsAny())) + .ReturnsAsync(() => + { + callCount++; + return callCount switch + { + <= 2 => CreateMetadata(OrchestrationRuntimeStatus.Running, serializedCustomStatus: customStatus), + _ => CreateMetadata(OrchestrationRuntimeStatus.Completed, serializedOutput: serializedOutput), + }; + }); + + Workflow workflow = CreateTestWorkflowWithRequestPort("ApprovalPort"); + DurableStreamingWorkflowRun run = new(mockClient.Object, InstanceId, workflow); + + // Act + List events = []; + await foreach (WorkflowEvent evt in run.WatchStreamAsync()) + { + events.Add(evt); + } + + // Assert — WaitingForInputEvent yielded only once despite 2 polls + Assert.Equal(2, events.Count); + Assert.IsType(events[0]); + Assert.IsType(events[1]); + } + + #endregion + + #region SendResponseAsync + + [Fact] + public async Task SendResponseAsync_SerializesAndRaisesEventAsync() + { + // Arrange + Mock mockClient = new("test"); + mockClient.Setup(c => c.RaiseEventAsync( + InstanceId, + "ApprovalPort", + It.IsAny(), + It.IsAny())) + .Returns(Task.CompletedTask); + + RequestPort approvalPort = RequestPort.Create("ApprovalPort"); + DurableWorkflowWaitingForInputEvent requestEvent = new("""{"amount":100}""", approvalPort); + Workflow workflow = CreateTestWorkflowWithRequestPort("ApprovalPort"); + DurableStreamingWorkflowRun run = new(mockClient.Object, InstanceId, workflow); + + // Act + await run.SendResponseAsync(requestEvent, new { approved = true, comments = "Looks good" }); + + // Assert + mockClient.Verify(c => c.RaiseEventAsync( + InstanceId, + "ApprovalPort", + It.Is(s => s.Contains("approved") && s.Contains("true")), + It.IsAny()), Times.Once); + } + + [Fact] + public async Task SendResponseAsync_NullRequestEvent_ThrowsAsync() + { + // Arrange + Mock mockClient = new("test"); + DurableStreamingWorkflowRun run = new(mockClient.Object, InstanceId, CreateTestWorkflow()); + + // Act & Assert + await Assert.ThrowsAsync(() => + run.SendResponseAsync(null!, "response").AsTask()); + } + #endregion #region WaitForCompletionAsync diff --git a/dotnet/tests/Microsoft.Agents.AI.Hosting.AzureFunctions.IntegrationTests/WorkflowSamplesValidation.cs b/dotnet/tests/Microsoft.Agents.AI.Hosting.AzureFunctions.IntegrationTests/WorkflowSamplesValidation.cs index 53be67daf2..34e8f9ef61 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Hosting.AzureFunctions.IntegrationTests/WorkflowSamplesValidation.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Hosting.AzureFunctions.IntegrationTests/WorkflowSamplesValidation.cs @@ -1,4 +1,4 @@ -// Copyright (c) Microsoft. All rights reserved. +// Copyright (c) Microsoft. All rights reserved. using System.Diagnostics; using System.Reflection; @@ -117,6 +117,115 @@ await this.WaitForConditionAsync( }); } + [Fact] + public async Task HITLWorkflowSampleValidationAsync() + { + string samplePath = Path.Combine(s_samplesPath, "03_WorkflowHITL"); + await this.RunSampleTestAsync(samplePath, requiresOpenAI: false, async (logs) => + { + // Use a unique run ID to avoid conflicts with previous test runs + string runId = $"hitl-test-{Guid.NewGuid():N}"; + + // Step 1: Start the expense reimbursement workflow + Uri runUri = new($"http://localhost:{AzureFunctionsPort}/api/workflows/ExpenseReimbursement/run?runId={runId}"); + this._outputHelper.WriteLine($"Starting ExpenseReimbursement workflow via POST request to {runUri}..."); + + using HttpContent runContent = new StringContent("EXP-2025-001", Encoding.UTF8, "text/plain"); + using HttpResponseMessage runResponse = await s_sharedHttpClient.PostAsync(runUri, runContent); + + Assert.True(runResponse.IsSuccessStatusCode, $"Run request failed with status: {runResponse.StatusCode}"); + string runResponseText = await runResponse.Content.ReadAsStringAsync(); + Assert.Contains("ExpenseReimbursement", runResponseText); + this._outputHelper.WriteLine($"Run response: {runResponseText}"); + + // Step 2: Wait for the workflow to pause at the ManagerApproval RequestPort + await this.WaitForConditionAsync( + condition: () => + { + lock (logs) + { + bool exists = logs.Any(log => log.Message.Contains("Workflow waiting for external input at RequestPort 'ManagerApproval'")); + return Task.FromResult(exists); + } + }, + message: "Workflow paused at ManagerApproval RequestPort", + timeout: s_orchestrationTimeout); + + // Step 3: Send approval response to resume the workflow + Uri respondUri = new($"http://localhost:{AzureFunctionsPort}/api/workflows/ExpenseReimbursement/respond/{runId}"); + this._outputHelper.WriteLine($"Sending approval response via POST request to {respondUri}..."); + + using HttpContent respondContent = new StringContent( + """{"eventName": "ManagerApproval", "response": {"Approved": true, "Comments": "Approved by test."}}""", + Encoding.UTF8, "application/json"); + using HttpResponseMessage respondResponse = await s_sharedHttpClient.PostAsync(respondUri, respondContent); + + Assert.True(respondResponse.IsSuccessStatusCode, $"Respond request failed with status: {respondResponse.StatusCode}"); + string respondResponseText = await respondResponse.Content.ReadAsStringAsync(); + Assert.Contains("Response sent to workflow", respondResponseText); + this._outputHelper.WriteLine($"Respond response: {respondResponseText}"); + + // Step 4: Wait for the workflow to pause at the parallel BudgetApproval and ComplianceApproval RequestPorts + await this.WaitForConditionAsync( + condition: () => + { + lock (logs) + { + bool exists = logs.Any(log => log.Message.Contains("Workflow waiting for external input at RequestPort 'BudgetApproval'")); + return Task.FromResult(exists); + } + }, + message: "Workflow paused at BudgetApproval RequestPort", + timeout: s_orchestrationTimeout); + + // Step 5a: Send budget approval response + this._outputHelper.WriteLine("Sending BudgetApproval response..."); + + using HttpContent budgetContent = new StringContent( + """{"eventName": "BudgetApproval", "response": {"Approved": true, "Comments": "Budget approved by test."}}""", + Encoding.UTF8, "application/json"); + using HttpResponseMessage budgetResponse = await s_sharedHttpClient.PostAsync(respondUri, budgetContent); + + Assert.True(budgetResponse.IsSuccessStatusCode, $"BudgetApproval request failed with status: {budgetResponse.StatusCode}"); + this._outputHelper.WriteLine($"BudgetApproval response: {await budgetResponse.Content.ReadAsStringAsync()}"); + + // Step 5b: Send compliance approval response + this._outputHelper.WriteLine("Sending ComplianceApproval response..."); + + using HttpContent complianceContent = new StringContent( + """{"eventName": "ComplianceApproval", "response": {"Approved": true, "Comments": "Compliance approved by test."}}""", + Encoding.UTF8, "application/json"); + using HttpResponseMessage complianceResponse = await s_sharedHttpClient.PostAsync(respondUri, complianceContent); + + Assert.True(complianceResponse.IsSuccessStatusCode, $"ComplianceApproval request failed with status: {complianceResponse.StatusCode}"); + this._outputHelper.WriteLine($"ComplianceApproval response: {await complianceResponse.Content.ReadAsStringAsync()}"); + + // Step 6: Wait for the workflow to complete + await this.WaitForConditionAsync( + condition: () => + { + lock (logs) + { + bool exists = logs.Any(log => log.Message.Contains("Workflow completed")); + return Task.FromResult(exists); + } + }, + message: "HITL workflow completed", + timeout: s_orchestrationTimeout); + + // Verify executor activities ran + lock (logs) + { + Assert.True(logs.Any(log => log.Message.Contains("Received external event for RequestPort 'ManagerApproval'")), + "ManagerApproval external event receipt not found in logs."); + Assert.True(logs.Any(log => log.Message.Contains("Received external event for RequestPort 'BudgetApproval'")), + "BudgetApproval external event receipt not found in logs."); + Assert.True(logs.Any(log => log.Message.Contains("Received external event for RequestPort 'ComplianceApproval'")), + "ComplianceApproval external event receipt not found in logs."); + } + }); + } + [Fact] public async Task ConcurrentWorkflowSampleValidationAsync() {