DHI.Services.Jobs: Internal Developer Guide (Core)
This guide explains the Jobs module in DHI.Services.Jobs: what it is, the core types, how jobs/tasks/workers/hosts fit together, status/timeout/heartbeat rules, and how to wire it up using the built-in JSON repositories. (Any custom/DB/cloud providers can be added via the connection helpers.)
What the Jobs module does
At a glance:
- Define tasks (units of work) as ICodeWorkflow classes discoverable at runtime.
- Queue & track jobs against those tasks with timestamps, progress, and status.
- Execute jobs using a pluggable worker (local or remote).
- Balance load across hosts (optionally grouped) with capacity & priority.
- Enforce timeouts & heartbeats and auto-clean long-running / stuck jobs.
- Scenarios: store & query scenario runs that link to the last job.
Core layers follow the Domain Services pattern (Repositories ↔ Services ↔ Orchestration).
Key concepts
1) Job lifecycle
Job<TJobId,TTaskId> (default: Job<Guid,string>) carries:
- Identity & routing: TaskId, optional AccountId, HostId, HostGroup, Tag, Priority.
- Status & time: Status (Pending, Starting, InProgress, Completed, Error, Cancel, Cancelling, Cancelled, TimingOut, TimedOut, Unknown), Requested, Starting, Started, Finished, Heartbeat.
- Progress & messages: Progress (0–100), StatusMessage, Parameters (string→object).
Status rules enforced by JobService:
- Cancel is only valid from Pending or InProgress (otherwise throws).
- UpdateStatus auto-sets timestamps (Started, Finished) appropriately.
- UpdateHeartbeat(jobId) stamps Heartbeat=UtcNow.
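The status rules above can be illustrated with a small, self-contained model. This is only a sketch: DemoJob and DemoStatus are hypothetical stand-ins (not the DHI.Services.Jobs types), the exception type is an assumption because the guide only says the call throws, and the exact points at which the real JobService stamps Started and Finished may differ.

```csharp
using System;

// Hypothetical stand-ins for illustration; not the DHI.Services.Jobs types.
public enum DemoStatus { Pending, Starting, InProgress, Completed, Error, Cancel }

public sealed class DemoJob
{
    public DemoStatus Status { get; private set; } = DemoStatus.Pending;
    public DateTime? Started { get; private set; }
    public DateTime? Finished { get; private set; }
    public DateTime? Heartbeat { get; private set; }

    public void UpdateStatus(DemoStatus next, DateTime utcNow)
    {
        // A cancel request is only accepted while the job is pending or running.
        if (next == DemoStatus.Cancel &&
            Status != DemoStatus.Pending && Status != DemoStatus.InProgress)
        {
            // Assumption: the concrete exception type is not stated in the guide.
            throw new InvalidOperationException($"Cannot cancel a job in status {Status}.");
        }

        // Stamp Started on first entry into InProgress, Finished on Completed/Error.
        if (next == DemoStatus.InProgress && Started is null) Started = utcNow;
        if (next is DemoStatus.Completed or DemoStatus.Error) Finished = utcNow;

        Status = next;
    }

    // Records the latest sign of life from the executor.
    public void UpdateHeartbeat(DateTime utcNow) => Heartbeat = utcNow;
}
```

Passing the clock in as a parameter keeps the model deterministic in tests; the real service uses UtcNow internally.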
2) Tasks (Code Workflows)
A task is anything implementing ICodeWorkflow. Publish metadata via:
- CodeWorkflow (immutable task record): Id (usually the CLR type full name), Name, AssemblyName, optional Timeout and HostGroup, and a Parameters dictionary.
- Attributes you can put on your workflow class:
  - [WorkflowName("...")] – friendly display name.
  - [HostGroup("...")] – default host group to run on.
  - [Timeout("hh:mm:ss")] – default max duration.
  - [WorkflowParameter] – mark public properties to surface in Parameters.
CodeWorkflowService.ImportFrom(Assembly, allowReplace) scans an assembly for ICodeWorkflow types, reads attributes/parameters, and writes them into the CodeWorkflowRepository.
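To make the scanning step less abstract, here is a rough sketch of attribute-driven discovery using plain reflection. Every type in it (IDemoWorkflow, DemoNameAttribute, DemoParameterAttribute, DemoTaskInfo, DemoScanner) is a hypothetical stand-in defined locally; it does not reproduce the internals of CodeWorkflowService.ImportFrom, which may well work differently.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

// Hypothetical stand-ins for ICodeWorkflow and the workflow attributes.
public interface IDemoWorkflow { void Run(); }

[AttributeUsage(AttributeTargets.Class)]
public sealed class DemoNameAttribute : Attribute
{
    public DemoNameAttribute(string name) => Name = name;
    public string Name { get; }
}

[AttributeUsage(AttributeTargets.Property)]
public sealed class DemoParameterAttribute : Attribute { }

public sealed record DemoTaskInfo(string Id, string Name, IReadOnlyList<string> Parameters);

public static class DemoScanner
{
    // Finds concrete IDemoWorkflow classes and collects their metadata.
    public static List<DemoTaskInfo> Scan(Assembly assembly) =>
        assembly.GetTypes()
            .Where(t => t.IsClass && !t.IsAbstract && typeof(IDemoWorkflow).IsAssignableFrom(t))
            .Select(t => new DemoTaskInfo(
                Id: t.FullName ?? t.Name,   // task id = CLR type full name
                Name: t.GetCustomAttribute<DemoNameAttribute>()?.Name ?? t.Name,
                Parameters: t.GetProperties()
                    .Where(p => p.IsDefined(typeof(DemoParameterAttribute), inherit: true))
                    .Select(p => p.Name)
                    .ToList()))
            .ToList();
}

// Example workflow carrying a display name and one surfaced parameter.
[DemoName("Nightly export")]
public sealed class DemoExportWorkflow : IDemoWorkflow
{
    [DemoParameter] public string RunId { get; set; } = "";
    public void Run() { }
}
```

Calling DemoScanner.Scan(typeof(DemoExportWorkflow).Assembly) yields one entry for DemoExportWorkflow with the friendly name and the RunId parameter, which mirrors the kind of record the task catalog stores.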
3) Workers & Hosts
- Worker (IWorker<Guid,TTaskId>): the executor. You bring a concrete worker (local, or implementing IRemoteWorker when jobs run on remote hosts). It raises events (Executing/Executed/Cancelling/Cancelled/ProgressChanged).
- Host (Host): execution slot with RunningJobsLimit, Priority, optional cloud handler via CloudInstanceHandlerType + CloudInstanceParameters. Hosts can be grouped (folders) to segment capacity.
- Load balancer (ILoadBalancer):
  - LoadBalancer: chooses any host with spare capacity and available remote endpoint.
  - RoundRobinLoadBalancer: concurrently probes availability and picks least-recently assigned/available host; can start/await cloud hosts.
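The basic host-selection idea (spare capacity plus a reachable endpoint) can be sketched as follows. DemoHost and DemoHostPicker are hypothetical names, and treating a lower Priority value as "preferred" is an assumption; the guide does not state the ordering direction, and the real load balancers add round-robin and cloud start-up logic that this sketch omits.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for a host record.
public sealed record DemoHost(string Id, int RunningJobsLimit, int Priority);

public static class DemoHostPicker
{
    // Returns the first host (lowest Priority value first, an assumption) that
    // is reachable and runs fewer jobs than its limit; null when none qualifies.
    public static DemoHost? Pick(
        IEnumerable<DemoHost> hosts,
        IReadOnlyDictionary<string, int> runningJobsByHost,
        Func<string, bool> isHostAvailable)
    {
        return hosts
            .OrderBy(h => h.Priority)
            .FirstOrDefault(h =>
                // Hosts missing from the dictionary count as running 0 jobs.
                runningJobsByHost.GetValueOrDefault(h.Id) < h.RunningJobsLimit &&
                isHostAvailable(h.Id));
    }
}
```

The availability probe is passed in as a delegate so the capacity rule can be tested without a remote endpoint.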
4) Scenarios
Scenario is a light record of a run configuration or input. ScenarioService can return ScenarioInfo that enriches with the last job status/progress (if a job repo is provided). You can soft-remove by stamping Deleted=UtcNow. You can also filter a scenario’s JSON Data using simple JSONPath-like selectors.
Core types overview
1) Entities (selected)
// Jobs
public class Job<TJobId,TTaskId> : BaseEntity<TJobId> { /* Status, timestamps, progress, params... */ }
public enum JobStatus { Pending, Starting, InProgress, Completed, Error, Unknown, Cancel,
Cancelling, Cancelled, TimingOut, TimedOut }
// Tasks (code workflows)
public interface ICodeWorkflow : ITask<string> { void Run(); }
public class CodeWorkflow : BaseNamedEntity<string>, ITask<string> { /* Name, Timeout, HostGroup, Parameters */ }
// Hosts
public class Host : BaseGroupedEntity<string> {
    public int RunningJobsLimit { get; set; } = 10;
    public int Priority { get; set; } = 1;
    public string CloudInstanceHandlerType { get; set; } // reflection-created handler
    public Parameters CloudInstanceParameters { get; } // constructor arg bag
}
2) Repositories (JSON-backed defaults)
- JobRepository<TJobId,TTaskId> – JSON file; results are always ordered by Requested descending. Supports Query<> filters, GetLast, Remove(Query), and UpdateField(jobId, fieldName, value).
- CodeWorkflowRepository – immutable JSON; good for publishing a task catalog from assemblies.
- HostRepository / GroupedHostRepository – JSON (grouped supports folder semantics).
- ScenarioRepository – JSON; supports Get(DateTime from, to) and Get(Query<Scenario>).
File-based repos expect the file to exist (constructor throws if missing) except where noted.
3) Services
- JobService<TTask,TTaskId> – validation (account & task), safe status transitions, heartbeats, time helpers, and bulk deletion (Remove with filter) plus events: Added, Updated, Deleted, and DeletedMultiple (with filter payload).
- TaskService<TTask,TId> – updatable catalog of tasks.
- HostService / GroupedHostService – duplicate checks on Add/Update; proxies CreateHost/AdjustJobCapacity if the repo implements them (default JSON repo throws NotSupported).
- ScenarioService – enriches Scenario → ScenarioInfo with last job status/progress via job repo; TrySoftRemove(id) sets Deleted.
- JobWorker<TTask,TTaskId> – orchestrator tying services + worker + load balancer together. Includes housekeeping: ExecutePending(), Cancel(), CleanLongRunningJobs(), CleanNotStartedJobs(), MonitorInProgressHeartbeat(), MonitorTimeouts().
Default thresholds in JobWorker
- Start timeout: 2 minutes
- Workflow timeout (if none on task): 24 hours
- Heartbeat threshold: 15 seconds
- Max Age (retention): 30 days (you clean via your own policy using Remove)
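As a rough illustration of how the start-timeout and heartbeat thresholds might be evaluated, the sketch below models both checks as pure functions. DemoThresholds is a hypothetical helper, not part of the library, and falling back to Started when no heartbeat exists is one reading of the heartbeat rule rather than confirmed behaviour.

```csharp
using System;

// Hypothetical helper; the values mirror the defaults listed above.
public static class DemoThresholds
{
    public static readonly TimeSpan StartTimeout = TimeSpan.FromMinutes(2);
    public static readonly TimeSpan HeartbeatThreshold = TimeSpan.FromSeconds(15);

    // True when the most recent sign of life (heartbeat, else start time)
    // is older than the heartbeat threshold.
    public static bool IsHeartbeatStale(DateTime? heartbeat, DateTime? started, DateTime utcNow)
    {
        var lastSeen = heartbeat ?? started;
        return lastSeen.HasValue && utcNow - lastSeen.Value > HeartbeatThreshold;
    }

    // True when a job entered Starting but never reported Started in time.
    public static bool HasStartTimedOut(DateTime? starting, DateTime? started, DateTime utcNow)
        => starting.HasValue && !started.HasValue && utcNow - starting.Value > StartTimeout;
}
```

With a 15-second threshold, executors need to call UpdateHeartbeat comfortably more often than that (for example every 5 seconds) to tolerate a missed beat.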
Typical tasks (with code)
1) Install packages
Add from NuGet:
DHI.Services
DHI.Services.Jobs
DHI.Services.Converters
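2) Publish tasks from an assembly
// appData is assumed to hold your application's data directory
var taskRepoPath = Path.Combine(appData, "tasks.json");
if (!File.Exists(taskRepoPath)) File.WriteAllText(taskRepoPath, "{}"); // create an empty store only if missing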
var taskRepo = new CodeWorkflowRepository(taskRepoPath);
var taskSvc = new CodeWorkflowService(taskRepo);
// Scan an assembly that contains your ICodeWorkflow implementations
taskSvc.ImportFrom(typeof(MyWorkflows.FooWorkflow).Assembly, allowReplace: true);
3) Create a job store & submit jobs
var jobsPath = Path.Combine(appData, "jobs.json");
if (!File.Exists(jobsPath)) File.WriteAllText(jobsPath, "{}"); // do not overwrite existing jobs
var jobRepo = new JobRepository<Guid,string>(jobsPath);
var jobSvc = new JobService<CodeWorkflow,string>(jobRepo, taskSvc);
// Submit a job against a known task id
var job = new Job<Guid,string>(Guid.NewGuid(), taskId: typeof(MyWorkflows.FooWorkflow).FullName) {
    AccountId = "acme",
    Tag = "nightly",
    Parameters = { ["RunId"] = Guid.NewGuid().ToString() }
};
jobSvc.Add(job);
4) Define hosts (optional, for remote execution & load balancing)
var hostsPath = Path.Combine(appData, "hosts.json");
if (!File.Exists(hostsPath)) File.WriteAllText(hostsPath, "{}"); // do not overwrite existing hosts
var hostRepo = new HostRepository(hostsPath);
var hostSvc = new HostService(hostRepo);
hostSvc.Add(new Host(id: "host-a", name: "Host A") { RunningJobsLimit = 2, Priority = 1 });
For grouped hosts (e.g., “prod/eu/host-1”), use GroupedHostRepository + GroupedHostService.
5) Run a worker
You need a concrete IWorker<Guid,string>. For a remote worker, implement IRemoteWorker<Guid,string>, including IsHostAvailable(hostId) and the remote Execute/Cancel/Timeout operations.
IWorker<Guid,string> worker = new MyRemoteWorker(logger); // or local worker
// jobSvc was created above as JobService<CodeWorkflow,string>, so no cast is needed
var lb = new RoundRobinLoadBalancer<CodeWorkflow,string>("worker-1", worker,
    jobSvc, hostSvc, logger);
var jw = new JobWorker<CodeWorkflow,string>(
    id: "worker-1",
    worker: worker,
    taskService: taskSvc,
    jobService: jobSvc,
    hostService: hostSvc,
    loadBalancer: lb,
    heartbeatTimeout: TimeSpan.FromSeconds(15),
    logger: logger);
// Simple loop (e.g., hosted service)
jw.ExecutePending();
jw.MonitorInProgressHeartbeat();
jw.MonitorTimeouts();
jw.CleanNotStartedJobs();
jw.CleanLongRunningJobs();
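The calls above run once; in practice they are repeated on a schedule. One possible shape for that loop, assuming .NET 6 or later for PeriodicTimer, is sketched below. DemoHousekeepingLoop is a hypothetical helper that only knows about plain Action delegates, so it makes no assumptions about JobWorker signatures.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper; not part of DHI.Services.Jobs.
public static class DemoHousekeepingLoop
{
    // Runs every step once per tick until cancellation. A failing step is
    // reported via onError and does not stop the other steps or later ticks.
    public static async Task RunAsync(
        IReadOnlyList<Action> steps, TimeSpan interval,
        Action<Exception> onError, CancellationToken token)
    {
        using var timer = new PeriodicTimer(interval);
        try
        {
            while (await timer.WaitForNextTickAsync(token))
            {
                foreach (var step in steps)
                {
                    try { step(); }
                    catch (Exception ex) { onError(ex); }
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Cancellation is the normal shutdown path.
        }
    }
}
```

Each JobWorker call can be supplied as a lambda, for example () => jw.ExecutePending(), followed by the monitor and clean-up calls in the order shown above. Isolating failures per step means a transient error in one monitor does not block job execution.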
From your executors, send heartbeats & progress:
jobSvc.UpdateHeartbeat(jobId);
jobSvc.UpdateStatus(jobId, JobStatus.InProgress, "Step 2/5", progress: 40);
Cancel a job (from an API/UI):
var j = jobSvc.Get(jobId);
j.Status = JobStatus.Cancel;
jobSvc.Update(j); // or jobSvc.UpdateStatus(jobId, JobStatus.Cancel)
6) Scenarios: record + show last run
var scenPath = Path.Combine(appData, "scenarios.json");
if (!File.Exists(scenPath)) File.WriteAllText(scenPath, "{}"); // do not overwrite existing scenarios
var scenRepo = new ScenarioRepository(scenPath);
var scenSvc = new ScenarioService(scenRepo, jobRepo);
// Save a scenario that points to a last job
scenSvc.Add(new Scenario("acme/run-42") {
    DateTime = DateTime.UtcNow,
    LastJobId = job.Id,
    Data = JsonSerializer.Serialize(new { project = "HarborX", variant = "A" })
});
// List with last job status/progress, selecting part of Data
var infos = scenSvc.Get(DateTime.UtcNow.AddDays(-7), DateTime.UtcNow, dataSelectors: new[] { "$.project" });
Connections (discover providers & compose without recompiling)
Use the connection helpers to instantiate services/providers by type name + connection string (handy for admin UIs/plugins):
- JobServiceConnection<TTask,TTaskId> – builds a JobService from JobRepositoryType/ConnectionString and a TaskRepositoryType/ConnectionString.
- TaskServiceConnection<TTask,TId> – builds a TaskService.
- HostServiceConnection / GroupedHostServiceConnection – builds host services.
- ScenarioServiceConnection<TTask,TTaskId> – builds ScenarioService; optionally wires a jobs repo to enrich ScenarioInfo.
These helpers also expose CreateConnectionType<...>(path?) to discover provider types (reflection) for tooling.
Operational behaviors & housekeeping
- ExecutePending() orders pending jobs by Priority then Requested. If a task doesn’t exist → job becomes Error. If using hosts:
  - For grouped hosts, the job’s HostGroup (or the task’s default) must exist.
  - Load balancer picks a host with spare RunningJobsLimit and where the remote is reachable. Cloud hosts can be started on demand.
  - Job transitions to Starting (sets Starting=UtcNow) before the worker raises Executing.
- Heartbeats: MonitorInProgressHeartbeat() marks jobs Error if no heartbeat (or Started/Starting) within the threshold.
- Timeouts: MonitorTimeouts() applies task-specific timeout (Timeout or job’s WorkflowTimeout param) → sets TimedOut and calls worker.Timeout.
- Long running: CleanLongRunningJobs() cancels jobs exceeding the default (24h) or task timeout and sets Error.
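The ordering rule for pending jobs (Priority, then Requested) maps directly onto a two-key sort. In this sketch DemoPendingJob and DemoQueue are hypothetical names, and ascending order on both keys is an assumption: the guide does not say whether a lower or higher Priority value runs first.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for a pending job.
public sealed record DemoPendingJob(Guid Id, int Priority, DateTime Requested);

public static class DemoQueue
{
    // Sorts by Priority first, then by request time (oldest first) within
    // the same priority. Ascending on both keys is an assumption.
    public static IReadOnlyList<DemoPendingJob> InExecutionOrder(IEnumerable<DemoPendingJob> pending)
        => pending.OrderBy(j => j.Priority).ThenBy(j => j.Requested).ToList();
}
```

Using Requested as the tie-breaker keeps jobs of equal priority in first-come, first-served order.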
Common pitfalls
- JSON files must exist for JobRepository, HostRepository, GroupedHostRepository, ScenarioRepository, CodeWorkflowRepository. Create each file with {} as its content if it does not exist yet.
- Cancel only from Pending/InProgress; requesting Cancel from any other status throws.
- Progress must be within 0–100 (Progress struct guards it).
- Grouped hosts require a host group: use job HostGroup or the task’s default; or configure the load balancer with a default host group.
- CloudInstanceHandler is constructed by full type name in Host.CloudInstanceHandlerType; ensure the type is loadable and its constructor accepts Parameters.
- Parameters validation: when adding/updating a job, if a TaskService is provided, parameter keys must exist in the task’s published Parameters set (else a KeyNotFoundException is thrown).
- Obsolete types: Workflow (XAML), Activity and friends are legacy. Prefer CodeWorkflow + ICodeWorkflow. WorkflowLogger is also obsolete; use a normal logging framework.
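The parameter-validation pitfall is easy to hit, so a pre-flight check before calling Add can be useful. The sketch below shows the general idea only; DemoParameterCheck is a hypothetical helper, and the message text and the point at which the real JobService performs the check are assumptions.

```csharp
using System.Collections.Generic;

// Hypothetical helper; not part of DHI.Services.Jobs.
public static class DemoParameterCheck
{
    // Throws KeyNotFoundException for the first job parameter key that the
    // task does not publish; returns normally when every key is known.
    public static void EnsureKnown(IEnumerable<string> jobKeys, ISet<string> taskKeys)
    {
        foreach (var key in jobKeys)
        {
            if (!taskKeys.Contains(key))
            {
                throw new KeyNotFoundException($"Parameter '{key}' is not defined by the task.");
            }
        }
    }
}
```

Running such a check in an API layer lets you return a validation error to the caller instead of surfacing the exception from the service.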
Quick reference
| Need… | Use… | Notes |
|---|---|---|
| Submit job + track status/progress | JobService + JobRepository | UpdateStatus, UpdateHeartbeat |
| Discover tasks from assemblies | CodeWorkflowService.ImportFrom(asm) | Reads attributes & parameter types |
| Execute jobs (local/remote) | JobWorker + your IWorker | Wire a LoadBalancer if using hosts |
| Segment capacity | GroupedHostService + GroupedHostRepository | GetByGroup, host Priority, RunningJobsLimit |
| Auto-clean & health checks | JobWorker.Clean*/Monitor* methods | Call on a schedule/background service |
| Find last run in scenarios | ScenarioService.Get(...)/Get(id) | Enriches with LastJobStatus/Progress |
| Compose without recompiling | *ServiceConnection helpers | Provide repository type names + connection strings |
Minimal wiring (Program.cs-style)
// 1) Repos (create each JSON file only if it is missing)
var jobsPath = Path.Combine(AppContext.BaseDirectory, "App_Data", "jobs.json");
var tasksPath = Path.Combine(AppContext.BaseDirectory, "App_Data", "tasks.json");
var hostsPath = Path.Combine(AppContext.BaseDirectory, "App_Data", "hosts.json");
Directory.CreateDirectory(Path.GetDirectoryName(jobsPath));
foreach (var path in new[] { jobsPath, tasksPath, hostsPath })
{
    // Writing "{}" unconditionally would wipe stored data on every start.
    if (!File.Exists(path)) File.WriteAllText(path, "{}");
}
// 2) Services
var jobRepo = new JobRepository(jobsPath);
var taskRepo= new CodeWorkflowRepository(tasksPath);
var hostRepo= new HostRepository(hostsPath);
var taskSvc = new CodeWorkflowService(taskRepo);
taskSvc.ImportFrom(typeof(MyWorkflows.FooWorkflow).Assembly, allowReplace:true);
var jobSvc = new JobService(jobRepo, taskSvc);
var hostSvc = new HostService(hostRepo);
// 3) Worker + LB
IWorker<Guid,string> worker = new MyRemoteWorker(/* logger */);
var lb = new RoundRobinLoadBalancer("worker-1", worker, (JobService<CodeWorkflow,string>)jobSvc, hostSvc);
var jw = new JobWorker("worker-1", worker, taskSvc, (JobService<CodeWorkflow,string>)jobSvc, hostSvc, lb);
// 4) Background loop (hosted service / cron)
jw.ExecutePending();
jw.MonitorInProgressHeartbeat();
jw.MonitorTimeouts();
See also / migration notes
- Prefer CodeWorkflow over legacy Workflow (XAML). Legacy Workflow* types are marked [Obsolete] and will be removed.
- When building custom providers for jobs/hosts/scenarios, implement the corresponding repository interfaces (IJobRepository<,>, IHostRepository, IScenarioRepository) and wire them through the *ServiceConnection helpers so apps/UIs can discover them at runtime.