
DHI.Services.Jobs.WorkflowWorker — Internal Developer Guide

This package lets a Jobs host talk to remote Workflow hosts over SignalR. It provides:

  • a SignalR Hub (WorkerHub) your workflow hosts connect to
  • a remote worker (SignalRWorkflowWorker) your Job engine can use to run/cancel/time-out workflows on a given host
  • in-memory host registry & caches for availability and ad-hoc reports
  • DI helpers and a JSON converter for Dictionary<string,object>

Install

dotnet add package DHI.Services.Jobs.WorkflowWorker
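
You’ll typically use this from an ASP.NET Core service that exposes the Hub. The server-side SignalR components ship with ASP.NET Core 3.0 and later (the Microsoft.AspNetCore.App shared framework), so the Jobs service needs no separate SignalR package. Workflow hosts connect with the .NET client package:

dotnet add package Microsoft.AspNetCore.SignalR.Client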


What connects to what?

  • Workflow Host (client): your machine/agent running workflows. It connects to the Hub and implements the client interface IWorkerClient (handlers for OnRunJob, OnAvailable, OnReport).
  • Jobs service (server): hosts the Hub, keeps track of connected hosts, and calls them through SignalRWorkflowWorker.

Jobs service (server)                 Workflow Host (client)
---------------------                 ----------------------
WorkerHub (SignalR hub)    <====>     HubConnection
Server-to-client calls:               Implements IWorkerClient:
  - OnRunJob(...)                       - OnRunJob(dto, definition)
  - OnAvailable()                       - calls hub.AvailableResponse(bool)
  - OnReport()                          - calls hub.ReportResponse(dict)

Quick start (server)

1) Wire up services (DI)

using DHI.Services.Jobs.WorkflowWorker;

var builder = WebApplication.CreateBuilder(args);

// SignalR + auth as you normally do
builder.Services.AddSignalR();
builder.Services.AddAuthentication(/* your scheme */);
builder.Services.AddAuthorization();

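// Register workflow-worker services.
// builder.Logging is an ILoggingBuilder and has no CreateLogger method,
// so create a bootstrap logger for the DI helper.
using var loggerFactory = LoggerFactory.Create(logging => logging.AddConsole());
builder.Services.Adds(builder.Services, loggerFactory.CreateLogger("WorkflowWorker")); // DI helper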

var app = builder.Build();

// Map the hub
app.MapHub<WorkerHub>("/workerHub").RequireAuthorization();

app.Run();

DependencyInjection.Adds(...) registers:

  • IUserIdProvider → MachineNameUserIdProvider
  • AvailableCache, ReportCache
  • SignalRWorkflowWorker (the remote worker you’ll call)
  • ISignalRHostCollection & IHostRepository → SignalRHostRepository

2) Authentication & claims

WorkerHub carries the [Authorize] attribute, so every connection must be authenticated. Your auth should issue these claims to each connecting workflow host:

  • HostGroup — e.g., prod, dev-west
  • Priority (optional, int) — default 1
  • RunningJobsLimit (optional, int) — default 1
  • MachineName — recommended; if missing, a random id is used

The user id for a connection is computed by MachineNameUserIdProvider as:

{HostGroup}/{MachineName}

Use that exact hostId when targeting a specific host from the server.
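To make the claim rules concrete, here is a minimal sketch of how server-side code might read these claims, applying the documented defaults and the {HostGroup}/{MachineName} rule. HostIdentity is a hypothetical helper, not part of the package. Two behaviors are assumptions: the GUID format of the random fallback id (MachineNameUserIdProvider may generate it differently), and throwing when HostGroup is missing (the package's behavior in that case is not described here).

```csharp
using System;
using System.Security.Claims;

// Hypothetical helper mirroring the documented claim rules; not the package's own code.
public sealed record HostIdentity(string HostGroup, string MachineName, int Priority, int RunningJobsLimit)
{
    // Documented user id / hostId rule: "{HostGroup}/{MachineName}".
    public string HostId => $"{HostGroup}/{MachineName}";

    public static HostIdentity FromPrincipal(ClaimsPrincipal user)
    {
        // Assumption: this sketch treats a missing HostGroup as an error.
        var group = user.FindFirst("HostGroup")?.Value
            ?? throw new InvalidOperationException("Missing required HostGroup claim.");

        // MachineName is recommended; fall back to a random id (format assumed).
        var machine = user.FindFirst("MachineName")?.Value ?? Guid.NewGuid().ToString("N");

        return new HostIdentity(
            group,
            machine,
            ReadInt(user, "Priority", defaultValue: 1),
            ReadInt(user, "RunningJobsLimit", defaultValue: 1));
    }

    // Optional integer claims fall back to the default when missing or unparseable.
    private static int ReadInt(ClaimsPrincipal user, string type, int defaultValue) =>
        int.TryParse(user.FindFirst(type)?.Value, out var value) ? value : defaultValue;
}
```

With claims HostGroup=prod and MachineName=node-01, HostId is "prod/node-01", which is the string to pass as hostId to SignalRWorkflowWorker.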

3) Map groups

On connect, the hub adds the connection to the SignalR group named after the host’s HostGroup claim. You can broadcast to a whole group (e.g., prod) if you need to fan out messages.


Using the remote worker from Jobs

Inject SignalRWorkflowWorker where you dispatch jobs:

using DHI.Services.Jobs.WorkflowWorker;
using DHI.Services.Jobs.Workflows;
using DHI.Services.Jobs.Workflows.Code; // if using CodeWorkflow

public class MyJobDispatcher
{
    private readonly SignalRWorkflowWorker _worker;

    public MyJobDispatcher(SignalRWorkflowWorker worker) => _worker = worker;

    public void Run(Guid jobId, ITask<string> task, Dictionary<string,object> parameters, string hostGroup, string machineName)
    {
        var hostId = $"{hostGroup}/{machineName}";

        // Optional: ping for availability (blocks up to ~5s)
        if (!_worker.IsHostAvailable(hostId))
            throw new InvalidOperationException($"Host {hostId} is not available.");

        // Execute (task must be Workflow or CodeWorkflow)
        _worker.Execute(jobId, task, parameters, hostId);
    }

    public void Cancel(Guid jobId, string hostId)  => _worker.Cancel(jobId, hostId);
    public void Timeout(Guid jobId, string hostId) => _worker.Timeout(jobId, hostId);
}
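When a job may run on any host in a group, a dispatcher could try candidates in turn. The sketch below keeps the worker behind two delegates so it compiles without the package; in real code they would wrap _worker.IsHostAvailable and _worker.Execute. HostPicker and TryDispatch are hypothetical names, and trying hosts in list order is just one possible policy.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: tries each candidate hostId until one reports available.
public static class HostPicker
{
    public static string? TryDispatch(
        Guid jobId,
        IEnumerable<string> candidateHostIds,
        Func<string, bool> isHostAvailable,   // e.g. worker.IsHostAvailable (may block ~5s per call)
        Action<Guid, string> execute)         // e.g. (id, host) => worker.Execute(id, task, parameters, host)
    {
        foreach (var hostId in candidateHostIds)
        {
            if (!isHostAvailable(hostId))
                continue; // skip hosts that did not confirm availability

            execute(jobId, hostId);
            return hostId; // the host that received the job
        }

        return null; // no candidate was available; the caller decides whether to requeue
    }
}
```

Because each availability probe can block, a long candidate list multiplies the worst-case delay; see the operational notes on caching availability.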

Task types supported

  • Workflow — send an existing serialized workflow (workflow.Definition must be non-empty)
  • CodeWorkflow — generates a definition from code (codeWorkflow.ToDefinition())

If you pass anything else, Execute throws.
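The rule above can be pictured as a pattern match over the task type. WorkflowStub and CodeWorkflowStub are hypothetical stand-ins defined only so the sketch compiles; they are not the package’s Workflow and CodeWorkflow classes, and ArgumentException is an assumed exception type (the real Execute may throw something else).

```csharp
using System;

// Hypothetical stand-ins for the real task types.
public record WorkflowStub(string Definition);
public record CodeWorkflowStub(Func<string> Generate)
{
    public string ToDefinition() => Generate();
}

public static class DefinitionResolver
{
    // Returns the definition string that would be sent to the host with OnRunJob.
    public static string Resolve(object task) => task switch
    {
        WorkflowStub w when !string.IsNullOrEmpty(w.Definition) => w.Definition,
        WorkflowStub => throw new ArgumentException("Workflow.Definition must be non-empty.", nameof(task)),
        CodeWorkflowStub c => c.ToDefinition(),
        _ => throw new ArgumentException($"Unsupported task type: {task?.GetType().Name ?? "null"}", nameof(task)),
    };
}
```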

Events you can observe

_worker.Executing        += (_, e) => { var (jobId, msg) = e.Value; /* log */ };
_worker.Executed         += (_, e) => { var (jobId, status, msg) = e.Value; /* mark done */ };
_worker.Cancelling       += (_, e) => { /* job cancel requested */ };
_worker.Cancelled        += (_, e) => { var (jobId, msg) = e.Value; };
_worker.ProgressChanged  += (_, e) => { var (jobId, prog) = e.Value; };
_worker.HostNotAvailable += (_, e) => { var jobId = e.Value; /* reschedule */ };
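One way to consume these events is to funnel them into a single sink that records job state and collects jobs to reschedule. The sketch is framework-free: each handler takes the values you would deconstruct from e.Value above. JobTracker is hypothetical; the status is kept as a string and progress as an int because the real types are not described here.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical sink for worker events; wire each handler to the matching event.
public sealed class JobTracker
{
    // Events may be raised from different threads, so state lives in concurrent collections.
    private readonly ConcurrentDictionary<Guid, string> _state = new();
    private readonly ConcurrentQueue<Guid> _toReschedule = new();

    public void OnExecuting(Guid jobId, string message) => _state[jobId] = "Executing";
    public void OnProgress(Guid jobId, int progress) => _state[jobId] = $"Executing ({progress}%)";
    public void OnExecuted(Guid jobId, string status, string message) => _state[jobId] = status;
    public void OnCancelled(Guid jobId, string message) => _state[jobId] = "Cancelled";

    // The target host did not answer: mark the job pending and queue it for another attempt.
    public void OnHostNotAvailable(Guid jobId)
    {
        _state[jobId] = "Pending";
        _toReschedule.Enqueue(jobId);
    }

    public string? StateOf(Guid jobId) => _state.TryGetValue(jobId, out var s) ? s : null;

    public bool TryTakeReschedule(out Guid jobId) => _toReschedule.TryDequeue(out jobId);
}
```

Wiring then looks like _worker.HostNotAvailable += (_, e) => tracker.OnHostNotAvailable(e.Value); and a background loop drains TryTakeReschedule.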

Host registry & caches

Host registry (ISignalRHostCollection)

Backed by an in-memory ConcurrentDictionary<string, ConcurrentBag<Host>> keyed by group.

Read methods (write operations throw; hosts are added/removed by hub connect/disconnect):

// get all known hosts (deduped per connection id)
IEnumerable<Host> all     = hostCollection.GetAll();
IEnumerable<Host> prod    = hostCollection.GetGroupMembers("prod");
bool exists               = hostCollection.Contains(hostId);
int count                 = hostCollection.Count();

Duplicate handling: reconnects may momentarily produce duplicates; reads are deduped by Id.
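To see how a group-keyed registry can tolerate reconnect duplicates, here is a simplified model that dedupes by Id on read. It is not SignalRHostRepository: HostEntry is a hypothetical record, and the sketch swaps the ConcurrentBag for an inner ConcurrentDictionary keyed by connection id, because ConcurrentBag has no way to remove one specific item.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

// Hypothetical host record: Id is "{HostGroup}/{MachineName}", ConnectionId is per SignalR connection.
public sealed record HostEntry(string Id, string Group, string ConnectionId);

public sealed class HostRegistrySketch
{
    // group -> (connectionId -> host)
    private readonly ConcurrentDictionary<string, ConcurrentDictionary<string, HostEntry>> _byGroup = new();

    // On connect: add even if the same Id is already present (a reconnect may overlap the old connection).
    public void Add(HostEntry host) =>
        _byGroup.GetOrAdd(host.Group, _ => new ConcurrentDictionary<string, HostEntry>())[host.ConnectionId] = host;

    // On disconnect: remove only that connection, so a late disconnect of an old
    // connection does not evict the host's newer connection.
    public void Remove(HostEntry host)
    {
        if (_byGroup.TryGetValue(host.Group, out var members))
            members.TryRemove(host.ConnectionId, out _);
    }

    public IReadOnlyList<HostEntry> GetGroupMembers(string group) =>
        _byGroup.TryGetValue(group, out var members) ? Dedupe(members.Values) : new List<HostEntry>();

    public IReadOnlyList<HostEntry> GetAll() => Dedupe(_byGroup.Values.SelectMany(m => m.Values));

    public bool Contains(string hostId) => GetAll().Any(h => h.Id == hostId);

    public int Count() => GetAll().Count;

    // Keep one entry per Id; which duplicate survives is arbitrary.
    private static List<HostEntry> Dedupe(IEnumerable<HostEntry> hosts) =>
        hosts.GroupBy(h => h.Id).Select(g => g.First()).ToList();
}
```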

Availability cache (AvailableCache)

SignalRWorkflowWorker.IsHostAvailable(hostId) sends OnAvailable to the client and waits up to ~5s for the client to call hub.AvailableResponse(bool). Results are stored as:

// key = hostId, value = (available, lastSeenUtc)
ConcurrentDictionary<string,(bool,DateTime)> AvailableCache

You can inject and inspect it yourself if needed.
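The wait described above amounts to "send the probe, then poll the cache for an answer". Below is a framework-free sketch with the send step passed in as a delegate and the timeout and poll interval as parameters; the real worker’s ~5s timeout and polling interval are internal. AvailabilityWaiter is hypothetical, and ignoring answers recorded before the probe was sent is an assumption about how stale entries should be treated. Like IsHostAvailable, it blocks the calling thread.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public static class AvailabilityWaiter
{
    // Sends a probe, then polls the cache until a fresh answer arrives or the timeout expires.
    public static bool WaitForAvailability(
        ConcurrentDictionary<string, (bool Available, DateTime LastSeenUtc)> cache,
        string hostId,
        Action<string> sendProbe,   // e.g. invoke OnAvailable on that host's connection
        TimeSpan timeout,
        TimeSpan pollInterval)
    {
        var askedAtUtc = DateTime.UtcNow;
        sendProbe(hostId);

        var deadline = askedAtUtc + timeout;
        while (DateTime.UtcNow < deadline)
        {
            // Only accept an answer recorded after this probe was sent.
            if (cache.TryGetValue(hostId, out var entry) && entry.LastSeenUtc >= askedAtUtc)
                return entry.Available;

            Thread.Sleep(pollInterval);
        }

        return false; // no fresh answer in time: treat the host as unavailable
    }
}
```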

Report cache (ReportCache)

When the server invokes OnReport on a client, the client should respond with a Dictionary<string,object> via hub.ReportResponse(...). The latest per-host payload is stored:

// key = hostId, value = (reportDict, lastSeenUtc)
ConcurrentDictionary<string,(Dictionary<string,object>, DateTime)> ReportCache

Use the included DictionaryStringObjectJsonConverter if you need robust Dictionary<string,object> (de)serialization in your own controllers/services.
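By default, System.Text.Json deserializes the object values of a Dictionary<string,object> as JsonElement, which is awkward to consume. A minimal converter that maps JSON values to plain CLR types could look like the following. LooseDictionaryConverter is a hypothetical illustration, not the included DictionaryStringObjectJsonConverter, whose exact mappings (for example, long versus int for integers) may differ.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;
using System.Text.Json.Serialization;

// Hypothetical converter: JSON -> string / long / double / bool / null, nested dictionaries and lists.
public sealed class LooseDictionaryConverter : JsonConverter<Dictionary<string, object>>
{
    public override Dictionary<string, object> Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
    {
        using var doc = JsonDocument.ParseValue(ref reader);
        if (doc.RootElement.ValueKind != JsonValueKind.Object)
            throw new JsonException("Expected a JSON object.");
        return ToDictionary(doc.RootElement);
    }

    public override void Write(Utf8JsonWriter writer, Dictionary<string, object> value, JsonSerializerOptions options)
    {
        writer.WriteStartObject();
        foreach (var (key, item) in value)
        {
            writer.WritePropertyName(key);
            // Serialize with the runtime type so numbers, strings and nested values keep their shape.
            JsonSerializer.Serialize(writer, item, item?.GetType() ?? typeof(object), options);
        }
        writer.WriteEndObject();
    }

    private static Dictionary<string, object> ToDictionary(JsonElement obj)
    {
        var result = new Dictionary<string, object>();
        foreach (var property in obj.EnumerateObject())
            result[property.Name] = ToClr(property.Value);
        return result;
    }

    private static List<object> ToList(JsonElement array)
    {
        var list = new List<object>();
        foreach (var item in array.EnumerateArray())
            list.Add(ToClr(item));
        return list;
    }

    private static object ToClr(JsonElement element) => element.ValueKind switch
    {
        JsonValueKind.Object => ToDictionary(element),
        JsonValueKind.Array => ToList(element),
        JsonValueKind.String => element.GetString(),
        // Integers become long; everything else numeric becomes double.
        JsonValueKind.Number => element.TryGetInt64(out var l) ? (object)l : element.GetDouble(),
        JsonValueKind.True => true,
        JsonValueKind.False => false,
        _ => null, // Null / Undefined
    };
}
```

Register it with options.Converters.Add(new LooseDictionaryConverter()) wherever you (de)serialize report payloads.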


The hub surface (WorkerHub)

Server-side methods the client calls:

  • AvailableResponse(bool available) — writes to AvailableCache
  • ReportResponse(Dictionary<string,object> report) — writes to ReportCache

Connection lifecycle:

  • OnConnectedAsync
    • logs the new connection
    • builds a Host from claims and adds it to the SignalRHostRepository
    • adds connection to the SignalR group named by HostGroup
  • OnDisconnectedAsync
    • removes the member from the repository (with reconnect/duplicate safety)

Implementing the client (IWorkerClient)

Your workflow host app connects to /workerHub and implements these handlers:

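using System.Collections.Generic;
using System.Threading.Tasks;
using DHI.Services.Jobs.WorkflowWorker;      // IWorkerClient, WorkflowDto (namespace assumed)
using Microsoft.AspNetCore.SignalR.Client;   // HubConnection, SendAsync

public class WorkflowClient : IWorkerClient
{
    private readonly HubConnection hubConnection;

    // Pass in the connection to /workerHub, built and started with the host's credentials.
    public WorkflowClient(HubConnection hubConnection) => this.hubConnection = hubConnection;

    public void OnRunJob(WorkflowDto dto, string definition)
    {
        // Start the workflow here. To report progress back to the server, call
        // hubConnection.SendAsync(...) on a hub method you add for that purpose.
    }

    public Task OnAvailable()
    {
        // Compute availability and tell the hub.
        // Positional argument: SendAsync has no parameter named "available".
        var available = true;
        return hubConnection.SendAsync("AvailableResponse", available);
    }

    public void OnReport()
    {
        var payload = new Dictionary<string,object>
        {
            ["cpu"] = 0.42,
            ["memMB"] = 512,
            ["running"] = 1
        };
        // The handler is void, so the send is started but not awaited.
        _ = hubConnection.SendAsync("ReportResponse", payload);
    }
}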

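Make sure the client authenticates and carries the required claims. The server derives each connection’s UserIdentifier from those claims through MachineNameUserIdProvider ({HostGroup}/{MachineName}), so the HostGroup and MachineName claims must produce the hostId you use when targeting that host.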
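A host has to decide what to send back in AvailableResponse. One plausible policy, sketched here as an assumption rather than anything the package enforces, is to compare the number of running jobs with the RunningJobsLimit the host was issued. JobSlots is a hypothetical helper; it uses Interlocked so concurrent OnRunJob calls cannot push the count past the limit.

```csharp
using System;
using System.Threading;

// Hypothetical host-side counter: a job takes a slot when it starts and returns it when it ends.
public sealed class JobSlots
{
    private readonly int _limit;
    private int _running;

    public JobSlots(int limit) =>
        _limit = limit > 0 ? limit : throw new ArgumentOutOfRangeException(nameof(limit));

    public int Running => Volatile.Read(ref _running);

    // Value to report with AvailableResponse.
    public bool IsAvailable => Running < _limit;

    // Atomically claims a slot; returns false if the host is already full.
    public bool TryAcquire()
    {
        while (true)
        {
            var current = Volatile.Read(ref _running);
            if (current >= _limit)
                return false;
            if (Interlocked.CompareExchange(ref _running, current + 1, current) == current)
                return true;
        }
    }

    public void Release()
    {
        if (Interlocked.Decrement(ref _running) < 0)
        {
            Interlocked.Increment(ref _running); // undo the unmatched release
            throw new InvalidOperationException("Release called without a matching TryAcquire.");
        }
    }
}
```

In OnAvailable the host would return hubConnection.SendAsync("AvailableResponse", slots.IsAvailable); in OnRunJob it would call TryAcquire before starting and Release in a finally block when the workflow ends.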


Querying hosts by group (read-only service)

If you prefer a service abstraction instead of the raw repository:

var hostService = new SignalRHostService(hostCollection, new[] { "prod", "dev", "qa" });

IEnumerable<Host> prodHosts = hostService.GetByGroup("prod");
bool knownGroup            = hostService.GroupExists("prod");
Host one                   = hostService.Get(hostId);
int total                  = hostService.Count();

All mutation methods throw by design (the hub owns membership).


Operational notes & best practices

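  • Backplane: if you scale the Hub across instances, set up a SignalR backplane (e.g., Redis). A backplane only forwards hub messages between nodes; the host registry and caches stay in-memory per node. Each node therefore sees only the hosts connected to it, and an AvailableResponse or ReportResponse lands in the caches of the node the host is connected to, so IsHostAvailable called on a different node will not see that answer. Plan your topology accordingly.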
  • Targeting hosts: Always construct hostId using the same rule the server uses: "{HostGroup}/{MachineName}".
  • Timeouts: IsHostAvailable blocks the calling thread for up to ~5s while polling AvailableCache. Avoid calling it on hot paths or once per job in tight loops; cache results at your layer when a slightly stale answer is acceptable.
  • Duplicates & reconnects: The repository dedupes on read; transient duplicates are tolerated during reconnect.
  • Security: The hub is [Authorize]. Keep tokens/claims strict; prefer short-lived access tokens for hosts.
  • Limits: Use RunningJobsLimit and Priority claims to inform your scheduling layer (they’re stored on Host).
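As a sketch of how a scheduling layer might use those claims: among hosts with free capacity, pick the one with the highest Priority, then the least busy. Two assumptions here are not stated by the package: that a larger Priority value means "preferred", and that you track running-job counts per host yourself. HostLoad and Scheduler.PickHost are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical scheduling view of a host; Priority and RunningJobsLimit come from the Host claims.
public sealed record HostLoad(string HostId, int Priority, int RunningJobsLimit, int RunningJobs);

public static class Scheduler
{
    // Returns the hostId to use, or null when every host is at its RunningJobsLimit.
    public static string? PickHost(IEnumerable<HostLoad> hosts) =>
        hosts
            .Where(h => h.RunningJobs < h.RunningJobsLimit)   // only hosts with a free slot
            .OrderByDescending(h => h.Priority)               // assumed: higher value = preferred
            .ThenBy(h => h.RunningJobs)                       // then the least busy host
            .ThenBy(h => h.HostId, StringComparer.Ordinal)    // deterministic tie-break
            .Select(h => h.HostId)
            .FirstOrDefault();
}
```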

Minimal end-to-end checklist

  1. Host an ASP.NET Core app with AddSignalR() and MapHub<WorkerHub>("/workerHub").
  2. Call services.Adds(services, logger) to register all components.
  3. Ensure your auth issues HostGroup, MachineName (and optional Priority, RunningJobsLimit) claims to workflow hosts.
  4. In your Jobs component, inject SignalRWorkflowWorker and call:
    • .IsHostAvailable(hostId) (optional)
    • .Execute(jobId, workflowOrCodeWorkflow, parameters, hostId)
    • .Cancel(jobId, hostId) / .Timeout(jobId, hostId) as needed.
  5. On the workflow host, implement IWorkerClient handlers and call back AvailableResponse/ReportResponse.