DHI.Services.PostgreSQL for Jobs: Internal Developer Guide
This module gives you PostgreSQL-backed repositories for the DHI.Services.Jobs domain:
- JobRepository → IJobRepository<Guid, string>
- ScenarioRepository → BaseScenarioRepository
They sit on top of the shared PostgreSQL base infrastructure:
- BaseRepository (table auto-creation, connection string parsing, JSON converters, DateTime handling, etc.).
- Npgsql helpers (TableExists, AddParameter, JSONB condition builders, …).
For the shared concepts (BaseRepository, DateTime handling, JSON converters, Table= override, etc.), see:
PostgreSQL Providers
This document focuses specifically on Jobs and Scenarios.
When to use these providers
Use DHI.Services.Provider.PostgreSQL for Jobs & Scenarios when you:
- Want to store job queue + job history in PostgreSQL.
- Want to store job scenarios (arbitrary JSON + metadata) in PostgreSQL.
- Are building DHI.Services.Jobs.WebApi services that should talk directly to PostgreSQL, not via MCLite or JSON file storage.
Typical configuration (in connections.json):
{
  "postgres-job": {
    "$type": "DHI.Services.Jobs.WebApi.JobServiceConnection, DHI.Services.Jobs.WebApi",
    "JobRepositoryConnectionString": "Server=localhost;Port=5432;Database=ProviderTest;User Id=postgres;Password=Solutions!",
    "JobRepositoryType": "DHI.Services.Provider.PostgreSQL.JobRepository, DHI.Services.Provider.PostgreSQL",
    "TaskRepositoryConnectionString": "[AppData]workflows.json",
    "TaskRepositoryType": "DHI.Services.Jobs.Workflows.WorkflowRepository, DHI.Services.Jobs",
    "Name": "PostgreSQL job service connection to workspace1",
    "Id": "postgres-job"
  },
  "postgres-scenarios": {
    "$type": "DHI.Services.Jobs.WebApi.ScenarioServiceConnection, DHI.Services.Jobs.WebApi",
    "ConnectionString": "Server=localhost;Port=5432;Database=ProviderTest;User Id=postgres;Password=Solutions!",
    "RepositoryType": "DHI.Services.Provider.PostgreSQL.ScenarioRepository, DHI.Services.Provider.PostgreSQL",
    "JobRepositoryConnectionString": "jobs.json",
    "JobRepositoryType": "DHI.Services.Jobs.WebApi.JobRepository, DHI.Services.Jobs.WebApi",
    "Name": "Scenario service connection",
    "Id": "postgres-scenarios"
  }
}
So:
- Jobs: PostgreSQL-backed JobRepository + JSON file–based task/workflow repository.
- Scenarios: PostgreSQL-backed ScenarioRepository + a (separate) JobService connection (here via a JSON-backed job repo).
Shared PostgreSQL base behaviour (quick reminder)
Both JobRepository and ScenarioRepository rely on the PostgreSQL base helpers described in
PostgreSQL Providers
Key points you should remember:
- Tables are self-provisioned:
  - If the table doesn’t exist, the repo calls CreateDataModel(...) to create it and add indexes.
  - Later migrations are done via ALTER TABLE ... helpers (UpdateDataModel_x_y_z).
- Connection string can include Table=... to override the default:
  - e.g. "...;Table=jobs.jobs_pg" → table name becomes jobs.jobs_pg.
- JSON serialization/deserialization uses a standard set of converters:
- enums as strings,
- dictionary type resolution,
- “infer JSON primitive types” on deserialization.
- DateTime handling is normalized for Npgsql 8 (see the main doc).
You don’t need to reimplement any of that for Jobs/Scenarios – it’s already baked into BaseRepository and the extension methods.
JobRepository (PostgreSQL jobs)
Namespace: DHI.Services.Provider.PostgreSQL
Implements: IJobRepository<Guid, string>
Base: BaseRepository (not BaseRepository<TEntity> – jobs are GUID-id, not string-id).
1. Data model & migrations
Default table: public.jobs (or whatever you override via Table=).
On construction:
public JobRepository(string connectionString, ILogger logger = null, IEnumerable<JsonConverter> converters = null)
    : base("public.Jobs", connectionString, logger, converters)
{
    TryRunAction(() =>
    {
        UpdateDataModel_3_7_0(ConnectionString);
        UpdateDataModel_3_8_0(ConnectionString);
        UpdateDataModel_3_12_0(ConnectionString);
        UpdateDataModel_6_8_0(ConnectionString);
        UpdateDataModel_7_0_0(ConnectionString);
        UpdateDataModel_8_0_0(ConnectionString);
    });
}
BaseRepository will:
- Ensure the table exists via AssureTableExists() → CreateDataModel(connectionString) once.
- Apply schema migrations idempotently (each UpdateDataModel_x_y_z is safe to run multiple times).
CreateDataModel creates:
CREATE TABLE IF NOT EXISTS public.jobs(
id uuid PRIMARY KEY,
accountid varchar(255),
taskid varchar(255),
hostid varchar(255),
tag varchar(255),
progress integer,
status varchar(255),
statusmessage varchar(255),
requested timestamp without time zone,
started timestamp without time zone,
finished timestamp without time zone,
parameters varchar(2048),
metadata varchar(2048),
hostgroup varchar(255),
priority integer,
rejected timestamp without time zone,
starting timestamp without time zone,
heartbeat timestamp without time zone
);
CREATE INDEX IF NOT EXISTS jobs_index ON public.jobs USING btree (requested DESC);
Schema-update helpers do:
- UpdateDataModel_3_7_0: ADD COLUMN hostgroup varchar(255)
- UpdateDataModel_3_8_0: ADD COLUMN priority integer
- UpdateDataModel_3_12_0: ALTER COLUMN parameters TYPE JSONB USING parameters::JSONB
- UpdateDataModel_6_8_0: ADD COLUMN rejected timestamp
- UpdateDataModel_7_0_0: ADD COLUMN starting timestamp
- UpdateDataModel_8_0_0: ADD COLUMN heartbeat timestamp
So on a fresh DB you end up with:
- parameters as JSONB (good for querying).
- Additional job lifecycle timestamps and fields.
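To make "idempotent" concrete, the sketch below lists the six job migrations as SQL text, in the order the constructor applies them. It is an illustration only: JobMigrationSketch and Build are hypothetical names, and the IF NOT EXISTS guard on the ADD COLUMN statements is an assumption (this guide only shows that guard for the scenarios table), so the real helpers may protect against re-runs differently.

```csharp
using System.Collections.Generic;

// Hypothetical illustration; not the repository's actual migration code.
public static class JobMigrationSketch
{
    // Returns the migration statements in the order the constructor runs them.
    public static IReadOnlyList<string> Build(string table)
    {
        return new List<string>
        {
            // 3.7.0 and 3.8.0: scheduling columns (IF NOT EXISTS is an assumption).
            $"ALTER TABLE {table} ADD COLUMN IF NOT EXISTS hostgroup varchar(255);",
            $"ALTER TABLE {table} ADD COLUMN IF NOT EXISTS priority integer;",
            // 3.12.0: re-running against a column that is already JSONB leaves values unchanged.
            $"ALTER TABLE {table} ALTER COLUMN parameters TYPE JSONB USING parameters::JSONB;",
            // 6.8.0, 7.0.0 and 8.0.0: job lifecycle timestamps.
            $"ALTER TABLE {table} ADD COLUMN IF NOT EXISTS rejected timestamp without time zone;",
            $"ALTER TABLE {table} ADD COLUMN IF NOT EXISTS starting timestamp without time zone;",
            $"ALTER TABLE {table} ADD COLUMN IF NOT EXISTS heartbeat timestamp without time zone;"
        };
    }
}
```

Because every statement tolerates an already-migrated table, running the whole list on each start-up is safe, which is the property the constructor relies on.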
2. Repository API
Implements the full IJobRepository<Guid,string>:
Maybe<Job<Guid,string>> Get(Guid id);
bool Contains(Guid id);
IEnumerable<Job<Guid,string>> GetAll();
IEnumerable<Guid> GetIds();
void Add(Job<Guid,string> entity);
void Update(Job<Guid,string> entity);
void Remove(Guid id);
IEnumerable<Job<Guid,string>> Get(Query<Job<Guid,string>> query);
Job<Guid,string> GetLast(Query<Job<Guid,string>> query);
void Remove(Query<Job<Guid,string>> query);
void UpdateField<TField>(Guid jobId, string fieldName, TField fieldValue);
int Count(); // inherited from BaseRepository
Unlike the MCLite job provider:
- Add/Update are fully implemented.
- Point update (UpdateField) is supported for a whitelisted set of fields (currently: heartbeat only).
3. Mapping between DB row and Job
SelectJob does a SELECT * FROM jobs and maps fields by index:
// indexes -> columns:
0: id (uuid)
1: accountid
2: taskid
3: hostid
4: tag
5: progress
6: status
7: statusmessage
8: requested
9: started
10: finished
11: parameters (JSONB)
12: metadata (varchar(2048), but JSON string)
13: hostgroup
14: priority
15: rejected
16: starting
17: heartbeat
For each row:
var job = reader.GetValue(1) is not DBNull
? new Job<Guid,string>(reader.GetGuid(0), reader.GetString(2), reader.GetString(1))
: new Job<Guid,string>(reader.GetGuid(0), reader.GetString(2));
So:
- Id = id
- TaskId = taskid
- AccountId = accountid (optional; ctor overload used when present)
- HostId, Tag, Progress, Status, StatusMessage, Requested, Started, Finished, HostGroup, Priority, Rejected, Starting, Heartbeat are all mapped if not DBNull.
- Status is parsed from the stored string → DHI.Services.Jobs.JobStatus via Enum.TryParse.
- Parameters and Metadata are deserialized from JSON:

    if (reader.GetValue(11) is not DBNull)
    {
        foreach (var kv in JsonSerializer.Deserialize<Dictionary<string, object>>(reader.GetString(11), _deserializerOptions))
            job.Parameters.Add(kv.Key, kv.Value);
    }
    if (reader.GetValue(12) is not DBNull)
    {
        foreach (var kv in JsonSerializer.Deserialize<Dictionary<string, object>>(reader.GetString(12), _deserializerOptions))
            job.Metadata.Add(kv.Key, kv.Value);
    }

  _deserializerOptions already has the “infer type” converters, so numbers, booleans, DateTimes, Guids, etc. are handled automatically.
On insert/update, we write:
insertCommand.AddParameter(NpgsqlDbType.Jsonb, "@parameters", JsonSerializer.Serialize(job.Parameters));
insertCommand.AddParameter(NpgsqlDbType.Varchar, "@metadata", JsonSerializer.Serialize(job.Metadata));
So:
- parameters is stored as JSONB.
- metadata is stored as a string (varchar) containing JSON.
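The “infer type” step matters because System.Text.Json, without extra converters, hands back JsonElement values when the target is Dictionary<string, object>. The sketch below shows one way such inference could work. ParameterInferenceSketch, Infer and Read are hypothetical names for illustration; the shared converters in BaseRepository may apply different rules or a different order of checks.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical illustration of "infer JSON primitive types"; not the shared converter.
public static class ParameterInferenceSketch
{
    // Maps a JsonElement to the closest CLR primitive.
    public static object Infer(JsonElement element)
    {
        switch (element.ValueKind)
        {
            case JsonValueKind.True: return true;
            case JsonValueKind.False: return false;
            case JsonValueKind.Null: return null;
            case JsonValueKind.Number:
                // Prefer an integer when the number fits, otherwise fall back to double.
                return element.TryGetInt64(out var whole) ? whole : (object)element.GetDouble();
            case JsonValueKind.String:
                if (element.TryGetGuid(out var guid)) return guid;
                if (element.TryGetDateTime(out var dateTime)) return dateTime;
                return element.GetString();
            default:
                // Objects and arrays are kept as detached JsonElement copies.
                return element.Clone();
        }
    }

    // Reads a JSON object into a dictionary of inferred primitives.
    public static Dictionary<string, object> Read(string json)
    {
        var raw = JsonSerializer.Deserialize<Dictionary<string, JsonElement>>(json);
        var result = new Dictionary<string, object>();
        foreach (var pair in raw)
        {
            result[pair.Key] = Infer(pair.Value);
        }
        return result;
    }
}
```

With this in place a value such as "timestep": 60 comes back as a long rather than a JsonElement, which is the behaviour callers of job.Parameters generally expect.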
4. Basic usage (directly from code)
using DHI.Services.Jobs;
using DHI.Services.Provider.PostgreSQL;
// connection string can also include "Table=customschema.jobs"
var conn = "Server=localhost;Port=5432;Database=ProviderTest;User Id=postgres;Password=Solutions!";
var repo = new JobRepository(conn);
// create job
var job = new Job<Guid,string>(Guid.NewGuid(), taskId: "ingest-csv", accountId: "alice")
{
HostId = "worker-1",
HostGroup = "etl",
Priority = 10,
Tag = "nightly",
Progress = 0,
Status = JobStatus.Created,
StatusMessage = "Queued",
Requested = DateTime.UtcNow,
Parameters = { ["path"] = "/data/in.csv", ["delimiter"] = ";" },
Metadata = { ["tenant"] = "acme", ["correlationId"] = Guid.NewGuid().ToString() }
};
repo.Add(job);
// update status
job.Status = JobStatus.Running;
job.Starting = DateTime.UtcNow;
job.Heartbeat = DateTime.UtcNow;
repo.Update(job);
// point-update heartbeat only (more efficient)
repo.UpdateField(job.Id, "heartbeat", DateTime.UtcNow);
// query last completed job for a given task
var q = new Query<Job<Guid,string>>
{
new QueryCondition("taskid", QueryOperator.Equal, "ingest-csv"),
new QueryCondition("status", QueryOperator.Equal, JobStatus.Completed),
new QueryCondition("requested", QueryOperator.GreaterThan, DateTime.UtcNow.AddDays(-7))
};
var lastCompleted = repo.GetLast(q);
// delete
repo.Remove(job.Id);
5. Query model for jobs
Get(query) and GetLast(query) both call SelectJob(connectionString, query, last).
Each QueryCondition is translated based on its Item (case-insensitive):
Column-based filters
The snippets below are abridged: Validate([...]) lists the operators each field accepts, and {op} is shorthand for the operator text (condition.QueryOperator.GetDescription() in the first snippet).
- Date/time & int fields (requested, rejected, starting, started, finished, priority, heartbeat):

    condition.Validate([=, !=, >, >=, <, <=]);
    var type = item == "priority" ? NpgsqlDbType.Integer : NpgsqlDbType.Timestamp;
    var param = cmd.AddParameter(type, "@" + item, condition.Value);
    segments.Add($"{item} {condition.QueryOperator.GetDescription()} {param}");

- id (GUID):

    condition.Validate([=, !=, Any]);
    var param = cmd.AddParameter(NpgsqlDbType.Uuid, "@id", Guid.Parse($"{condition.Value}"));
    segments.Add($"id {condition.QueryOperator.GetDescription()} {param}");

- status (string enum):

    condition.Validate([=, !=, Any]);
    if (condition.QueryOperator == QueryOperator.Any && condition.Value is JobStatus[] jobStatus)
    {
        var param = cmd.AddParameter(NpgsqlDbType.Array | NpgsqlDbType.Text, "@status", jobStatus.Select(js => js.ToString()).ToArray());
        segments.Add($"status = ANY({param})");
    }
    else
    {
        var param = cmd.AddParameter(NpgsqlDbType.Varchar, "@status", condition.Value.ToString());
        segments.Add($"status {op} {param}");
    }

- String fields (accountid, hostid, hostgroup, taskid, tag, statusmessage):

    condition.Validate([=, !=, LIKE, Any]);
    if (condition.QueryOperator == QueryOperator.Any)
    {
        var param = cmd.AddParameter(NpgsqlDbType.Array | NpgsqlDbType.Text, "@field", condition.Value);
        segments.Add($"{item} = ANY({param})");
    }
    else
    {
        var param = cmd.AddParameter(NpgsqlDbType.Varchar, "@field", condition.Value);
        segments.Add($"{item} {op} {param}");
    }

So you can write:
var q = new Query<Job<Guid,string>>
{
new QueryCondition("status", QueryOperator.Any, new[] { JobStatus.Created, JobStatus.Running }),
new QueryCondition("hostgroup", QueryOperator.Equal, "etl"),
new QueryCondition("priority", QueryOperator.GreaterThanOrEqual, 5),
new QueryCondition("requested", QueryOperator.GreaterThanOrEqual, DateTime.UtcNow.AddHours(-1))
};
var jobs = repo.Get(q);
JSONB parameter filters (default case)
Any condition.Item that is not one of the explicit names falls into the default branch:
default: // Assume querying parameters
segments.Add(condition.ToJsonCondition("parameters"));
break;
That uses the shared ToJsonCondition("parameters") extension to generate JSONB filters like:
- "parameters" -> 'path' ->> '' comparisons,
- numeric casts, boolean casts, DateTime casts, etc.
Example:
var q = new Query<Job<Guid,string>>
{
new QueryCondition("path", QueryOperator.Like, "%.csv"),
new QueryCondition("region", QueryOperator.Equal, "eu")
};
var csvJobsInEu = repo.Get(q);
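The split between known columns and the JSONB default can be pictured with a small stand-alone sketch. Everything here is hypothetical (JobFilterSketch, ToSegment, the @pN parameter naming, and the parameters ->> 'key' form); the real SQL for the default branch is produced by ToJsonCondition and may look different, and operator validation is left out.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the repository's condition-to-SQL step.
public static class JobFilterSketch
{
    // Column names this guide lists as explicit cases (matched case-insensitively).
    private static readonly HashSet<string> Columns =
        new HashSet<string>(StringComparer.OrdinalIgnoreCase)
        {
            "id", "accountid", "taskid", "hostid", "hostgroup", "tag", "status",
            "statusmessage", "requested", "rejected", "starting", "started",
            "finished", "priority", "heartbeat"
        };

    // Builds one WHERE segment and records the bound value under a fresh parameter name.
    public static string ToSegment(string item, string op, object value,
        IDictionary<string, object> parameters)
    {
        var name = "@p" + parameters.Count;
        parameters[name] = value;

        if (Columns.Contains(item))
        {
            // Known column: compare the column directly.
            return $"{item.ToLowerInvariant()} {op} {name}";
        }

        // Default branch: treat the item as a key inside the JSONB column.
        // The key keeps its original case because JSON keys are case-sensitive.
        var key = item.Replace("'", "''");
        return $"parameters ->> '{key}' {op} {name}";
    }
}
```

The point of the sketch is the dispatch rule: a misspelled column name does not raise an error, it silently becomes a lookup inside parameters, so an unexpectedly empty result is worth checking against the column list first.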
6. UpdateField and allowed columns
UpdateField allows column-level updates but only for a whitelist:
private static readonly ConcurrentBag<string> ValidUpdateFields =
new ConcurrentBag<string>(new[] { "heartbeat" });
If fieldName is not in ValidUpdateFields, you get:
throw new ArgumentException($"{fieldName} is not a valid field name");
Implementation:
var sql = $"UPDATE {TableName} SET {fieldName}=@field_value WHERE id=@id";
command.AddParameter(NpgsqlDbType.Uuid, "@id", jobId);
command.Parameters.Add(new NpgsqlParameter("@field_value", fieldValue));
This is intentionally strict because column names cannot be parameterised, and we don’t want to expose arbitrary columns to external callers.
Typical scenario: heartbeat / worker health updates from job runners.
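The whitelist check and the SQL text can be reproduced without a database, which makes the guard easy to reason about. The sketch below mirrors the snippets above; UpdateFieldSketch and BuildSql are hypothetical names, and a HashSet is used instead of the ConcurrentBag purely for brevity.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration of the whitelist guard; not the repository's code.
public static class UpdateFieldSketch
{
    private static readonly HashSet<string> ValidUpdateFields =
        new HashSet<string>(StringComparer.Ordinal) { "heartbeat" };

    // Returns the UPDATE statement for an allowed column, or throws for anything else.
    public static string BuildSql(string tableName, string fieldName)
    {
        if (!ValidUpdateFields.Contains(fieldName))
        {
            throw new ArgumentException($"{fieldName} is not a valid field name");
        }

        // The column name is interpolated, so it must come from the whitelist;
        // the values stay as bound parameters.
        return $"UPDATE {tableName} SET {fieldName}=@field_value WHERE id=@id";
    }
}
```

Calling BuildSql("public.jobs", "status") throws, while "heartbeat" yields a statement with only @field_value and @id left to bind.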
ScenarioRepository (PostgreSQL scenarios)
Namespace: DHI.Services.Provider.PostgreSQL
Base: BaseScenarioRepository (from DHI.Services.Jobs.Scenarios)
This repository stores job scenarios: loosely structured JSON payloads plus metadata, with support for soft delete and JSONB querying.
1. Data model & connection string
Default table: public.scenarios.
The constructor:
public ScenarioRepository(string connectionString, ILogger logger = null)
{
    _logger = logger;
    try
    {
        (_table, _connectionString) = connectionString.GetAddition("Table", _table);
        _CreateDataModel(_connectionString);
        _UpdateDataModel_3_12_0(_connectionString);
        _UpdateDataModel_7_1_0(_connectionString);
    }
    catch (Exception ex)
    {
        if (_logger == null) throw;
        _logger.LogError(ex, "Error occurred in {Repository} start up", GetType());
    }
}
So, again, you can override the table:
Server=...;Database=ProviderTest;User Id=...;Password=...;Table=jobs.scenarios_pg
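As a rough picture of what GetAddition("Table", _table) returns, the sketch below splits one key out of a connection string and hands back the value plus the remaining string. ConnectionStringSketch and SplitAddition are hypothetical names; case-insensitive key matching and plain splitting on ';' are assumptions, and the real extension may handle quoting or escaping that this version ignores.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the shared GetAddition extension.
public static class ConnectionStringSketch
{
    // Removes "key=value" from the connection string and returns the value
    // (or the default when the key is absent) together with the remainder.
    public static (string Value, string Remainder) SplitAddition(
        string connectionString, string key, string defaultValue)
    {
        var value = defaultValue;
        var kept = new List<string>();

        foreach (var part in connectionString.Split(new[] { ';' }, StringSplitOptions.RemoveEmptyEntries))
        {
            var separator = part.IndexOf('=');
            var isMatch = separator > 0 &&
                part.Substring(0, separator).Trim().Equals(key, StringComparison.OrdinalIgnoreCase);

            if (isMatch)
            {
                value = part.Substring(separator + 1).Trim();
            }
            else
            {
                kept.Add(part.Trim());
            }
        }

        return (value, string.Join(";", kept));
    }
}
```

The remainder is what gets passed on to Npgsql, which is why Table= has to be stripped: it is not a keyword the driver itself understands.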
Initial create
_CreateDataModel:
- Checks connection.TableExists(_table).
- If not, creates:
CREATE TABLE IF NOT EXISTS public.scenarios(
id varchar(255) PRIMARY KEY,
version varchar(255),
lastjobid varchar(255),
datetime timestamp without time zone,
data text,
deleted timestamp without time zone
);
CREATE INDEX IF NOT EXISTS scenarios_index ON public.scenarios USING btree (datetime DESC);
Schema updates
- _UpdateDataModel_3_12_0: convert data to JSONB:

    ALTER TABLE public.scenarios ALTER COLUMN data TYPE JSONB USING data::JSONB;

- _UpdateDataModel_7_1_0: ensure deleted column exists:

    ALTER TABLE public.scenarios ADD COLUMN IF NOT EXISTS deleted timestamp without time zone;

So effective schema is:
id varchar(255) PK,
version varchar(255),
lastjobid varchar(255),
datetime timestamp without time zone,
data jsonb,
deleted timestamp without time zone
2. Repository API
Overrides from BaseScenarioRepository:
public override Maybe<Scenario> Get(string id, ClaimsPrincipal user = null);
public override IEnumerable<Scenario> GetAll(ClaimsPrincipal user = null);
public override IEnumerable<Scenario> Get(DateTime from, DateTime to, ClaimsPrincipal user = null);
public override IEnumerable<Scenario> Get(Query<Scenario> query, ClaimsPrincipal user = null);
public override void Add(Scenario scenario, ClaimsPrincipal user = null);
public override void Update(Scenario scenario, ClaimsPrincipal user = null);
public override void Remove(string id, ClaimsPrincipal user = null);
3. Mapping between DB row and Scenario
For reads (Get, Get(from,to), _SelectScenario), the mapping is:
// columns:
0: id (varchar)
1: version (varchar)
2: lastjobid (varchar)
3: datetime (timestamp)
4: data (jsonb)
5: deleted (timestamp)
Get example:
var version = Guid.NewGuid();
if (reader.GetValue(1) != DBNull.Value && reader.GetString(1) != "")
version = Guid.Parse(reader.GetString(1));
Guid? lastJobId = null;
if (reader.GetValue(2) != DBNull.Value && reader.GetString(2) != "")
lastJobId = Guid.Parse(reader.GetString(2));
var data = "";
if (reader.GetValue(4) != DBNull.Value && reader.GetString(4) != "")
data = reader.GetString(4);
DateTime? deleted = null;
if (reader.GetValue(5) != DBNull.Value)
deleted = reader.GetDateTime(5);
return Maybe.ToMaybe(new Scenario(reader.GetString(0))
{
Version = version,
LastJobId = lastJobId,
DateTime = reader.GetDateTime(3),
Data = data,
Deleted = deleted
});
On insert/update:
command.AddParameter(NpgsqlDbType.Varchar, "@id", scenario.Id);
command.AddParameter(NpgsqlDbType.Varchar, "@version", Guid.NewGuid().ToString());
command.AddParameter(NpgsqlDbType.Varchar, "@lastjobid", scenario.LastJobId.ToString());
command.AddParameter(NpgsqlDbType.Timestamp, "@datetime", scenario.DateTime ?? DateTime.UtcNow);
command.AddParameter(NpgsqlDbType.Json, "@data", string.IsNullOrEmpty(scenario.Data) ? string.Empty : scenario.Data);
command.AddParameter(NpgsqlDbType.Timestamp, "@deleted", scenario.Deleted);
Note: data is written as JSON (text) but the column is JSONB; PostgreSQL will accept valid JSON text and store it as JSONB. Your domain is responsible for ensuring Scenario.Data is valid JSON if you want JSONB queries to work properly.
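Since the repository does not validate Scenario.Data, a cheap pre-check in the calling code can catch malformed payloads before they reach PostgreSQL. The sketch below is one possible guard using System.Text.Json; ScenarioDataSketch and IsValidJson are hypothetical names and not part of the provider.

```csharp
using System.Text.Json;

// Hypothetical caller-side guard; the provider itself performs no such check.
public static class ScenarioDataSketch
{
    // Returns true when the text parses as a JSON document.
    public static bool IsValidJson(string text)
    {
        if (string.IsNullOrWhiteSpace(text))
        {
            return false;
        }

        try
        {
            // JsonDocument is disposable; it is only needed for the parse attempt.
            using (JsonDocument.Parse(text))
            {
                return true;
            }
        }
        catch (JsonException)
        {
            return false;
        }
    }
}
```

A caller could run IsValidJson(s.Data) before repo.Add(s) and reject or repair the scenario when it returns false.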
4. Basic usage (direct code)
using DHI.Services.Jobs.Scenarios;
using DHI.Services.Provider.PostgreSQL;
var conn = "Server=localhost;Port=5432;Database=ProviderTest;User Id=postgres;Password=Solutions!";
var repo = new ScenarioRepository(conn);
// create scenario
var s = new Scenario("run-2025-01-01-01")
{
LastJobId = Guid.NewGuid(),
DateTime = DateTime.UtcNow,
Data = @"{ ""state"": ""Queued"", ""inputs"": { ""mesh"": ""dfsu-1"", ""timestep"": 60 } }"
};
repo.Add(s);
// read by id
var maybe = repo.Get("run-2025-01-01-01");
if (maybe.HasValue)
{
var loaded = maybe.Value;
}
// time window
var recent = repo.Get(DateTime.UtcNow.AddDays(-7), DateTime.UtcNow);
// update (e.g. finished)
s.Data = @"{ ""state"": ""Finished"", ""resultId"": ""abc-123"" }";
s.Deleted = null;
repo.Update(s);
// soft delete
s.Deleted = DateTime.UtcNow;
repo.Update(s);
// hard delete
repo.Remove(s.Id);
5. Query model for scenarios
Get(Query<Scenario>) uses _SelectScenario with per-field semantics (abridged; {op} and {param} stand for the operator text and the bound parameter):
foreach (var condition in query)
{
    var item = condition.Item.ToLower();
    switch (item)
    {
        case "id":
        case "version":
        case "lastjobid":
            // allowed: [Equal, NotEqual, Like]
            // param: varchar
            segments.Add($"{item} {op} {param}");
            break;
        case "deleted":
        case "datetime":
            // allowed: [=, !=, >, >=, <, <=]
            // param: timestamp
            segments.Add($"{item} {op} {param}");
            break;
        default:
            // JSONB in 'data'
            segments.Add(condition.ToJsonCondition("data"));
            break;
    }
}
So you can:
- Filter by scenario id, version, or lastjobid using Equal, NotEqual, Like.
- Filter by datetime or deleted using range operators.
- Everything else is interpreted as JSONB path data->....
Example — find all non-deleted scenarios for mesh="dfsu-1" where state is Queued or Running in last 30 days:
var q = new Query<Scenario>
{
new QueryCondition("datetime", QueryOperator.GreaterThanOrEqual, DateTime.UtcNow.AddDays(-30)),
new QueryCondition("deleted", QueryOperator.Equal, null), // 'deleted IS NULL' is not directly supported, so you might omit this or handle differently in domain layer
new QueryCondition("inputs.mesh", QueryOperator.Equal, "dfsu-1"),
new QueryCondition("state", QueryOperator.Any, new[] { "Queued", "Running" }) // JSONB ANY
};
var matches = repo.Get(q);
(Exact generated SQL for JSON parts comes from QueryCondition.ToJsonCondition("data"), which is described in PostgreSQL Providers.)
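Because a deleted IS NULL condition is not directly expressible, one option is to leave it out of the query and drop soft-deleted rows in the calling code. The sketch below shows that filter in a generic form so it does not depend on the Scenario type; SoftDeleteSketch and OnlyActive are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical caller-side filter for soft-deleted items.
public static class SoftDeleteSketch
{
    // Keeps only the items whose deletion timestamp is unset.
    public static IEnumerable<T> OnlyActive<T>(IEnumerable<T> items, Func<T, DateTime?> deletedOf)
    {
        return items.Where(item => deletedOf(item) == null);
    }
}
```

With the repository from this guide, that would read SoftDeleteSketch.OnlyActive(repo.Get(q), s => s.Deleted). The filtering happens in memory, so it is best combined with a datetime condition that keeps the result set small.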
Wiring into Jobs Web API via Connections
For a typical Jobs Web API host that routes endpoints by {connectionId}, you’ll set up connections as in the configuration shown earlier:
"postgres-job": {
"$type": "DHI.Services.Jobs.WebApi.JobServiceConnection, DHI.Services.Jobs.WebApi",
"JobRepositoryConnectionString": "...",
"JobRepositoryType": "DHI.Services.Provider.PostgreSQL.JobRepository, DHI.Services.Provider.PostgreSQL",
"TaskRepositoryConnectionString": "[AppData]workflows.json",
"TaskRepositoryType": "DHI.Services.Jobs.Workflows.WorkflowRepository, DHI.Services.Jobs",
"Name": "PostgreSQL job service connection to workspace1",
"Id": "postgres-job"
}
At runtime, JobServiceConnection:
- Uses reflection to create:
  - JobRepository with the given connection string.
  - WorkflowRepository (for tasks) with its JSON file path.
- Constructs a JobService<Guid,string,Workflow>.
- Exposes it under the id "postgres-job" to your controllers.
So:
GET /api/jobs/postgres-job/{jobId}
POST /api/jobs/postgres-job
...
…will target the PostgreSQL-backed job repository.
Similarly for scenarios:
"postgres-scenarios": {
"$type": "DHI.Services.Jobs.WebApi.ScenarioServiceConnection, DHI.Services.Jobs.WebApi",
"ConnectionString": "Server=...;Database=ProviderTest;User Id=...;Password=...",
"RepositoryType": "DHI.Services.Provider.PostgreSQL.ScenarioRepository, DHI.Services.Provider.PostgreSQL",
"JobRepositoryConnectionString": "jobs.json",
"JobRepositoryType": "DHI.Services.Jobs.WebApi.JobRepository, DHI.Services.Jobs.WebApi",
"Name": "Scenario service connection",
"Id": "postgres-scenarios"
}
This creates:
- A ScenarioRepository (PostgreSQL).
- A JobRepository (WebApi / JSON-backed in this sample) for cross-referencing jobs in scenario APIs.
- A ScenarioService that ties them together.
Typical patterns
Jobs
- ✓ Use Add to insert new jobs, Update to modify full rows, UpdateField(jobId, "heartbeat", ...) for efficient heartbeats.
- ✓ Use GetLast(query) to fetch the most recent job by requested date.
- ✓ Store structured parameters in job.Parameters and leverage JSONB querying via QueryCondition items that don’t match a known column name.
- Ensure Scenario.Data and Job.Parameters are valid JSON if you want to query them as JSONB.
Scenarios
- ✓ Use the Deleted property for soft delete; physically removing is still available via Remove.
- ✓ Combine Get(DateTime from, to) for time-windows and Get(Query<Scenario>) for JSONB-based filters.
- ✓ LastJobId lets you join scenarios with jobs at the application/service level (e.g., via ScenarioService and a job repo).
General
- Connection string Table= override is supported for both providers:
  - e.g., "...;Table=jobs.job_history" or "...;Table=jobs.scenarios_pg".
- DateTime handling & JSON converters behave exactly as described in PostgreSQL Providers, so treat timestamp without time zone as “UTC-but-Unspecified” in your domain.
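In domain code, “UTC-but-Unspecified” can be handled with two small helpers: one that marks values read from a timestamp without time zone column as UTC, and one that prepares values for writing. This is a sketch under the assumption that you want to do this on the caller side; TimestampSketch, FromColumn and ToColumn are hypothetical names, and the base repository may already apply an equivalent normalization as described in PostgreSQL Providers.

```csharp
using System;

// Hypothetical caller-side helpers; the shared base layer may already normalize values.
public static class TimestampSketch
{
    // Value read from the column: same ticks, but flagged as UTC for the domain.
    public static DateTime FromColumn(DateTime value)
    {
        return DateTime.SpecifyKind(value, DateTimeKind.Utc);
    }

    // Value about to be written: local times are converted to UTC first,
    // then the kind is cleared so it matches the column type.
    public static DateTime ToColumn(DateTime value)
    {
        var utc = value.Kind == DateTimeKind.Local ? value.ToUniversalTime() : value;
        return DateTime.SpecifyKind(utc, DateTimeKind.Unspecified);
    }
}
```

SpecifyKind only changes the Kind flag and never shifts the clock value, so a UTC value survives a ToColumn and FromColumn round trip unchanged.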
With these pieces, you can build PostgreSQL-backed job & scenario services that plug cleanly into the existing DHI.Services.Jobs ecosystem, either directly from code or via the standard Web API + Connections setup.