DHI.Services.Jobs.Orchestrator - Internal Developer Guide
JobOrchestrator<TTaskId> coordinates one or more job workers via three timers (execute, heartbeat, timeout) and can emit scalar metrics about the fleet.
Use this in a Windows service, Linux systemd unit, Kubernetes worker, or as part of a .NET Generic Host.
What it does
On each timer tick (the three timers run independently):
- Execution
  - For each IJobWorker<TTaskId> (in parallel): CleanNotStartedJobs(), ExecutePending(), Cancel()
  - Optionally writes metrics (see below)
- Heartbeat
  - For each worker: MonitorInProgressHeartbeat()
- Timeouts
  - For each worker: MonitorTimeouts()
All exceptions are logged; timers are stopped during work and restarted in finally to avoid overlapping executions.
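The stop-then-restart behaviour can be sketched with System.Timers.Timer. Whether the orchestrator uses this timer class internally is an assumption; the variable names, the 50 ms interval, the simulated 150 ms workload, and the stopping flag are illustrative only.

```csharp
using System;
using System.Threading;

int inFlight = 0, maxInFlight = 0, ticks = 0;
var stopping = new CancellationTokenSource();

// 50 ms interval, while each tick's simulated work takes 150 ms.
var timer = new System.Timers.Timer(50);
timer.Elapsed += (_, _) =>
{
    timer.Stop();                                  // pause ticks while this one is handled
    int running = Interlocked.Increment(ref inFlight);
    try
    {
        maxInFlight = Math.Max(maxInFlight, running);
        Thread.Sleep(150);                         // stand-in for the per-worker calls
        Interlocked.Increment(ref ticks);
    }
    catch (Exception ex)
    {
        Console.Error.WriteLine(ex);               // the orchestrator logs instead
    }
    finally
    {
        Interlocked.Decrement(ref inFlight);
        if (!stopping.IsCancellationRequested)
            timer.Start();                         // resume only once the work is done
    }
};

timer.Start();
Thread.Sleep(700);
stopping.Cancel();                                 // keep an in-flight tick from restarting the timer
timer.Stop();
Thread.Sleep(300);                                 // let an in-flight tick finish
timer.Dispose();

Console.WriteLine($"ticks handled: {ticks}, max concurrent handlers: {maxInFlight}");
```

Because the timer is stopped for the duration of the handler, a slow tick delays the next one instead of stacking up behind it.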
Installing
dotnet add package DHI.Services.Jobs.Orchestrator
Constructing an orchestrator
using DHI.Services.Jobs.Orchestrator;
using Microsoft.Extensions.Logging;

// Your set of workers (implement IJobWorker<TTaskId>)
IEnumerable<IJobWorker<string>> workers = new[] { myWorker1, myWorker2 };

// Optional: metrics wiring
GroupedScalarService<string, int>? scalarService = /* from DI */;
IDictionary<string, IJobService<string>>? jobServicesByWorkerId = new Dictionary<string, IJobService<string>>
{
    // Keys must match the *ids* your workers use in metrics grouping
    ["worker-A"] = myWorkerServiceA,
    ["worker-B"] = myWorkerServiceB
};

var orchestrator = new JobOrchestrator(
    jobWorkers: workers,
    logger: logger,
    executionTimerInterval: TimeSpan.FromSeconds(5).TotalMilliseconds,
    heartbeatTimerInterval: TimeSpan.FromSeconds(5).TotalMilliseconds,
    timeoutTimerInterval: TimeSpan.FromSeconds(10).TotalMilliseconds,
    scalarService: scalarService,        // omit if not needed
    jobServices: jobServicesByWorkerId   // omit if not needed
);

orchestrator.Start();
// ...
orchestrator.Stop();
orchestrator.Dispose();
Recommended starting intervals
- execution: 2–10 s
- heartbeat: 2–10 s
- timeout: 10–60 s
Tune based on throughput and store latency.
Required worker surface
Your workers must implement (names taken from what the orchestrator calls):
- void CleanNotStartedJobs(): tidy up jobs that never began (stuck in limbo)
- void ExecutePending(): pick up and start pending jobs
- void Cancel(): cancel jobs marked for cancellation
- void MonitorInProgressHeartbeat(): validate health/heartbeats of running jobs; mark stalled ones
- void MonitorTimeouts(): enforce max runtime; move to error/timeout
These are invoked in Parallel.ForEach across workers. Ensure your implementations are thread-safe and idempotent.
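To illustrate why thread safety matters here, the following sketch runs two workers through Parallel.ForEach against one shared pending queue. SketchWorker is a hypothetical class that only mirrors the method names listed above; it does not implement the library's IJobWorker<TTaskId>, whose exact signature is not shown in this guide. The shared in-memory queue is also an assumption made for the demo.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Two hypothetical workers draining the same queue of pending job ids.
var pending = new ConcurrentQueue<string>(new[] { "job-1", "job-2", "job-3" });
var workers = new List<SketchWorker> { new("worker-A", pending), new("worker-B", pending) };

// Same call order as the execution tick described above.
Parallel.ForEach(workers, w =>
{
    w.CleanNotStartedJobs();
    w.ExecutePending();
    w.Cancel();
});

int total = workers.Sum(w => w.Started.Count);
Console.WriteLine($"jobs started across all workers: {total}");

// Hypothetical worker: NOT the library interface, only the same method names.
sealed class SketchWorker
{
    private readonly ConcurrentQueue<string> _pending;
    public string Id { get; }
    public ConcurrentBag<string> Started { get; } = new();

    public SketchWorker(string id, ConcurrentQueue<string> pending)
    {
        Id = id;
        _pending = pending;
    }

    public void CleanNotStartedJobs() { /* nothing to tidy in this sketch */ }

    // TryDequeue hands each job id to exactly one caller, so workers running
    // in parallel never start the same job twice.
    public void ExecutePending()
    {
        while (_pending.TryDequeue(out var jobId))
            Started.Add(jobId);
    }

    public void Cancel() { }
    public void MonitorInProgressHeartbeat() { }
    public void MonitorTimeouts() { }
}
```

In a real deployment the atomic hand-off would come from the job store rather than from an in-memory queue.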
Metrics (optional)
If you provide GroupedScalarService<string,int> and a dictionary of IJobService<TTaskId> keyed by jobWorkerId, the orchestrator emits scalars under:
Job Orchestrator/{COMPUTERNAME}/{jobWorkerId}/<Metric Name>
Metrics per worker:
- Jobs In Progress: count of JobStatus.InProgress
- Jobs Pending: count of JobStatus.Pending
- Jobs Completed Last 24 Hours: count
- Average Execution Time (seconds) for Jobs Completed Last 24 Hours: average over completed jobs with both Started and Finished set
- Jobs With Errors Last 24 Hours: count
Notes
- The group uses the environment variable COMPUTERNAME. On Linux, set it (e.g., COMPUTERNAME=$(hostname)) so metrics group neatly by host.
- The orchestrator calls the job service with since: DateTime.Now.AddDays(-1) to compute "last 24 hours".
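The sketch below shows how a metric group name and the average execution time could be derived. It is an illustration, not the orchestrator's code: the fallback to Environment.MachineName when COMPUTERNAME is unset, the tuple-based job shape, and returning 0 for an empty set are all assumptions.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Host segment of the group. The MachineName fallback is this sketch's
// assumption, not documented orchestrator behaviour.
string host = Environment.GetEnvironmentVariable("COMPUTERNAME") ?? Environment.MachineName;
string Group(string workerId) => $"Job Orchestrator/{host}/{workerId}";

var now = DateTime.Now;
var since = now.AddDays(-1);   // the "last 24 hours" window

// Hypothetical completed jobs as (Started, Finished) pairs.
var completed = new List<(DateTime? Started, DateTime? Finished)>
{
    (now.AddMinutes(-30), now.AddMinutes(-29)),   // ran for 60 s
    (now.AddMinutes(-20), now.AddMinutes(-18)),   // ran for 120 s
    (null, now.AddMinutes(-5)),                   // skipped: Started is not set
    (now.AddDays(-3), now.AddDays(-2)),           // skipped: finished before the window
};

// Average only over jobs inside the window that have both timestamps.
double averageSeconds = completed
    .Where(j => j.Started.HasValue && j.Finished.HasValue && j.Finished >= since)
    .Select(j => (j.Finished!.Value - j.Started!.Value).TotalSeconds)
    .DefaultIfEmpty(0)
    .Average();

string group = Group("worker-A");
Console.WriteLine($"{group}/Average Execution Time (seconds) for Jobs Completed Last 24 Hours = {averageSeconds}");
```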
Hosting with the .NET Generic Host (example)
using System;
using System.Threading;
using System.Threading.Tasks;
using DHI.Services.Jobs.Orchestrator;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

// Starts the orchestrator with the host and stops it on shutdown
public sealed class OrchestratorHostedService : IHostedService, IDisposable
{
    private readonly JobOrchestrator _orchestrator;

    public OrchestratorHostedService(JobOrchestrator orchestrator)
    {
        _orchestrator = orchestrator;
    }

    public Task StartAsync(CancellationToken _) { _orchestrator.Start(); return Task.CompletedTask; }
    public Task StopAsync(CancellationToken _) { _orchestrator.Stop(); return Task.CompletedTask; }
    public void Dispose() => _orchestrator.Dispose();
}

// In Program.cs
builder.Services.AddSingleton(new JobOrchestrator(
    workers, logger, 5000, 5000, 10000, scalarService, jobServices));
builder.Services.AddHostedService<OrchestratorHostedService>();
Operations & troubleshooting
- Is it running?
  orchestrator.IsRunning() checks if the execution timer is enabled.
- Are metrics active?
  orchestrator.ScalarsEnabled() returns whether a scalar service was injected.
- Timer overlap
  The orchestrator stops a timer before work and restarts it in finally. If your worker logic takes longer than the interval, that's okay; ticks won't overlap.
- Exceptions inside workers
  Exceptions bubble to the orchestrator's try/catch and are logged. Ensure your worker keeps working after an exception (handle per-job faults internally).
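Handling per-job faults internally might look like the sketch below: a failure while starting one job is recorded and the loop moves on to the next. StartJob, the job ids, and the simulated failure are hypothetical; a real worker would log through its own logger and update the job's status in the store.

```csharp
using System;
using System.Collections.Generic;

var started = new List<string>();
var failed = new List<string>();

// Hypothetical launcher that fails for one particular job.
void StartJob(string jobId)
{
    if (jobId == "job-2")
        throw new InvalidOperationException("store unavailable");
    started.Add(jobId);
}

// Sketch of an ExecutePending body: one faulty job does not abort the batch.
void ExecutePending(IEnumerable<string> pendingJobIds)
{
    foreach (var jobId in pendingJobIds)
    {
        try
        {
            StartJob(jobId);
        }
        catch (Exception ex)
        {
            failed.Add(jobId);   // record the fault and continue with the next job
            Console.Error.WriteLine($"Job {jobId} failed to start: {ex.Message}");
        }
    }
}

ExecutePending(new[] { "job-1", "job-2", "job-3" });
Console.WriteLine($"started: {string.Join(", ", started)}; failed: {string.Join(", ", failed)}");
```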
Design tips for workers
- Idempotency: re-running ExecutePending should not double-start the same job.
- Back-pressure: if your store supports it, claim jobs atomically.
- Heartbeats: write lastHeartbeat from the runners; consider grace periods when validating.
- Timeouts: store a Started timestamp and check now - Started > MaxRuntime.
- Cancellation: a job flagged for cancel should promptly transition; clean up resources.
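The heartbeat and timeout tips reduce to two small predicates, sketched here. The setting names (maxRuntime, heartbeatInterval, grace) and their values are assumptions chosen for illustration, not library defaults.

```csharp
using System;

// Hypothetical settings; tune them to your jobs and store latency.
var maxRuntime = TimeSpan.FromMinutes(30);
var heartbeatInterval = TimeSpan.FromSeconds(10);
var grace = TimeSpan.FromSeconds(20);

// Timeout rule from the tips above: now - Started > MaxRuntime.
bool IsTimedOut(DateTime started, DateTime now) => now - started > maxRuntime;

// A job counts as stalled only after the expected interval plus a grace period,
// so a single late heartbeat does not get a healthy job flagged.
bool IsStalled(DateTime lastHeartbeat, DateTime now) =>
    now - lastHeartbeat > heartbeatInterval + grace;

var reference = DateTime.UtcNow;
bool longRunner = IsTimedOut(reference.AddMinutes(-45), reference);   // 45 min > 30 min
bool freshJob = IsTimedOut(reference.AddMinutes(-5), reference);      // 5 min <= 30 min
bool lateBeat = IsStalled(reference.AddSeconds(-25), reference);      // 25 s <= 30 s
bool silentJob = IsStalled(reference.AddSeconds(-31), reference);     // 31 s > 30 s

Console.WriteLine($"timed out: {longRunner}/{freshJob}, stalled: {lateBeat}/{silentJob}");
```

The sketch compares UTC timestamps, which keeps the subtraction unaffected by daylight-saving changes on the host.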
Summary
- Use Executer to enqueue jobs from scripts/CI with robust retries and token auth.
- Use Orchestrator to continuously run workers, watch health and timeouts, and optionally publish scalar metrics your dashboards can consume.