
DHI.Services.Jobs.Orchestrator — Internal Developer Guide

JobOrchestrator<TTaskId> coordinates one or more job workers via three timers (execute, heartbeat, timeout) and can emit scalar metrics about the fleet.

Use this in a Windows service, Linux systemd unit, Kubernetes worker, or as part of a .NET Generic Host.


What it does

The three timers tick independently. On each tick:

  • Execution
    • For each IJobWorker<TTaskId> (in parallel):
      1. CleanNotStartedJobs()
      2. ExecutePending()
      3. Cancel()
    • Optionally writes metrics (see below)
  • Heartbeat
    • For each worker: MonitorInProgressHeartbeat()
  • Timeouts
    • For each worker: MonitorTimeouts()
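All exceptions are caught and logged. Each timer is stopped while its work runs and restarted in a finally block, so two executions of the same timer never overlap (the three different timers can still run at the same time).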
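To make the stop/restart pattern concrete, here is a minimal sketch of an execution tick built on System.Timers.Timer. It is not the library's source: IExecutionStepWorker and ExecutionLoopSketch are hypothetical stand-ins, and the timer type is an assumption. It shows the three calls made in order per worker, workers fanned out with Parallel.ForEach, exceptions logged, and the timer restarted in finally only if the loop has not been stopped in the meantime.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for IJobWorker<TTaskId>, reduced to the three execution-tick calls.
public interface IExecutionStepWorker
{
    void CleanNotStartedJobs();
    void ExecutePending();
    void Cancel();
}

// Sketch of the execution timer's stop / work / restart-in-finally pattern (not the library's code).
public sealed class ExecutionLoopSketch : IDisposable
{
    private readonly System.Timers.Timer _timer;
    private readonly IReadOnlyList<IExecutionStepWorker> _workers;
    private readonly Action<Exception> _logError;
    private volatile bool _running;

    public ExecutionLoopSketch(IReadOnlyList<IExecutionStepWorker> workers, double intervalMs, Action<Exception> logError)
    {
        _workers = workers;
        _logError = logError;
        _timer = new System.Timers.Timer(intervalMs);
        _timer.Elapsed += (_, _) => Tick();
    }

    public void Start() { _running = true; _timer.Start(); }
    public void Stop() { _running = false; _timer.Stop(); }
    public bool IsRunning => _timer.Enabled;

    // One execution tick; public so it can also be driven directly.
    public void Tick()
    {
        _timer.Stop(); // no further Elapsed events while this tick is working
        try
        {
            Parallel.ForEach(_workers, worker =>
            {
                worker.CleanNotStartedJobs();
                worker.ExecutePending();
                worker.Cancel();
            });
        }
        catch (Exception ex)
        {
            _logError(ex); // worker exceptions arrive wrapped in an AggregateException
        }
        finally
        {
            if (_running) _timer.Start(); // restart unless Stop() was called meanwhile
        }
    }

    public void Dispose() => _timer.Dispose();
}
```

Because the timer restarts only after the work finishes, a slow tick delays the next one instead of stacking up behind it.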


Installing

dotnet add package DHI.Services.Jobs.Orchestrator

Constructing an orchestrator

using System;
using System.Collections.Generic;
using DHI.Services.Jobs.Orchestrator;
using Microsoft.Extensions.Logging;

// Your set of workers (each implements IJobWorker<TTaskId>; here TTaskId = string).
// myWorker1/myWorker2 and myWorkerServiceA/B below are placeholders for your own instances.
IEnumerable<IJobWorker<string>> workers = new IJobWorker<string>[] { myWorker1, myWorker2 };

// Optional: metrics wiring. A null scalarService means no scalar service is injected.
GroupedScalarService<string, int>? scalarService = null; // or resolve it from DI
IDictionary<string, IJobService<string>>? jobServicesByWorkerId = new Dictionary<string, IJobService<string>>
{
    // Keys must match each worker's jobWorkerId; they become the {jobWorkerId} segment of the metric group
    ["worker-A"] = myWorkerServiceA,
    ["worker-B"] = myWorkerServiceB
};

// logger: a logger from your logging setup
var orchestrator = new JobOrchestrator<string>(
    jobWorkers:               workers,
    logger:                   logger,
    executionTimerInterval:   TimeSpan.FromSeconds(5).TotalMilliseconds,
    heartbeatTimerInterval:   TimeSpan.FromSeconds(5).TotalMilliseconds,
    timeoutTimerInterval:     TimeSpan.FromSeconds(10).TotalMilliseconds,
    scalarService:            scalarService,               // omit if not needed
    jobServices:              jobServicesByWorkerId        // omit if not needed
);

orchestrator.Start();
// ...
orchestrator.Stop();
orchestrator.Dispose();

Recommended starting intervals

  • execution: 2–10s
  • heartbeat: 2–10s
  • timeout: 10–60s

Tune these based on job throughput and the latency of your job store.
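If you keep the intervals in configuration, one option is a small settings type that stores seconds and converts them to the millisecond values the constructor takes. OrchestratorIntervalSettings is hypothetical, not part of the package; its range check simply encodes the recommended starting ranges above as hints, not hard limits.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical settings type: intervals kept in seconds (e.g. bound from configuration)
// and converted to the millisecond values passed to the JobOrchestrator constructor.
public sealed record OrchestratorIntervalSettings(
    double ExecutionSeconds = 5, double HeartbeatSeconds = 5, double TimeoutSeconds = 10)
{
    public double ExecutionMs => TimeSpan.FromSeconds(ExecutionSeconds).TotalMilliseconds;
    public double HeartbeatMs => TimeSpan.FromSeconds(HeartbeatSeconds).TotalMilliseconds;
    public double TimeoutMs => TimeSpan.FromSeconds(TimeoutSeconds).TotalMilliseconds;

    // Lists settings outside the recommended starting ranges; these are hints, not errors.
    public IReadOnlyList<string> OutOfRecommendedRange()
    {
        var notes = new List<string>();
        Check("execution", ExecutionSeconds, 2, 10, notes);
        Check("heartbeat", HeartbeatSeconds, 2, 10, notes);
        Check("timeout", TimeoutSeconds, 10, 60, notes);
        return notes;
    }

    private static void Check(string name, double seconds, double min, double max, List<string> notes)
    {
        if (seconds < min || seconds > max)
            notes.Add($"{name}: {seconds}s is outside the recommended {min}-{max}s range");
    }
}
```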


Required worker surface

Your workers must implement the following methods (these are the names the orchestrator calls):

  • void CleanNotStartedJobs() — tidy up jobs that never began (stuck in limbo)
  • void ExecutePending() — pick up and start pending jobs
  • void Cancel() — cancel jobs marked for cancellation
  • void MonitorInProgressHeartbeat() — validate health/heartbeats of running jobs; mark stalled ones
  • void MonitorTimeouts() — enforce max runtime; move to error/timeout

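The orchestrator invokes these through Parallel.ForEach, so different workers run concurrently. Because the three timers also run independently, methods on the same worker (for example ExecutePending() and MonitorTimeouts()) can run at the same time. Make your implementations thread-safe and idempotent.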
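One way to get idempotency under concurrency is an atomic compare-and-swap on the job's status, so only one caller can move a job from Pending to InProgress. The sketch below uses an in-memory ConcurrentDictionary as a hypothetical stand-in for your job store; InMemoryWorkerSketch and SketchStatus are illustrative names, not library types. A real worker would do the equivalent with a conditional update in its store.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public enum SketchStatus { Pending, InProgress, Completed }

// Hypothetical in-memory worker; a real one would claim jobs in its persistent store.
public sealed class InMemoryWorkerSketch
{
    private readonly ConcurrentDictionary<string, SketchStatus> _jobs = new ConcurrentDictionary<string, SketchStatus>();
    private int _startCount;

    public int StartCount => Volatile.Read(ref _startCount);

    public void Enqueue(string jobId) => _jobs.TryAdd(jobId, SketchStatus.Pending);

    public SketchStatus StatusOf(string jobId) => _jobs[jobId];

    // Safe to call concurrently and repeatedly: a job is started only by the caller
    // whose compare-and-swap from Pending to InProgress succeeds.
    public void ExecutePending()
    {
        foreach (var (jobId, status) in _jobs)
        {
            if (status != SketchStatus.Pending) continue;
            if (_jobs.TryUpdate(jobId, SketchStatus.InProgress, SketchStatus.Pending))
            {
                StartJob(jobId);
            }
        }
    }

    // Hypothetical: a real worker would launch the job here; the sketch only counts starts.
    private void StartJob(string jobId) => Interlocked.Increment(ref _startCount);
}
```

Calling ExecutePending() from several threads at once starts each queued job exactly once, and calling it again afterwards starts nothing.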


Metrics (optional)

If you provide GroupedScalarService<string,int> and a dictionary of IJobService<TTaskId> keyed by jobWorkerId, the orchestrator emits scalars under:

Job Orchestrator/{COMPUTERNAME}/{jobWorkerId}/<Metric Name>

Metrics per worker:

  • Jobs In Progress — count of JobStatus.InProgress
  • Jobs Pending — count of JobStatus.Pending
  • Jobs Completed Last 24 Hours — count
  • Average Execution Time (seconds) for Jobs Completed Last 24 Hours — average over completed jobs with both Started and Finished set
  • Jobs With Errors Last 24 Hours — count
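The following sketch shows how these five values could be derived from job records. SketchJob, SketchJobStatus (including its Completed and Error members) and MetricsSketch are hypothetical. The sketch assumes recentJobs is already the result of the "last 24 hours" query, and it uses 0 for the average when no completed job has both timestamps; what the real orchestrator emits in that case is not specified here.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical local types mirroring the fields the metrics depend on.
public enum SketchJobStatus { Pending, InProgress, Completed, Error }

public sealed record SketchJob(SketchJobStatus Status, DateTime? Started, DateTime? Finished);

public static class MetricsSketch
{
    // currentJobs: jobs counted by their live status.
    // recentJobs: jobs returned for the "since = now minus 24 hours" query.
    public static Dictionary<string, double> Compute(
        IEnumerable<SketchJob> currentJobs, IEnumerable<SketchJob> recentJobs)
    {
        var current = currentJobs.ToList();
        var recent = recentJobs.ToList();
        var completed = recent.Where(j => j.Status == SketchJobStatus.Completed).ToList();

        // Only completed jobs with both Started and Finished contribute to the average.
        var durations = completed
            .Where(j => j.Started.HasValue && j.Finished.HasValue)
            .Select(j => (j.Finished.GetValueOrDefault() - j.Started.GetValueOrDefault()).TotalSeconds)
            .ToList();

        return new Dictionary<string, double>
        {
            ["Jobs In Progress"] = current.Count(j => j.Status == SketchJobStatus.InProgress),
            ["Jobs Pending"] = current.Count(j => j.Status == SketchJobStatus.Pending),
            ["Jobs Completed Last 24 Hours"] = completed.Count,
            ["Average Execution Time (seconds) for Jobs Completed Last 24 Hours"] =
                durations.Count > 0 ? durations.Average() : 0, // empty-window value is an assumption
            ["Jobs With Errors Last 24 Hours"] = recent.Count(j => j.Status == SketchJobStatus.Error),
        };
    }
}
```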

Notes

  • The {COMPUTERNAME} segment comes from the COMPUTERNAME environment variable. Windows sets it automatically; Linux does not, so set it yourself (e.g., COMPUTERNAME=$(hostname)) to group metrics by host.
  • The orchestrator computes “last 24 hours” by calling the job service with since: DateTime.Now.AddDays(-1), i.e. relative to the host's local time.
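On Linux you can also fill in COMPUTERNAME from inside the process at startup, as in the hypothetical helper below, which falls back to Environment.MachineName. This assumes the orchestrator reads the variable when it writes metrics, after your startup code has run; if that is not the case in your version, set the variable in the systemd unit or container spec instead. MetricPath only restates the group layout described above for illustration.

```csharp
using System;

public static class ComputerNameSketch
{
    // Call early in Program.cs, before the orchestrator starts emitting metrics.
    // Sets COMPUTERNAME for this process only and does nothing if it is already set
    // (as it is on Windows).
    public static string EnsureComputerName()
    {
        var name = Environment.GetEnvironmentVariable("COMPUTERNAME");
        if (string.IsNullOrEmpty(name))
        {
            name = Environment.MachineName; // on Linux: the (short) host name
            Environment.SetEnvironmentVariable("COMPUTERNAME", name);
        }
        return name;
    }

    // Hypothetical helper reproducing the documented group layout.
    public static string MetricPath(string jobWorkerId, string metricName) =>
        $"Job Orchestrator/{EnsureComputerName()}/{jobWorkerId}/{metricName}";
}
```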

Hosting with the .NET Generic Host (example)

using System;
using System.Threading;
using System.Threading.Tasks;
using DHI.Services.Jobs.Orchestrator;
using Microsoft.Extensions.Hosting;

public sealed class OrchestratorHostedService : IHostedService, IDisposable
{
    private readonly JobOrchestrator<string> _orchestrator;

    public OrchestratorHostedService(JobOrchestrator<string> orchestrator)
    {
        _orchestrator = orchestrator;
    }

    public Task StartAsync(CancellationToken _) { _orchestrator.Start(); return Task.CompletedTask; }
    public Task StopAsync (CancellationToken _) { _orchestrator.Stop();  return Task.CompletedTask; }
    public void Dispose() => _orchestrator.Dispose();
}

// In Program.cs (workers, logger, scalarService and jobServicesByWorkerId built as in "Constructing an orchestrator")
builder.Services.AddSingleton(new JobOrchestrator<string>(
    workers, logger, 5000, 5000, 10000, scalarService, jobServicesByWorkerId));
builder.Services.AddHostedService<OrchestratorHostedService>();

Operations & troubleshooting

  • Is it running? orchestrator.IsRunning() checks if the execution timer is enabled.
  • Are metrics active? orchestrator.ScalarsEnabled() returns whether a scalar service was injected.
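  • Timer overlap: the orchestrator stops a timer before its work starts and restarts it in a finally block, so worker logic that takes longer than the interval is fine; ticks of the same timer never overlap. The next tick fires one interval after the previous work finishes, so the effective period is the work time plus the interval.
  • Exceptions inside workers: exceptions propagate to the orchestrator's try/catch and are logged, and the remaining calls for that worker are skipped until the next tick. Handle per-job faults inside your worker so one bad job does not stop the rest of the batch.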
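Per-job fault handling inside a worker might look like this minimal sketch. The three delegates (claiming pending jobs, starting one job, recording a failure) are hypothetical hooks into your own store and runner; the point is the try/catch around each job rather than around the whole batch.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical worker fragment showing per-job fault isolation inside ExecutePending().
public sealed class FaultIsolatingWorkerSketch
{
    private readonly Func<IEnumerable<string>> _claimPendingJobIds; // hypothetical: atomically claims pending jobs
    private readonly Action<string> _startJob;                      // hypothetical: launches one job
    private readonly Action<string, Exception> _markFailed;         // hypothetical: records the failure on the job

    public FaultIsolatingWorkerSketch(
        Func<IEnumerable<string>> claimPendingJobIds,
        Action<string> startJob,
        Action<string, Exception> markFailed)
    {
        _claimPendingJobIds = claimPendingJobIds;
        _startJob = startJob;
        _markFailed = markFailed;
    }

    public void ExecutePending()
    {
        foreach (var jobId in _claimPendingJobIds())
        {
            try
            {
                _startJob(jobId);
            }
            catch (Exception ex)
            {
                // Contain the fault to this job so the remaining jobs in the batch still start.
                _markFailed(jobId, ex);
            }
        }
    }
}
```

A failure in the claim query itself still propagates, and the orchestrator logs it as described above.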

Design tips for workers

  • Idempotency: re-running ExecutePending should not double-start the same job.
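  • Atomic claims: if your store supports it, claim jobs atomically (for example, a conditional update from Pending to InProgress) so that two ticks or two workers cannot start the same job.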
  • Heartbeats: write lastHeartbeat from the runners; consider grace periods when validating.
  • Timeouts: store a Started timestamp and check now - Started > MaxRuntime.
  • Cancellation: a job flagged for cancel should promptly transition; clean up resources.
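The heartbeat and timeout tips reduce to two time comparisons. The helpers below are a hypothetical sketch: the timeout rule is the one stated above, and the heartbeat rule (expected interval plus a grace period) is one reasonable way to apply a grace period. Passing now in explicitly keeps them testable; use the same clock (for example DateTime.UtcNow) as the timestamps in your store.

```csharp
using System;

public static class HealthChecksSketch
{
    // Timeout rule from the tip above: now - Started > MaxRuntime.
    public static bool IsTimedOut(DateTime started, TimeSpan maxRuntime, DateTime now) =>
        now - started > maxRuntime;

    // A running job counts as stalled when no heartbeat has arrived within
    // the expected heartbeat interval plus a grace period.
    public static bool IsStalled(DateTime lastHeartbeat, TimeSpan heartbeatInterval, TimeSpan grace, DateTime now) =>
        now - lastHeartbeat > heartbeatInterval + grace;
}
```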

Summary

  • Use Executer to enqueue jobs from scripts/CI with robust retries and token auth.
  • Use Orchestrator to continuously run workers, watch health and timeouts, and optionally publish scalar metrics your dashboards can consume.