DHI.Services.Jobs.WorkflowWorker: Internal Developer Guide
This package lets a Jobs host talk to remote Workflow hosts over SignalR. It provides:
- a SignalR hub (`WorkerHub`) that your workflow hosts connect to
- a remote worker (`SignalRWorkflowWorker`) that your Job engine can use to run, cancel, or time out workflows on a given host
- an in-memory host registry and caches for availability and ad-hoc reports
- DI helpers and a JSON converter for `Dictionary<string,object>`
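Install

```shell
dotnet add package DHI.Services.Jobs.WorkflowWorker
```

You'll typically use this from an ASP.NET Core service that exposes the hub. On ASP.NET Core 3.0 and later the SignalR server ships with the shared framework, so the standalone `Microsoft.AspNetCore.SignalR` package (built for ASP.NET Core 2.x) is not needed there. Workflow hosts written in .NET connect with the client package: `dotnet add package Microsoft.AspNetCore.SignalR.Client`.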
What connects to what?
- Workflow host (client): your machine or agent running workflows. It connects to the hub and implements the client interface `IWorkerClient` (handlers for `OnRunJob`, `OnAvailable`, `OnReport`).
- Jobs service (server): hosts the hub, keeps track of connected hosts, and calls them through `SignalRWorkflowWorker`.
```text
Jobs service                           Workflow Host (client)
-------------                          ----------------------
WorkerHub (SignalR hub)     <====>     HubConnection
Server-to-client calls:                Implements IWorkerClient:
  - OnRunJob(...)                        - OnRunJob(dto, definition)
  - OnAvailable()                        - calls hub.AvailableResponse(bool)
  - OnReport()                           - calls hub.ReportResponse(dict)
```
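Quick start (server)
1) Wire up services (DI)

```csharp
using DHI.Services.Jobs.WorkflowWorker;
using Microsoft.Extensions.Logging;

var builder = WebApplication.CreateBuilder(args);

// SignalR + auth as you normally do
builder.Services.AddSignalR();
builder.Services.AddAuthentication(/* your scheme */);
builder.Services.AddAuthorization();

// ILoggingBuilder has no CreateLogger, so create the logger the DI helper takes from a factory
using var loggerFactory = LoggerFactory.Create(logging => logging.AddConsole());
var logger = loggerFactory.CreateLogger("WorkflowWorker");

// Register workflow-worker services (DI helper; call shape as documented)
builder.Services.Adds(builder.Services, logger);

var app = builder.Build();
app.UseAuthentication();
app.UseAuthorization();

// Map the hub (WorkerHub is [Authorize])
app.MapHub<WorkerHub>("/workerHub").RequireAuthorization();
app.Run();
```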
`DependencyInjection.Adds(...)` registers:
- `IUserIdProvider` → `MachineNameUserIdProvider`
- `AvailableCache`, `ReportCache`
- `SignalRWorkflowWorker` (the remote worker you'll call)
- `ISignalRHostCollection` and `IHostRepository` → `SignalRHostRepository`
2) Authentication & claims
`WorkerHub` is `[Authorize]`. Your authentication should issue these claims to each connecting workflow host:
- `HostGroup`: for example `prod` or `dev-west`
- `Priority` (optional, int): defaults to `1`
- `RunningJobsLimit` (optional, int): defaults to `1`
- `MachineName`: recommended; if it is missing, a random id is used

For each connection, `MachineNameUserIdProvider` computes the user id as `{HostGroup}/{MachineName}`. Use exactly that string as the `hostId` when targeting a specific host from the server.
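The claim-to-id rule above can be sketched with plain `System.Security.Claims` types. This is a minimal illustration, not the package's `MachineNameUserIdProvider`: it assumes the claim types are the literal strings `HostGroup`, `MachineName`, `Priority` and `RunningJobsLimit`, uses a GUID as the "random id" fallback, and throws when `HostGroup` is missing, which the guide does not specify. `BuildHostId` and `ReadIntClaim` are hypothetical names.

```csharp
using System;
using System.Security.Claims;

// Hypothetical helper: builds "{HostGroup}/{MachineName}" from a host's claims.
static string BuildHostId(ClaimsPrincipal user)
{
    // Assumption: a missing HostGroup is treated as an error.
    var group = user.FindFirst("HostGroup")?.Value
        ?? throw new InvalidOperationException("Missing HostGroup claim.");
    // Assumption: a GUID stands in for the "random id" used when MachineName is absent.
    var machine = user.FindFirst("MachineName")?.Value ?? Guid.NewGuid().ToString("N");
    return $"{group}/{machine}";
}

// Hypothetical helper: optional integer claims (Priority, RunningJobsLimit) default to 1.
static int ReadIntClaim(ClaimsPrincipal user, string type, int fallback = 1) =>
    int.TryParse(user.FindFirst(type)?.Value, out var value) ? value : fallback;

// A principal like the one your authentication handler would produce for a host.
var host = new ClaimsPrincipal(new ClaimsIdentity(new[]
{
    new Claim("HostGroup", "prod"),
    new Claim("MachineName", "wf-node-01"),
    new Claim("Priority", "5"),
}, "Test"));

var hostId = BuildHostId(host);                                // "prod/wf-node-01"
var priority = ReadIntClaim(host, "Priority");                 // 5
var runningJobsLimit = ReadIntClaim(host, "RunningJobsLimit"); // 1 (claim absent)
Console.WriteLine($"{hostId} priority={priority} limit={runningJobsLimit}");
```

Building the id from one helper on both the dispatch side and in tests keeps the two from drifting apart.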
3) Map groups
On connect, the hub adds the connection to the SignalR group named after its `HostGroup` claim. You can broadcast to a whole group (e.g., `prod`) when you need to fan out messages.
Using the remote worker from Jobs
Inject `SignalRWorkflowWorker` where you dispatch jobs:

```csharp
using System;
using System.Collections.Generic;
using DHI.Services.Jobs.WorkflowWorker;
using DHI.Services.Jobs.Workflows;
using DHI.Services.Jobs.Workflows.Code; // if using CodeWorkflow

public class MyJobDispatcher
{
    private readonly SignalRWorkflowWorker _worker;

    public MyJobDispatcher(SignalRWorkflowWorker worker) => _worker = worker;

    public void Run(Guid jobId, ITask<string> task, Dictionary<string, object> parameters, string hostGroup, string machineName)
    {
        // Same rule as MachineNameUserIdProvider: "{HostGroup}/{MachineName}"
        var hostId = $"{hostGroup}/{machineName}";

        // Optional: ping for availability (blocks for up to ~5 s)
        if (!_worker.IsHostAvailable(hostId))
            throw new InvalidOperationException($"Host {hostId} is not available.");

        // Execute (task must be a Workflow or CodeWorkflow)
        _worker.Execute(jobId, task, parameters, hostId);
    }

    public void Cancel(Guid jobId, string hostId) => _worker.Cancel(jobId, hostId);

    public void Timeout(Guid jobId, string hostId) => _worker.Timeout(jobId, hostId);
}
```
Task types supported
- `Workflow`: sends an existing serialized workflow (`workflow.Definition` must be non-empty)
- `CodeWorkflow`: generates a definition from code (`codeWorkflow.ToDefinition()`)

If you pass any other task type, `Execute` throws.
Events you can observe

```csharp
_worker.Executing        += (_, e) => { var (jobId, msg) = e.Value; /* log */ };
_worker.Executed         += (_, e) => { var (jobId, status, msg) = e.Value; /* mark done */ };
_worker.Cancelling       += (_, e) => { /* job cancel requested */ };
_worker.Cancelled        += (_, e) => { var (jobId, msg) = e.Value; };
_worker.ProgressChanged  += (_, e) => { var (jobId, prog) = e.Value; };
_worker.HostNotAvailable += (_, e) => { var jobId = e.Value; /* reschedule */ };
```
Host registry & caches
Host registry (`ISignalRHostCollection`)
Backed by an in-memory `ConcurrentDictionary<string, ConcurrentBag<Host>>` keyed by group.
Only read methods are supported; write operations throw, because the hub adds and removes hosts on connect and disconnect:

```csharp
// get all known hosts (deduped per connection id)
IEnumerable<Host> all = hostCollection.GetAll();
IEnumerable<Host> prod = hostCollection.GetGroupMembers("prod");
bool exists = hostCollection.Contains(hostId);
int count = hostCollection.Count();
```

Duplicate handling: reconnects may momentarily produce duplicates; reads are deduped by `Id`.
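To make "dedupe on read" concrete, here is a minimal sketch of a group-keyed registry with the same `ConcurrentDictionary<string, ConcurrentBag<...>>` shape. A `(Id, Group)` tuple stands in for the package's `Host` type, and the local functions only mirror the read methods listed above; this is an assumed approach for illustration, not the repository's actual code.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

// Sketch only: hosts kept per group; the (Id, Group) tuple is a stand-in for Host.
var hostsByGroup = new ConcurrentDictionary<string, ConcurrentBag<(string Id, string Group)>>();

// Write path: in the package this happens on hub connect; a reconnect may add a duplicate.
void Add(string id, string group) =>
    hostsByGroup.GetOrAdd(group, _ => new ConcurrentBag<(string Id, string Group)>()).Add((id, group));

// Read path: flatten every group and keep one entry per Id.
IEnumerable<(string Id, string Group)> GetAll() =>
    hostsByGroup.Values.SelectMany(bag => bag).GroupBy(h => h.Id).Select(g => g.First());

IEnumerable<(string Id, string Group)> GetGroupMembers(string group) =>
    hostsByGroup.TryGetValue(group, out var bag)
        ? bag.GroupBy(h => h.Id).Select(g => g.First())
        : Enumerable.Empty<(string Id, string Group)>();

bool Contains(string id) => GetAll().Any(h => h.Id == id);
int Count() => GetAll().Count();

Add("prod/wf-node-01", "prod");
Add("prod/wf-node-01", "prod"); // duplicate from a reconnect
Add("dev/wf-node-02", "dev");

Console.WriteLine(Count());                          // 2
Console.WriteLine(GetGroupMembers("prod").Count());  // 1
Console.WriteLine(Contains("dev/wf-node-02"));       // True
```

One consequence of this shape: `ConcurrentBag<T>` cannot remove a specific item, so removing a host on disconnect means replacing or rebuilding the group's bag.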
Availability cache (`AvailableCache`)
`SignalRWorkflowWorker.IsHostAvailable(hostId)` sends `OnAvailable` to the client and waits up to ~5 s for the client to call `hub.AvailableResponse(bool)`. Results are stored as:

```csharp
// key = hostId, value = (available, lastSeenUtc)
ConcurrentDictionary<string, (bool, DateTime)> AvailableCache
```

You can inject and inspect it yourself if needed.
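The round trip behind `IsHostAvailable` can be sketched as a poll over the cache with a deadline. In this sketch, `WaitForAvailability`, the 100 ms poll interval, and the rule that only an entry newer than the request counts as an answer are assumptions; the package's exact polling logic is not documented here. A background task plays the role of the hub's `AvailableResponse` handler.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Same shape as AvailableCache: key = hostId, value = (available, lastSeenUtc).
var availableCache = new ConcurrentDictionary<string, (bool Available, DateTime LastSeenUtc)>();

// Hypothetical helper: wait until the host answers after our request, or give up.
bool WaitForAvailability(string hostId, TimeSpan timeout)
{
    var requestedAt = DateTime.UtcNow;
    // In the package, OnAvailable would be sent to the host at this point.
    var deadline = requestedAt + timeout;
    while (DateTime.UtcNow < deadline)
    {
        // Assumption: only a reply written after the request counts.
        if (availableCache.TryGetValue(hostId, out var entry) && entry.LastSeenUtc >= requestedAt)
            return entry.Available;
        Thread.Sleep(100);
    }
    return false; // no fresh answer in time: treat the host as unavailable
}

// Simulate the hub's AvailableResponse handler writing a reply shortly after the request.
_ = Task.Run(async () =>
{
    await Task.Delay(300);
    availableCache["prod/wf-node-01"] = (true, DateTime.UtcNow);
});

Console.WriteLine(WaitForAvailability("prod/wf-node-01", TimeSpan.FromSeconds(5)));    // True
Console.WriteLine(WaitForAvailability("prod/unknown", TimeSpan.FromMilliseconds(500))); // False
```

The blocking `Thread.Sleep` loop matches a synchronous `bool IsHostAvailable(...)` call; an async variant would use `await Task.Delay` instead.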
Report cache (`ReportCache`)
When the server invokes `OnReport` on a client, the client should respond with a `Dictionary<string,object>` via `hub.ReportResponse(...)`. The latest payload per host is stored:

```csharp
// key = hostId, value = (reportDict, lastSeenUtc)
ConcurrentDictionary<string, (Dictionary<string, object>, DateTime)> ReportCache
```

Use the included `DictionaryStringObjectJsonConverter` if you need robust `Dictionary<string,object>` (de)serialization in your own controllers or services.
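Why a dedicated converter helps: by default `System.Text.Json` deserializes values typed as `object` into `JsonElement`, so a report read back as `Dictionary<string,object>` holds `JsonElement`s rather than numbers and strings. The hypothetical `Unwrap` helper below illustrates the kind of mapping such a converter performs; it is not the implementation of `DictionaryStringObjectJsonConverter`, whose type mapping may differ (this sketch, for example, turns integers into `long`).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;

// Hypothetical helper: maps a JsonElement to plain CLR values, recursively.
static object? Unwrap(JsonElement e) => e.ValueKind switch
{
    JsonValueKind.String => e.GetString(),
    // Integers become long, everything else double (a choice of this sketch).
    JsonValueKind.Number => e.TryGetInt64(out var l) ? (object)l : e.GetDouble(),
    JsonValueKind.True => true,
    JsonValueKind.False => false,
    JsonValueKind.Array => e.EnumerateArray().Select(Unwrap).ToList(),
    JsonValueKind.Object => e.EnumerateObject().ToDictionary(p => p.Name, p => Unwrap(p.Value)),
    _ => null, // Null and Undefined
};

var json = "{\"cpu\":0.42,\"memMB\":512,\"tags\":[\"gpu\"]}";

// Default deserialization: every value is a JsonElement.
var raw = JsonSerializer.Deserialize<Dictionary<string, object>>(json)!;
Console.WriteLine(raw["memMB"].GetType().Name); // JsonElement

// After unwrapping: numbers, strings and lists are ordinary CLR objects.
var report = raw.ToDictionary(kv => kv.Key, kv => Unwrap((JsonElement)kv.Value));
Console.WriteLine(report["memMB"] is long); // True
```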
The hub surface (`WorkerHub`)
Server-side methods the client calls:
- `AvailableResponse(bool available)`: writes to `AvailableCache`
- `ReportResponse(Dictionary<string,object> report)`: writes to `ReportCache`

Connection lifecycle:
- `OnConnectedAsync`:
  - logs the connection
  - builds a `Host` from the claims and adds it to the `SignalRHostRepository`
  - adds the connection to the SignalR group named by `HostGroup`
- `OnDisconnectedAsync`: removes the host from the repository (with reconnect/duplicate safety)
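Implementing the client (`IWorkerClient`)
Your workflow host app connects to `/workerHub` and implements these handlers:

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR.Client;

public class WorkflowClient : IWorkerClient
{
    private readonly HubConnection _hubConnection;

    public WorkflowClient(HubConnection hubConnection)
    {
        _hubConnection = hubConnection;
        // Route server-to-client calls to the handlers below
        // (assumes the hub invokes them by these method names).
        _hubConnection.On<WorkflowDto, string>("OnRunJob", OnRunJob);
        _hubConnection.On("OnAvailable", OnAvailable);
        _hubConnection.On("OnReport", OnReport);
    }

    // Handlers return Task, as SignalR requires for strongly typed client
    // interfaces; match the actual IWorkerClient signatures.
    public Task OnRunJob(WorkflowDto dto, string definition)
    {
        // start the workflow; report progress through hub methods you add
        return Task.CompletedTask;
    }

    public Task OnAvailable()
    {
        // compute availability and tell the hub
        var available = true;
        return _hubConnection.SendAsync("AvailableResponse", available);
    }

    public Task OnReport()
    {
        var payload = new Dictionary<string, object>
        {
            ["cpu"] = 0.42,
            ["memMB"] = 512,
            ["running"] = 1
        };
        return _hubConnection.SendAsync("ReportResponse", payload);
    }
}
```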
Make sure the client authenticates with the required claims. The server derives the connection's `UserIdentifier` from them through `MachineNameUserIdProvider` (`{HostGroup}/{MachineName}`), and that value is the `hostId` the Jobs side uses to target the host.
Querying hosts by group (read-only service)
If you prefer a service abstraction to the raw repository:

```csharp
var hostService = new SignalRHostService(hostCollection, new[] { "prod", "dev", "qa" });
IEnumerable<Host> prodHosts = hostService.GetByGroup("prod");
bool knownGroup = hostService.GroupExists("prod");
Host one = hostService.Get(hostId);
int total = hostService.Count();
```

All mutation methods throw by design (the hub owns membership).
Operational notes & best practices
- Backplane: if you scale the hub across instances, set up a SignalR backplane (e.g., Redis). The host registry and caches are in-memory per node and a backplane does not share them, so plan your topology accordingly.
- Targeting hosts: always construct `hostId` with the same rule the server uses: `"{HostGroup}/{MachineName}"`.
- Timeouts: `IsHostAvailable` polls `AvailableCache` for up to ~5 s. On slow networks, call it sparingly and cache results in your own layer.
- Duplicates & reconnects: the repository dedupes on read; transient duplicates are tolerated during reconnects.
- Security: the hub is `[Authorize]`. Keep tokens and claims strict; prefer short-lived access tokens for hosts.
- Limits: use the `RunningJobsLimit` and `Priority` claims to inform your scheduling layer (they're stored on `Host`).
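Following the timeout advice above, a small time-to-live wrapper keeps repeated checks for the same host from each blocking for up to ~5 s. `CachedAvailability` and the 30 s TTL are hypothetical; in a real service the probe would be `hostId => worker.IsHostAvailable(hostId)`, and the counting lambda below only simulates it.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical wrapper: memoize each host's answer for `ttl`.
static Func<string, bool> CachedAvailability(Func<string, bool> probe, TimeSpan ttl)
{
    var cache = new ConcurrentDictionary<string, (bool Available, DateTime CheckedUtc)>();
    return hostId =>
    {
        var now = DateTime.UtcNow;
        if (cache.TryGetValue(hostId, out var hit) && now - hit.CheckedUtc < ttl)
            return hit.Available; // fresh enough: skip the round trip
        var available = probe(hostId);
        cache[hostId] = (available, now);
        return available;
    };
}

// Simulated probe that counts how often the expensive check actually runs.
int probeCalls = 0;
var isAvailable = CachedAvailability(id => { probeCalls++; return id.StartsWith("prod/"); }, TimeSpan.FromSeconds(30));

Console.WriteLine(isAvailable("prod/wf-node-01")); // True (probe called)
Console.WriteLine(isAvailable("prod/wf-node-01")); // True (served from cache)
Console.WriteLine(probeCalls);                     // 1
```

Negative answers are cached too, so a host that comes back online may be skipped until its entry expires; cache only `true` results if that matters for your scheduler.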
Minimal end-to-end checklist
- Host an ASP.NET Core app with `AddSignalR()` and `MapHub<WorkerHub>("/workerHub")`.
- Call `services.Adds(services, logger)` to register all components.
- Ensure your auth issues `HostGroup` and `MachineName` (and optionally `Priority`, `RunningJobsLimit`) claims to workflow hosts.
- In your Jobs component, inject `SignalRWorkflowWorker` and call:
  - `.IsHostAvailable(hostId)` (optional)
  - `.Execute(jobId, workflowOrCodeWorkflow, parameters, hostId)`
  - `.Cancel(jobId, hostId)` / `.Timeout(jobId, hostId)` as needed
- On the workflow host, implement the `IWorkerClient` handlers and call back `AvailableResponse` / `ReportResponse`.