0gkitdocsGitHub

@foundryprotocol/0gkit-jobs

Durable async job runner for long-running 0G workflows (inference, agents, batched uploads, DA publishes). Three swappable backends; HMAC-signed webhooks; graceful shutdown for serverless.

What it does

Wraps a worker loop, a typed job registry, and a pluggable persistence layer behind a single JobRunner. Enqueue with a zod-typed input, get an opaque id; the runner claims, runs, validates the output, persists, and (optionally) fires a signed webhook on each state change.

When to use it

  • Long-running inference where a synchronous HTTP timeout isn't survivable.
  • Multi-step agents whose orchestration outlives a single request.
  • Webhook-driven workflows where the receiver must trust the sender.
  • Anywhere "at-least-once retry with bounded backoff" is the right delivery semantic.

Install

pnpm add @foundryprotocol/0gkit-jobs @foundryprotocol/0gkit-core zod

# Redis backend only
pnpm add ioredis

Define + run a job

import { JobRunner, jobs } from "@foundryprotocol/0gkit-jobs";
import { MemoryBackend } from "@foundryprotocol/0gkit-jobs/backends/memory";
import { z } from "zod";

const InferenceJob = jobs.define({
  name: "inference",
  input: z.object({ prompt: z.string(), model: z.string() }),
  output: z.object({ text: z.string() }),
  handler: async ({ input, signer, signal }) => {
    if (signal.aborted) throw new Error("shutting down");
    // ...call Compute / Storage / DA here.
    return { text: "..." };
  },
});

const runner = new JobRunner({
  backend: new MemoryBackend(),
  signer,
  webhook: { url: process.env.WEBHOOK_URL!, secret: process.env.WEBHOOK_SECRET! },
});
runner.register(InferenceJob);
await runner.start({ concurrency: 4 });

const id = await runner.enqueue(InferenceJob, { prompt: "hi", model: "..." });
const final = await runner.waitFor(id);

Backends

BackendInstallWhen to use
memory(built-in)dev, tests, ephemeral workflows
sqlite(built-in via better-sqlite3)single-node prod, no extra infra
redisoptional peer ioredismulti-node prod, fan-out

Backends share a single conformance contract — same behaviour across all three, the only differences are durability and concurrency reach.

Webhook verification (Express)

import { jobs } from "@foundryprotocol/0gkit-jobs";

app.post("/api/jobs/webhook", express.text({ type: "*/*" }), (req, res) => {
  const ok = jobs.verifyWebhook({
    body: req.body,
    signature: req.header("x-0gkit-signature") ?? "",
    secret: process.env.JOBS_SECRET!,
  });
  if (!ok) return res.status(401).send("bad signature");
  // ... dedupe on (jobId, newState)
});

The signed payload is the exact request body (not a re-serialised JSON), so consumers must read raw bytes before verifying.

Graceful shutdown (Vercel Fluid Compute)

process.on("beforeExit", async () => {
  await runner.stop({ drain: true, timeoutMs: 25_000 });
});

stop({ drain: true }) lets in-flight handlers finish; stop({ drain: false }) aborts them via the AbortSignal passed into the handler ctx.

CLI

0g jobs status <id> [--backend memory|sqlite] [--path ./.jobs.db] [--json]

Read-only inspector against a backend you point it at — useful in CI logs and post-mortems. The production runner lives in your app, not in the CLI.

Error codes

See also the durable jobs concept page for the delivery model and idempotency requirements.

Exports

  • ClaimOpts
  • JobBackend
  • JobDefinition
  • JobHandlerContext
  • JobMetadata
  • JobRecord
  • JobState
  • RunnerConfig
  • WebhookConfig