Architecture
v3 is a single, layered architecture. Each layer has one responsibility and communicates with the
next through small, immutable value objects. There is no mutable god-object: definitions describe
work, handlers contain only business logic, backends move messages, and the worker
orchestrates a single attempt per delivery. This page explains the layers, the value objects that
flow between them, the worker and cron pipelines, the QueueBackend contract, and the at-least-once
delivery guarantee.
If you are new to the package, read Quick Start first; this page is the conceptual map behind it.
Layers
| Layer | Namespace | Responsibility |
|---|---|---|
| Definition | Daycry\Jobs\Definition | Describe a job. JobBuilder (fluent, mutable accumulator) produces an immutable JobDefinition value object. |
| Handlers | Daycry\Jobs\Handlers | Business logic only. JobHandlerInterface::handle(JobContext); HandlerRegistry resolves keys and enforces the per-queue allowlist. |
| Queues | Daycry\Jobs\Queues | Transport. One QueueBackend contract with lease semantics; JobEnvelope is the wire message, JobLease is an in-flight reservation. EnvelopeFactory builds and signs the wire payload; BackendFactory resolves backends from config. |
| Backends | Daycry\Jobs\Queues\Backends | Concrete transports: SyncBackend, DatabaseBackend, RedisBackend, BeanstalkBackend, ServiceBusBackend. |
| Signing | Daycry\Jobs\Queues\Signing | EnvelopeSigner: HMAC-SHA256 sign/verify with constant-time comparison. |
| Execution | Daycry\Jobs\Execution | Run a single attempt. JobRuntime, JobContext, ExecutionResult, Timeout, RetryPolicy/RetryPolicyFixed, IdempotencyGuard, SingleInstanceLock. |
| Worker | Daycry\Jobs\Worker | QueueWorker integrates the pipeline: fetch → verify → idempotency → run one attempt → ack/nack/abandon. WorkerResult reports the outcome. |
| Cron | Daycry\Jobs\Cron | Scheduler (fluent registry of JobBuilders) and CronRunner (evaluate due definitions; enqueue or run inline). |
| Config | Daycry\Jobs\Config | Config\Jobs: handler/backend maps, security allowlists, retry/timeout settings, and init() to register scheduled jobs. |
| Support | Daycry\Jobs\Libraries, Daycry\Jobs\Metrics | Cross-cutting helpers: CircuitBreaker, RateLimiter, DeadLetterQueue, serializers, and the MetricsCollectorInterface collectors. |
Core value objects
The layers talk to each other only through these small, mostly immutable objects:
- JobDefinition (readonly): what the job is, that is, handler key, payload, scheduling, retry policy, identity. Every withXxx() helper returns a new instance, so a definition can be shared across enqueue sites without a change at one site affecting another.
- JobEnvelope / wire payload: how the definition travels through a queue. The canonical wire object is:
  { "job", "payload", "queue", "priority", "maxRetries", "attempts", "name", "identifier", "idempotencyKey", "schedule", "_sig" }
  Built by EnvelopeFactory::toWire(), identical across every backend so a message enqueued via one path can be consumed via another.
- JobLease (readonly): a reservation on an in-flight message:
  - token: opaque backend handle needed to ack/nack later (DB primary key, Redis processing entry, Service Bus LockToken, Beanstalk job id);
  - ownerToken: a random token minted by the worker at fetch() so a reaped-then-reassigned message cannot be acked by the previous owner;
  - expiresAt: the visibility deadline. After it passes the backend may redeliver; a long-running worker can renew() it.
- JobContext (readonly): what a handler sees: payload, name, queue, attempt, meta. Handlers never receive scheduling or queue state.
- ExecutionResult: the outcome of one handler run: success, output, error, startedAt, endedAt (and durationSeconds()), and the resolved handlerClass.
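The copy-on-change behaviour of the withXxx() helpers can be illustrated with a minimal sketch. SketchDefinition and its four fields are hypothetical stand-ins chosen for this example, not the package's JobDefinition, and the sketch assumes PHP 8.1 or later for readonly properties.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in for JobDefinition; the real class carries more fields.
final class SketchDefinition
{
    public function __construct(
        public readonly string $handler,
        public readonly array $payload = [],
        public readonly ?string $queue = null,
        public readonly int $maxRetries = 0,
    ) {
    }

    // Returns a modified copy; $this is never mutated.
    public function withQueue(string $queue): self
    {
        return new self($this->handler, $this->payload, $queue, $this->maxRetries);
    }

    // Same pattern for every other field.
    public function withMaxRetries(int $maxRetries): self
    {
        return new self($this->handler, $this->payload, $this->queue, $maxRetries);
    }
}

// A shared base definition stays untouched when one enqueue site specialises it.
$base   = new SketchDefinition('send-email', ['to' => 'user@example.com']);
$urgent = $base->withQueue('high')->withMaxRetries(3);
```

Because each helper builds a new object, $base can be reused by other enqueue sites while $urgent carries its own queue and retry settings.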
Queue pipeline (worker)
QueueWorker::processOnce($queue) drives one message end to end:
- fetch: backend->fetch($queue) leases one ready message, or returns null (status empty).
- verify: when verifyEnvelopeSignature is on and a key is configured, the HMAC _sig is verified over the canonical identity JSON with hash_equals(). A tampered/forged message is abandon()ed (status rejected). An envelope whose payload is not a valid object is also rejected.
- idempotency: if the envelope carries an idempotencyKey, IdempotencyGuard::firstRun() short-circuits a duplicate: ack() without running (status skipped-idempotent).
- run one attempt: JobRuntime::run(definition, context) resolves the handler (enforcing the per-queue allowlist), acquires the single-instance lock if requested, applies the timeout, captures output, and returns an ExecutionResult. It runs once and never sleeps.
- decide (the retry decision lives here, not in the runtime):
  - success → ack() (status acked),
  - failure with retries left (attemptIndex < maxRetries) → nack($lease, $delay) where $delay comes from the RetryPolicy (status requeued); the backend requeues,
  - retries exhausted → abandon() (status dead-lettered). abandon() routes to the backend's native dead-letter where one exists (Beanstalk bury / Service Bus after MaxDeliveryCount) and otherwise just drops the message (Database/Redis). The app-level DeadLetterQueue helper is opt-in and is not invoked by the worker.
Total runs for a job are therefore maxRetries + 1. Backoff is realised by the backend honouring
nack($lease, $delaySeconds). See Retries.
┌────────────────────────────────── QueueWorker::processOnce($queue) ──────────────────────────────────┐
│                                                                                                      │
│ fetch(lease) ──► verify signature ──► idempotency guard ──► run ONE attempt (JobRuntime)             │
│      │                  │                     │                          │                           │
│    null             tampered              duplicate                      │                           │
│      │                  │                     │          ┌───────────────┼───────────────┐           │
│      ▼                  ▼                     ▼          ▼               ▼               ▼           │
│   "empty"           abandon()               ack()     success        failure,         retries        │
│                    "rejected"       "skipped-idempotent"           retries left      exhausted       │
│                                                          │               │               │           │
│                                                        ack()        nack(delay)      abandon()       │
│                                                       "acked"       "requeued"    "dead-lettered"    │
│                                                              (backend requeues)  native DLQ or drop  │
│                                                                                 (app DLQ is opt-in)  │
└──────────────────────────────────────────────────────────────────────────────────────────────────────┘
The CLI command jobs:queue:work wraps this loop with a circuit breaker, per-queue rate limits and
graceful shutdown. See CLI Commands and Concurrency.
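The decide step and the resulting run count of maxRetries + 1 can be sketched as follows. decideOutcome() and countRunsOfAlwaysFailingJob() are hypothetical helpers written for this illustration; they are not part of QueueWorker, and the sketch assumes attemptIndex is zero-based, which is what the stated total implies.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper mirroring the worker's decide step.
 * Assumption: $attemptIndex is zero-based (0 is the first run).
 */
function decideOutcome(bool $success, int $attemptIndex, int $maxRetries): string
{
    if ($success) {
        return 'acked';          // worker would call ack()
    }
    if ($attemptIndex < $maxRetries) {
        return 'requeued';       // worker would call nack($lease, $delay)
    }

    return 'dead-lettered';      // worker would call abandon()
}

/** Simulates a job that fails on every delivery and counts how often it runs. */
function countRunsOfAlwaysFailingJob(int $maxRetries): int
{
    $runs = 0;
    $attemptIndex = 0;
    do {
        $runs++;                 // one attempt per delivery
        $status = decideOutcome(false, $attemptIndex, $maxRetries);
        $attemptIndex++;         // the next delivery carries attempts + 1
    } while ($status === 'requeued');

    return $runs;
}
```

With maxRetries set to 2, the simulated job runs at attempt indexes 0, 1 and 2 before it is dead-lettered, which is three runs in total.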
Cron pipeline
- jobs:cronjob:run builds a Scheduler and calls Config\Jobs::init($scheduler).
- CronRunner::run() walks the definitions in topological order of dependsOn() (Scheduler::getExecutionOrder() performs the sort and throws on circular/unknown dependencies).
- For each definition it skips disabled jobs (enabled()) and jobs outside the current environments(), then evaluates the cron expression against "now" (or a frozen -testTime).
- A due definition with a queue is enqueued via BackendFactory::make(...)->enqueue(); otherwise it runs inline through JobRuntime (one attempt). The runner never sleeps between jobs.
See Scheduling.
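The dependency ordering described above can be sketched with a depth-first topological sort. executionOrder() is a hypothetical function for illustration; it is not the code behind Scheduler::getExecutionOrder(), and the exception types are assumptions.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical sketch: order job names so that dependencies come first.
 *
 * @param array<string, list<string>> $dependsOn job name => names it depends on
 * @return list<string>
 */
function executionOrder(array $dependsOn): array
{
    $state = [];   // name => 'visiting' | 'done'
    $order = [];

    $visit = function (string $name) use (&$visit, &$state, &$order, $dependsOn): void {
        if (!array_key_exists($name, $dependsOn)) {
            throw new InvalidArgumentException("Unknown dependency: {$name}");
        }
        if (($state[$name] ?? null) === 'done') {
            return;                                  // already placed in the order
        }
        if (($state[$name] ?? null) === 'visiting') {
            throw new LogicException("Circular dependency at: {$name}");
        }
        $state[$name] = 'visiting';
        foreach ($dependsOn[$name] as $dependency) {
            $visit($dependency);                     // dependencies are emitted first
        }
        $state[$name] = 'done';
        $order[] = $name;
    };

    foreach (array_keys($dependsOn) as $name) {
        $visit((string) $name);
    }

    return $order;
}
```

For example, a report job that depends on import, and a cleanup job that depends on report, come out as import, report, cleanup.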
The QueueBackend contract
A single contract underpins all five backends:
interface QueueBackend
{
    /** Persist the definition into the queue. Returns the backend-assigned id. */
    public function enqueue(JobDefinition $definition): string;

    /** Lease one ready message, or null when the queue is empty (after any blocking timeout). */
    public function fetch(string $queue): ?JobLease;

    /** Mark the leased message processed; the backend removes it permanently. */
    public function ack(JobLease $lease): bool;

    /** Redeliver the message, optionally after $delaySeconds (retry backoff). */
    public function nack(JobLease $lease, ?int $delaySeconds = null): bool;

    /** Stop holding the lease without retrying (unprocessable / DLQ). */
    public function abandon(JobLease $lease): bool;

    /** Reclaim messages whose lease expired (crashed/stalled worker). Returns the count recovered. */
    public function reapExpired(string $queue, int $visibilityTimeout): int;
}
The contract is stateless with respect to the in-flight message: the worker holds the JobLease
and passes it back to ack/nack/abandon, so the backend never has to remember which message a
given worker is processing. This replaces the legacy split between a queue interface and a worker
interface that coupled the two sides through backend instance state.
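A rough in-memory sketch can make the lease semantics concrete. InMemoryLeaseQueue is hypothetical: it does not implement the real QueueBackend interface, it uses plain arrays in place of JobDefinition and JobLease, it takes the current time as an argument, and it omits delays and abandon(). Leaving attempts unchanged on reaping is also an assumption of this sketch.

```php
<?php
declare(strict_types=1);

// Hypothetical single-queue sketch of lease semantics with owner tokens.
final class InMemoryLeaseQueue
{
    /** @var list<array{id:int, body:string, attempts:int}> */
    private array $ready = [];
    /** @var array<int, array{body:string, attempts:int, owner:string, expiresAt:int}> */
    private array $inFlight = [];
    private int $nextId = 1;

    public function enqueue(string $body): int
    {
        $id = $this->nextId++;
        $this->ready[] = ['id' => $id, 'body' => $body, 'attempts' => 0];

        return $id;
    }

    /** Leases one ready message, or returns null when nothing is ready. */
    public function fetch(int $now, int $visibilityTimeout): ?array
    {
        $message = array_shift($this->ready);
        if ($message === null) {
            return null;
        }
        $owner = bin2hex(random_bytes(8));           // fresh owner token per lease
        $this->inFlight[$message['id']] = [
            'body' => $message['body'],
            'attempts' => $message['attempts'],
            'owner' => $owner,
            'expiresAt' => $now + $visibilityTimeout,
        ];

        return ['token' => $message['id'], 'ownerToken' => $owner, 'body' => $message['body']];
    }

    public function ack(array $lease): bool
    {
        if (!$this->owns($lease)) {
            return false;                            // stale owner: refuse
        }
        unset($this->inFlight[$lease['token']]);

        return true;
    }

    public function nack(array $lease): bool
    {
        if (!$this->owns($lease)) {
            return false;
        }
        $entry = $this->inFlight[$lease['token']];
        unset($this->inFlight[$lease['token']]);
        $this->ready[] = ['id' => $lease['token'], 'body' => $entry['body'], 'attempts' => $entry['attempts'] + 1];

        return true;
    }

    /** Returns expired leases to the ready list and reports how many were recovered. */
    public function reapExpired(int $now): int
    {
        $recovered = 0;
        foreach ($this->inFlight as $id => $entry) {
            if ($entry['expiresAt'] <= $now) {
                unset($this->inFlight[$id]);
                $this->ready[] = ['id' => $id, 'body' => $entry['body'], 'attempts' => $entry['attempts']];
                $recovered++;
            }
        }

        return $recovered;
    }

    private function owns(array $lease): bool
    {
        $entry = $this->inFlight[$lease['token']] ?? null;

        return $entry !== null && hash_equals($entry['owner'], $lease['ownerToken']);
    }
}
```

The caller holds the lease array and hands it back to ack() or nack(), so the queue never tracks which worker is processing which message; it only compares owner tokens.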
| Backend | Notes |
|---|---|
| SyncBackend | enqueue() runs the job inline via JobRuntime and returns a synthetic sync-... id; fetch() is always null, the lease verbs are no-ops returning true. Ideal for tests/CLI. |
| DatabaseBackend | Atomic reservation; reapExpired() returns rows past the visibility timeout to a runnable state; requeue is an in-place UPDATE (no orphan rows). |
| RedisBackend | Reliable-queue pattern (waiting/processing lists) with owner tokens; nack(delay) uses a delayed ZSET; reapExpired() recovers stalled leases. |
| BeanstalkBackend | nack() re-serialises the payload with attempts+1 via delete + re-put (the native release verb cannot mutate the job body); the requested delay is applied on the re-put. abandon() buries the job (native DLQ). Per-job TTR makes the server recover stalled jobs natively (reapExpired() is a no-op). |
| ServiceBusBackend | Peek-lock with serviceBusLockTimeout; the broker redelivers on lock expiry. |
reapExpired() is exercised by jobs:queue:reap for the Database and Redis backends; Beanstalk and
Service Bus recover natively. See Queues & Workers.
At-least-once delivery
Every persistent backend provides at-least-once delivery, not exactly-once. A message can be delivered (and a handler can therefore run) more than once. This happens by design when:
- a worker crashes after running a job but before ack(): the lease expires and the reaper (or the broker) makes the message eligible again;
- a nack() requeues a failed attempt for retry;
- a visibility timeout elapses while a slow job is still running (mitigated by setting the timeout above the maximum expected runtime, or by JobLease::renew()).
Implications for your handlers:
- Make handlers idempotent. Running twice should not corrupt state (use upserts, conditional writes, or dedupe on a natural key).
- Use idempotencyKey() for hard dedupe. When a definition carries an idempotencyKey, EnvelopeFactory::toWire() serialises it onto the envelope (as a signed identity field) and the worker, via IdempotencyGuard, skips a message whose key was already processed within idempotencyTtl: it acks without running (status skipped-idempotent). This is opt-in. Because delivery is at-least-once and the dedupe is best-effort under crash/redelivery, still keep handlers idempotent at the application level. See Idempotency in depth.
- Size visibility timeouts correctly. databaseVisibilityTimeout / redisProcessingVisibilityTimeout / serviceBusLockTimeout must exceed the longest job, or a live job may be reclaimed and run concurrently. See Configuration.

Warning: Owner tokens guard ack/nack so a previous owner cannot ack a reaped-and-reassigned message, but they do not prevent two runs of the same payload; that is the nature of at-least-once. Idempotency is the application-level guarantee.
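The key-based dedupe can be sketched with an in-memory guard. SketchIdempotencyGuard and processDelivery() are hypothetical: the signature of the package's IdempotencyGuard::firstRun() is not shown here, and a real guard would presumably use a shared store rather than a PHP array. The clock is passed in so the TTL behaviour is easy to follow.

```php
<?php
declare(strict_types=1);

// Hypothetical in-memory guard; real dedupe needs a store shared by all workers.
final class SketchIdempotencyGuard
{
    /** @var array<string, int> key => timestamp at which the key expires */
    private array $seen = [];

    public function __construct(private readonly int $ttlSeconds)
    {
    }

    /** True the first time a key is seen within the TTL, false for a duplicate. */
    public function firstRun(string $key, int $now): bool
    {
        if (isset($this->seen[$key]) && $this->seen[$key] > $now) {
            return false;
        }
        $this->seen[$key] = $now + $this->ttlSeconds;

        return true;
    }
}

/** Mirrors the worker's behaviour: a duplicate is acknowledged without running. */
function processDelivery(SketchIdempotencyGuard $guard, ?string $key, int $now, callable $handler): string
{
    if ($key !== null && !$guard->firstRun($key, $now)) {
        return 'skipped-idempotent';
    }
    $handler();

    return 'acked';
}
```

Note that the key is recorded before the handler runs in this sketch, so a crash mid-run would suppress the retry until the TTL expires. That trade-off is one reason handlers should still be idempotent on their own.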
Execution & resilience
- One attempt per fetch: JobRuntime runs the handler exactly once; the worker owns the retry decision. This eliminates the legacy double-retry (a coordinator loop and a requeue helper both consuming maxRetries).
- Timeout that interrupts: Timeout installs a SIGALRM handler that throws (with pcntl_async_signals(true)), so even CPU-bound code is interrupted at the deadline. Without pcntl it degrades to a documented soft check.
- Idempotency: opt-in IdempotencyGuard deduplicates by key (TTL idempotencyTtl).
- Single-instance lock: SingleInstanceLock uses an ownership token so a release never frees a lock held by a different owner; contention surfaces as a failed result that the worker requeues.
- Circuit breaker & rate limit: jobs:queue:work skips a failing backend (circuitBreakerThreshold / circuitBreakerCooldown) and honours queueRateLimits.
- Graceful shutdown: SIGTERM/SIGINT finish the current cycle and exit; the in-flight job is not aborted mid-run.
See Concurrency and Retries.
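The interrupting timeout can be sketched with the pcntl extension. runWithTimeout() is a hypothetical function, not the package's Timeout class. When pcntl is unavailable this sketch simply runs the task unguarded, which is weaker than the soft check mentioned above.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical sketch: run $task, throwing if it exceeds $seconds.
 * Requires the pcntl extension (CLI on POSIX systems) for the interrupt.
 */
function runWithTimeout(callable $task, int $seconds): mixed
{
    if (!function_exists('pcntl_alarm')) {
        // Assumption for this sketch: without pcntl the task runs with no deadline.
        return $task();
    }

    pcntl_async_signals(true);                       // deliver signals without ticks
    pcntl_signal(SIGALRM, static function (): void {
        throw new RuntimeException('Job timed out');
    });
    pcntl_alarm($seconds);                           // schedule SIGALRM

    try {
        return $task();
    } finally {
        pcntl_alarm(0);                              // cancel any pending alarm
        pcntl_signal(SIGALRM, SIG_DFL);              // restore default handling
    }
}
```

The finally block matters: cancelling the alarm and restoring the default handler keeps a late SIGALRM from surfacing inside whatever the worker does next.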
Security
- Envelope signing: EnvelopeFactory::toWire() signs the immutable identity fields with HMAC-SHA256 at enqueue; the worker rejects tampered/forged messages. Key resolution: Config\Jobs::$signingKey → env('JOBS_SIGNING_KEY') → Encryption key.
- Per-queue handler allowlist: HandlerRegistry::resolveForQueue() refuses handler keys not listed for a queue (Config\Jobs::$queueHandlers).
- ShellHandler deny-by-default: an empty $allowedShellCommands rejects everything; execution is via proc_open with an argv array (never /bin/sh -c).
- EventHandler allowlist: only $allowedEvents may be fired (empty = deny all).
- UrlHandler anti-SSRF: http/https only, private/reserved IPv4/IPv6 rejected, SSL verification forced, redirects disabled.
See Security.
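Envelope signing and verification can be sketched with PHP's built-in hash_hmac() and hash_equals(). The function names and the SIGNED_FIELDS list are assumptions made for this example; the exact set of fields the package signs, and its canonical JSON form, may differ.

```php
<?php
declare(strict_types=1);

// Assumed identity fields for illustration only. "attempts" is left out because
// it changes on every retry and would invalidate the signature.
const SIGNED_FIELDS = ['job', 'payload', 'queue', 'name', 'identifier', 'idempotencyKey'];

/** Builds a stable JSON string from the signed fields in a fixed order. */
function canonicalIdentity(array $envelope): string
{
    $identity = [];
    foreach (SIGNED_FIELDS as $field) {
        $identity[$field] = $envelope[$field] ?? null;
    }

    return json_encode($identity, JSON_THROW_ON_ERROR | JSON_UNESCAPED_SLASHES);
}

/** Returns the envelope with an HMAC-SHA256 signature stored under "_sig". */
function signEnvelope(array $envelope, string $key): array
{
    $envelope['_sig'] = hash_hmac('sha256', canonicalIdentity($envelope), $key);

    return $envelope;
}

/** Recomputes the signature and compares it in constant time. */
function verifyEnvelope(array $envelope, string $key): bool
{
    $signature = $envelope['_sig'] ?? null;
    if (!is_string($signature)) {
        return false;
    }
    $expected = hash_hmac('sha256', canonicalIdentity($envelope), $key);

    return hash_equals($expected, $signature);
}
```

Signing only fields that never change after enqueue lets a retried message, whose attempts counter has grown, still verify against the original signature.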
Extending the architecture
- Custom handler: implement JobHandlerInterface (or extend AbstractJobHandler / TypedJobHandler) and register the key in Config\Jobs::$handlers. See Handlers.
- Custom backend: implement QueueBackend and register it in Config\Jobs::$backends. See Queues & Workers.
- Custom retry policy: implement RetryPolicy and inject it into QueueWorker. See Retries.
- Metrics: implement MetricsCollectorInterface and set Config\Jobs::$metricsCollector. See Configuration.