DHI.Services.Jobs — Internal Developer Guide (Core)

This guide explains the Jobs module in DHI.Services.Jobs: what it is, the core types, how jobs/tasks/workers/hosts fit together, status/timeout/heartbeat rules, and how to wire it up using the built-in JSON repositories. (Any custom/DB/cloud providers can be added via the connection helpers.)


What the Jobs module does

At a glance:

  • Define tasks (units of work) as ICodeWorkflow classes discoverable at runtime.
  • Queue & track jobs against those tasks with timestamps, progress, and status.
  • Execute jobs using a pluggable worker (local or remote).
  • Balance load across hosts (optionally grouped) with capacity & priority.
  • Enforce timeouts & heartbeats and auto-clean long-running / stuck jobs.
  • Store & query scenario runs that link to their last job.

Core layers follow the Domain Services pattern (Repositories ↔ Services ↔ Orchestration).


Key concepts

1) Job lifecycle

Job<TJobId,TTaskId> (default: Job<Guid,string>) carries:

  • Identity & routing: TaskId, optional AccountId, HostId, HostGroup, Tag, Priority.
  • Status & time: Status (Pending, Starting, InProgress, Completed, Error, Cancel, Cancelling, Cancelled, TimingOut, TimedOut, Unknown), Requested, Starting, Started, Finished, Heartbeat.
  • Progress & messages: Progress (0–100), StatusMessage, Parameters (string→object).

Status rules enforced by JobService:

  • Cancel is only valid from Pending or InProgress (otherwise throws).
  • UpdateStatus auto-sets timestamps (Started, Finished) appropriately.
  • UpdateHeartbeat(jobId) stamps Heartbeat=UtcNow.
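
To make the rules above concrete, here is a minimal, self-contained sketch of the same guards. It does not use the library: JobRecord and StatusRules are hypothetical stand-ins, the exception type is an assumption (the guide only says the call throws), and which statuses stamp Started and Finished is likewise assumed.

```csharp
using System;

// Mirrors the JobStatus values listed in this guide.
public enum JobStatus { Pending, Starting, InProgress, Completed, Error, Unknown, Cancel,
                        Cancelling, Cancelled, TimingOut, TimedOut }

// Hypothetical stand-in for a job record; not the library's Job<TJobId,TTaskId>.
public sealed class JobRecord
{
    public JobStatus Status { get; set; } = JobStatus.Pending;
    public DateTime? Started { get; set; }
    public DateTime? Finished { get; set; }
    public DateTime? Heartbeat { get; set; }
}

public static class StatusRules
{
    public static void UpdateStatus(JobRecord job, JobStatus next, DateTime utcNow)
    {
        // Cancel may only be requested while the job is Pending or InProgress.
        if (next == JobStatus.Cancel &&
            job.Status != JobStatus.Pending && job.Status != JobStatus.InProgress)
        {
            // Assumption: the real service may throw a different exception type.
            throw new InvalidOperationException($"Cannot cancel a job in status {job.Status}.");
        }

        // Assumption: Started is stamped once, on the first move to InProgress.
        if (next == JobStatus.InProgress) job.Started ??= utcNow;

        // Assumption: these four statuses count as "finished".
        if (next is JobStatus.Completed or JobStatus.Error
                 or JobStatus.Cancelled or JobStatus.TimedOut)
        {
            job.Finished = utcNow;
        }

        job.Status = next;
    }

    // Heartbeat is simply stamped with the current UTC time.
    public static void UpdateHeartbeat(JobRecord job, DateTime utcNow) => job.Heartbeat = utcNow;
}
```

Passing utcNow in as a parameter, rather than reading DateTime.UtcNow inside the method, keeps the sketch deterministic and easy to unit test.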

2) Tasks (Code Workflows)

A task is anything implementing ICodeWorkflow. Publish metadata via:

  • CodeWorkflow (immutable task record): Id (usually the CLR type full name), Name, AssemblyName, optional Timeout and HostGroup, and a Parameters dictionary.
  • Attributes you can put on your workflow class:
    • [WorkflowName("...")] – friendly display name.
    • [HostGroup("...")] – default host group to run on.
    • [Timeout("hh:mm:ss")] – default max duration.
    • [WorkflowParameter] – mark public properties to surface in Parameters.

CodeWorkflowService.ImportFrom(Assembly, allowReplace) scans an assembly for ICodeWorkflow types, reads attributes/parameters, and writes them into the CodeWorkflowRepository.
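
The scan described above can be pictured with a small reflection sketch. It is not the library's implementation: the interface and attributes below are local stand-ins declared only so the example compiles on its own (the real ICodeWorkflow also derives from ITask<string>), and TaskInfo and WorkflowScanner are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Reflection;

// Local stand-ins for the library's interface and attributes.
public interface ICodeWorkflow { void Run(); }

[AttributeUsage(AttributeTargets.Class)]
public sealed class WorkflowNameAttribute : Attribute
{
    public WorkflowNameAttribute(string name) => Name = name;
    public string Name { get; }
}

[AttributeUsage(AttributeTargets.Class)]
public sealed class TimeoutAttribute : Attribute
{
    // Parses "hh:mm:ss" with the invariant culture.
    public TimeoutAttribute(string timeout) =>
        Timeout = TimeSpan.Parse(timeout, CultureInfo.InvariantCulture);
    public TimeSpan Timeout { get; }
}

[AttributeUsage(AttributeTargets.Property)]
public sealed class WorkflowParameterAttribute : Attribute { }

// Example workflow: only RunId is published as a parameter.
[WorkflowName("Foo workflow")]
[Timeout("00:30:00")]
public sealed class FooWorkflow : ICodeWorkflow
{
    [WorkflowParameter] public string RunId { get; set; } = "";
    public int NotPublished { get; set; }
    public void Run() { /* unit of work goes here */ }
}

// Hypothetical catalog entry, loosely modelled on the CodeWorkflow record.
public sealed record TaskInfo(string Id, string Name, TimeSpan? Timeout,
                              IReadOnlyDictionary<string, Type> Parameters);

public static class WorkflowScanner
{
    // Finds concrete ICodeWorkflow types and reads their attributes.
    public static List<TaskInfo> Scan(Assembly assembly) =>
        assembly.GetTypes()
            .Where(t => t.IsClass && !t.IsAbstract && typeof(ICodeWorkflow).IsAssignableFrom(t))
            .Select(t => new TaskInfo(
                Id: t.FullName!,
                Name: t.GetCustomAttribute<WorkflowNameAttribute>()?.Name ?? t.Name,
                Timeout: t.GetCustomAttribute<TimeoutAttribute>()?.Timeout,
                Parameters: t.GetProperties()
                    .Where(p => p.IsDefined(typeof(WorkflowParameterAttribute), inherit: true))
                    .ToDictionary(p => p.Name, p => p.PropertyType)))
            .ToList();
}
```

Calling WorkflowScanner.Scan(typeof(FooWorkflow).Assembly) yields one entry per workflow type, keyed by the CLR full name, which matches the "Id (usually the CLR type full name)" convention described earlier.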

3) Workers & Hosts

  • Worker (IWorker<Guid,TTaskId>): the executor. You supply a concrete worker, either local or, when jobs run on remote hosts, one that implements IRemoteWorker. It raises events (Executing/Executed/Cancelling/Cancelled/ProgressChanged).
  • Host (Host): execution slot with RunningJobsLimit, Priority, optional cloud handler via CloudInstanceHandlerType + CloudInstanceParameters. Hosts can be grouped (folders) to segment capacity.
  • Load balancer (ILoadBalancer):
    • LoadBalancer: chooses any host with spare capacity and available remote endpoint.
    • RoundRobinLoadBalancer: concurrently probes availability and picks least-recently assigned/available host; can start/await cloud hosts.
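
As a rough illustration of the capacity-based selection described above, the following self-contained sketch picks a host that is reachable and still has spare capacity. HostSlot and HostPicker are hypothetical names, not library types. Treating a lower Priority number as "preferred" and breaking ties by current load are assumptions, since the guide does not state the ordering.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical, trimmed-down host description (the real Host has more members).
public sealed record HostSlot(string Id, int RunningJobsLimit, int Priority);

public static class HostPicker
{
    // Returns the id of a reachable host with spare capacity, or null if none qualifies.
    public static string? Pick(
        IEnumerable<HostSlot> hosts,
        IReadOnlyDictionary<string, int> runningJobsByHost,
        Func<string, bool> isHostAvailable)
    {
        return hosts
            // Spare capacity: fewer running jobs than the host's limit.
            .Where(h => runningJobsByHost.GetValueOrDefault(h.Id) < h.RunningJobsLimit)
            // Reachability is delegated to the caller (e.g. a remote worker probe).
            .Where(h => isHostAvailable(h.Id))
            // Assumption: lower Priority value wins; ties go to the least-loaded host.
            .OrderBy(h => h.Priority)
            .ThenBy(h => runningJobsByHost.GetValueOrDefault(h.Id))
            .Select(h => h.Id)
            .FirstOrDefault();
    }
}
```

The capacity filter runs before the availability probe so that a potentially slow network check is only made for hosts that could actually accept a job.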

4) Scenarios

Scenario is a lightweight record of a run configuration or input. ScenarioService can return ScenarioInfo, which enriches the scenario with the last job's status/progress (if a job repo is provided). You can soft-remove a scenario by stamping Deleted=UtcNow, and you can filter a scenario’s JSON Data using simple JSONPath-like selectors.


Core types overview

1) Entities (selected)

// Jobs
public class Job<TJobId,TTaskId> : BaseEntity<TJobId> { /* Status, timestamps, progress, params... */ }
public enum JobStatus { Pending, Starting, InProgress, Completed, Error, Unknown, Cancel,
                        Cancelling, Cancelled, TimingOut, TimedOut }

// Tasks (code workflows)
public interface ICodeWorkflow : ITask<string> { void Run(); }
public class CodeWorkflow : BaseNamedEntity<string>, ITask<string> { /* Name, Timeout, HostGroup, Parameters */ }

// Hosts
public class Host : BaseGroupedEntity<string> {
  public int RunningJobsLimit { get; set; } = 10;
  public int Priority { get; set; } = 1;
  public string CloudInstanceHandlerType { get; set; } // reflection-created handler
  public Parameters CloudInstanceParameters { get; }    // constructor arg bag
}

2) Repositories (JSON-backed defaults)

  • JobRepository<TJobId,TTaskId> – JSON file; results are always ordered by Requested, descending. Supports Query<> filters, GetLast, Remove(Query), and UpdateField(jobId, fieldName, value).
  • CodeWorkflowRepository – immutable JSON; good for publishing task catalog from assemblies.
  • HostRepository / GroupedHostRepository – JSON (grouped supports folder semantics).
  • ScenarioRepository – JSON; supports Get(DateTime from,to) and Get(Query<Scenario>).

File-based repos expect the file to exist (constructor throws if missing) except where noted.

3) Services

  • JobService<TTask,TTaskId> – validation (account & task), safe status transitions, heartbeats, time helpers, and bulk deletion (Remove with filter) plus events:
    • Added, Updated, Deleted, and DeletedMultiple (with filter payload).
  • TaskService<TTask,TId> – updatable catalog of tasks.
  • HostService / GroupedHostService – duplicate checks on Add/Update; proxies CreateHost / AdjustJobCapacity if the repo implements them (default JSON repo throws NotSupported).
  • ScenarioService – enriches Scenario → ScenarioInfo with last job status/progress via job repo; TrySoftRemove(id) sets Deleted.
  • JobWorker<TTask,TTaskId> – orchestrator tying services + worker + load balancer together. Includes housekeeping:
    • ExecutePending(), Cancel(), CleanLongRunningJobs(), CleanNotStartedJobs(), MonitorInProgressHeartbeat(), MonitorTimeouts().

Default thresholds in JobWorker

  • Start timeout: 2 minutes
  • Workflow timeout (if none on task): 24 hours
  • Heartbeat threshold: 15 seconds
  • Max Age (retention): 30 days (you clean via your own policy using Remove)

Typical tasks (with code)

1) Install packages

Add from NuGet:

DHI.Services
DHI.Services.Jobs
DHI.Services.Converters

2) Publish tasks from an assembly

var taskRepoPath = Path.Combine(appData, "tasks.json");
if (!File.Exists(taskRepoPath)) File.WriteAllText(taskRepoPath, "{}"); // create an empty store only if missing, so existing data is kept

var taskRepo = new CodeWorkflowRepository(taskRepoPath);
var taskSvc  = new CodeWorkflowService(taskRepo);

// Scan an assembly that contains your ICodeWorkflow implementations
taskSvc.ImportFrom(typeof(MyWorkflows.FooWorkflow).Assembly, allowReplace: true);

3) Create a job store & submit jobs

var jobsPath = Path.Combine(appData, "jobs.json");
if (!File.Exists(jobsPath)) File.WriteAllText(jobsPath, "{}"); // do not overwrite an existing job store

var jobRepo = new JobRepository<Guid,string>(jobsPath);
var jobSvc  = new JobService<CodeWorkflow,string>(jobRepo, taskSvc);

// Submit a job against a known task id
var job = new Job<Guid,string>(Guid.NewGuid(), taskId: typeof(MyWorkflows.FooWorkflow).FullName) {
  AccountId = "acme",
  Tag = "nightly",
  Parameters = { ["RunId"] = Guid.NewGuid().ToString() }
};
jobSvc.Add(job);

4) Define hosts (optional, for remote execution & load balancing)

var hostsPath = Path.Combine(appData, "hosts.json");
if (!File.Exists(hostsPath)) File.WriteAllText(hostsPath, "{}"); // do not overwrite existing hosts

var hostRepo = new HostRepository(hostsPath);
var hostSvc  = new HostService(hostRepo);

hostSvc.Add(new Host(id: "host-a", name: "Host A") { RunningJobsLimit = 2, Priority = 1 });

For grouped hosts (e.g., “prod/eu/host-1”), use GroupedHostRepository + GroupedHostService.

5) Run a worker

You need a concrete IWorker<Guid,string>. For a remote worker, implement IRemoteWorker<Guid,string> and IsHostAvailable(hostId) plus remote Execute/Cancel/Timeout.

IWorker<Guid,string> worker = new MyRemoteWorker(logger); // or local worker

// jobSvc is already a JobService<CodeWorkflow,string>, so no cast is needed
var lb = new RoundRobinLoadBalancer<CodeWorkflow,string>("worker-1", worker,
          jobSvc, hostSvc, logger);

var jw = new JobWorker<CodeWorkflow,string>(
  id: "worker-1",
  worker: worker,
  taskService: taskSvc,
  jobService: jobSvc,
  hostService: hostSvc,
  loadBalancer: lb,
  heartbeatTimeout: TimeSpan.FromSeconds(15),
  logger: logger);

// Simple loop (e.g., hosted service)
jw.ExecutePending();
jw.MonitorInProgressHeartbeat();
jw.MonitorTimeouts();
jw.CleanNotStartedJobs();
jw.CleanLongRunningJobs();
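
The calls above run once. In a hosted service they would typically be repeated on an interval. A minimal sketch of such a loop follows. HousekeepingLoop is a hypothetical helper, not part of the library, and it takes plain Action delegates so it stays self-contained. The interval and the choice to keep going after a failing step are assumptions.

```csharp
#nullable enable
using System;
using System.Threading;
using System.Threading.Tasks;

public static class HousekeepingLoop
{
    // Runs every step in order, waits for the interval, and repeats until cancelled.
    public static async Task RunAsync(
        Action[] steps,
        TimeSpan interval,
        CancellationToken token,
        Action<Exception>? onError = null)
    {
        while (!token.IsCancellationRequested)
        {
            foreach (var step in steps)
            {
                // One failing step should not prevent the remaining steps from running.
                try { step(); }
                catch (Exception ex) { onError?.Invoke(ex); }
            }

            // Task.Delay throws when the token is cancelled; treat that as a normal stop.
            try { await Task.Delay(interval, token); }
            catch (OperationCanceledException) { break; }
        }
    }
}

// Possible usage with the JobWorker from this section (jw and stoppingToken come from your host):
//   await HousekeepingLoop.RunAsync(new Action[] {
//       () => jw.ExecutePending(),
//       () => jw.MonitorInProgressHeartbeat(),
//       () => jw.MonitorTimeouts(),
//       () => jw.CleanNotStartedJobs(),
//       () => jw.CleanLongRunningJobs(),
//   }, TimeSpan.FromSeconds(5), stoppingToken);
```

Wrapping each call in a lambda avoids assuming anything about the return types or optional parameters of the JobWorker methods.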

From your executors, send heartbeats & progress:

jobSvc.UpdateHeartbeat(jobId);
jobSvc.UpdateStatus(jobId, JobStatus.InProgress, "Step 2/5", progress: 40);

Cancel a job (from an API/UI):

var j = jobSvc.Get(jobId);
j.Status = JobStatus.Cancel;
jobSvc.Update(j);     // or jobSvc.UpdateStatus(jobId, JobStatus.Cancel)

6) Scenarios: record + show last run

var scenPath = Path.Combine(appData, "scenarios.json");
if (!File.Exists(scenPath)) File.WriteAllText(scenPath, "{}"); // do not overwrite existing scenarios
var scenRepo = new ScenarioRepository(scenPath);
var scenSvc  = new ScenarioService(scenRepo, jobRepo);

// Save a scenario that points to a last job
scenSvc.Add(new Scenario("acme/run-42") {
  DateTime = DateTime.UtcNow,
  LastJobId = job.Id,
  Data = JsonSerializer.Serialize(new { project="HarborX", variant="A" })
});

// List with last job status/progress, selecting part of Data
var infos = scenSvc.Get(DateTime.UtcNow.AddDays(-7), DateTime.UtcNow, dataSelectors: new[] { "$.project" });

Connections (discover providers & compose without recompiling)

Use the connection helpers to instantiate services/providers by type name + connection string (handy for admin UIs/plugins):

  • JobServiceConnection<TTask,TTaskId> – builds a JobService from JobRepositoryType/ConnectionString and a TaskRepositoryType/ConnectionString.
  • TaskServiceConnection<TTask,TId> – builds a TaskService.
  • HostServiceConnection / GroupedHostServiceConnection – builds host services.
  • ScenarioServiceConnection<TTask,TTaskId> – builds ScenarioService; optionally wires a jobs repo to enrich ScenarioInfo.

These helpers also expose CreateConnectionType<...>(path?) to discover provider types (reflection) for tooling.


Operational behaviors & housekeeping

  • ExecutePending() orders pending jobs by Priority then Requested. If a task doesn’t exist → job becomes Error. If using hosts:
    • For grouped hosts, the job’s HostGroup (or the task’s default) must exist.
    • Load balancer picks a host with spare RunningJobsLimit and where the remote is reachable. Cloud hosts can be started on demand.
    • Job transitions to Starting (sets Starting=UtcNow) before the worker raises Executing.
  • Heartbeats: MonitorInProgressHeartbeat() marks a job Error if it shows no heartbeat (or Started/Starting timestamp) within the threshold.
  • Timeouts: MonitorTimeouts() applies task-specific timeout (Timeout or job’s WorkflowTimeout param) → sets TimedOut and calls worker.Timeout.
  • Long running: CleanLongRunningJobs() cancels jobs exceeding the default (24h) or task timeout and sets Error.
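
The heartbeat and timeout checks above boil down to simple time comparisons. The sketch below shows one plausible reading using the default thresholds quoted in this guide (15 seconds, 24 hours). HousekeepingChecks is a hypothetical helper. Falling back from Heartbeat to Started and then to Starting, and treating a job with no timestamps at all as stale, are assumptions rather than confirmed library behavior.

```csharp
using System;

public static class HousekeepingChecks
{
    // Defaults quoted in this guide.
    public static readonly TimeSpan DefaultHeartbeatThreshold = TimeSpan.FromSeconds(15);
    public static readonly TimeSpan DefaultWorkflowTimeout = TimeSpan.FromHours(24);

    // True when the most recent sign of life is older than the threshold.
    public static bool IsHeartbeatStale(
        DateTime? heartbeat, DateTime? started, DateTime? starting,
        DateTime utcNow, TimeSpan threshold)
    {
        // Assumption: prefer Heartbeat, then Started, then Starting.
        DateTime? lastSign = heartbeat ?? started ?? starting;

        // Assumption: a job with no timestamps at all counts as stale.
        return lastSign is null || utcNow - lastSign.Value > threshold;
    }

    // True when the job has run longer than the task timeout (or the 24h default).
    public static bool HasTimedOut(DateTime started, DateTime utcNow, TimeSpan? taskTimeout)
        => utcNow - started > (taskTimeout ?? DefaultWorkflowTimeout);
}
```

For example, a job whose last heartbeat was 16 seconds ago is stale under the 15-second default, while one that reported 10 seconds ago is not.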

Common pitfalls

  • JSON files must exist for JobRepository, HostRepository, GroupedHostRepository, ScenarioRepository, CodeWorkflowRepository. Create {} first.
  • Cancel only from Pending/InProgress; other transitions will throw.
  • Progress must be within 0–100 (Progress struct guards it).
  • Grouped hosts require a host group: use job HostGroup or the task’s default; or configure the load balancer with a default host group.
  • CloudInstanceHandler is constructed by full type name in Host.CloudInstanceHandlerType; ensure the type is loadable and its constructor accepts Parameters.
  • Parameters validation: when adding/updating a job, if a TaskService is provided, parameter keys must exist in the task’s published Parameters set (else a KeyNotFoundException is thrown).
  • Obsolete types: Workflow (XAML), Activity and friends are legacy. Prefer CodeWorkflow + ICodeWorkflow. WorkflowLogger is also obsolete—use a normal logging framework.

Quick reference

Need… | Use… | Notes
Submit job + track status/progress | JobService + JobRepository | UpdateStatus, UpdateHeartbeat
Discover tasks from assemblies | CodeWorkflowService.ImportFrom(asm) | Reads attributes & parameter types
Execute jobs (local/remote) | JobWorker + your IWorker | Wire a LoadBalancer if using hosts
Segment capacity | GroupedHostService + GroupedHostRepository | GetByGroup, host Priority, RunningJobsLimit
Auto-clean & health checks | JobWorker.Clean*/Monitor* methods | Call on a schedule/background service
Find last run in scenarios | ScenarioService.Get(...)/Get(id) | Enriches with LastJobStatus/Progress
Compose without recompiling | *ServiceConnection helpers | Provide repository type names + connection strings

Minimal wiring (Program.cs-style)

// 1) Repos (ensure files exist without overwriting existing data)
var jobsPath  = Path.Combine(AppContext.BaseDirectory, "App_Data", "jobs.json");
var tasksPath = Path.Combine(AppContext.BaseDirectory, "App_Data", "tasks.json");
var hostsPath = Path.Combine(AppContext.BaseDirectory, "App_Data", "hosts.json");
Directory.CreateDirectory(Path.GetDirectoryName(jobsPath));
foreach (var path in new[] { jobsPath, tasksPath, hostsPath })
  if (!File.Exists(path)) File.WriteAllText(path, "{}");

// 2) Services (generic forms, matching the step-by-step examples above)
var jobRepo  = new JobRepository<Guid,string>(jobsPath);
var taskRepo = new CodeWorkflowRepository(tasksPath);
var hostRepo = new HostRepository(hostsPath);

var taskSvc = new CodeWorkflowService(taskRepo);
taskSvc.ImportFrom(typeof(MyWorkflows.FooWorkflow).Assembly, allowReplace: true);

var jobSvc  = new JobService<CodeWorkflow,string>(jobRepo, taskSvc);
var hostSvc = new HostService(hostRepo);

// 3) Worker + LB
IWorker<Guid,string> worker = new MyRemoteWorker(/* logger */);
var lb = new RoundRobinLoadBalancer<CodeWorkflow,string>("worker-1", worker, jobSvc, hostSvc);
var jw = new JobWorker<CodeWorkflow,string>("worker-1", worker, taskSvc, jobSvc, hostSvc, lb);

// 4) Background loop (hosted service / cron)
jw.ExecutePending();
jw.MonitorInProgressHeartbeat();
jw.MonitorTimeouts();

See also / migration notes

  • Prefer CodeWorkflow over legacy Workflow (XAML). Legacy Workflow* types are marked [Obsolete] and will be removed.
  • When building custom providers for jobs/hosts/scenarios, implement the corresponding repository interfaces (IJobRepository<,>, IHostRepository, IScenarioRepository) and wire them through the *ServiceConnection helpers so apps/UIs can discover them at runtime.