Skip to content

Architecture

v3 is a single, layered architecture. Each layer has one responsibility and communicates with the next through small, immutable value objects. There is no mutable god-object: definitions describe work, handlers contain only business logic, backends move messages, and the worker orchestrates a single attempt per delivery. This page explains the layers, the value objects that flow between them, the worker and cron pipelines, the QueueBackend contract, and the at-least-once delivery guarantee.

If you are new to the package, read Quick Start first; this page is the conceptual map behind it.


Layers

Layer Namespace Responsibility
Definition Daycry\Jobs\Definition Describe a job. JobBuilder (fluent, mutable accumulator) produces an immutable JobDefinition value object.
Handlers Daycry\Jobs\Handlers Business logic only. JobHandlerInterface::handle(JobContext); HandlerRegistry resolves keys and enforces the per-queue allowlist.
Queues Daycry\Jobs\Queues Transport. One QueueBackend contract with lease semantics; JobEnvelope is the wire message, JobLease is an in-flight reservation. EnvelopeFactory builds and signs the wire payload; BackendFactory resolves backends from config.
Backends Daycry\Jobs\Queues\Backends Concrete transports: SyncBackend, DatabaseBackend, RedisBackend, BeanstalkBackend, ServiceBusBackend.
Signing Daycry\Jobs\Queues\Signing EnvelopeSigner — HMAC-SHA256 sign/verify with constant-time comparison.
Execution Daycry\Jobs\Execution Run a single attempt. JobRuntime, JobContext, ExecutionResult, Timeout, RetryPolicy/RetryPolicyFixed, IdempotencyGuard, SingleInstanceLock.
Worker Daycry\Jobs\Worker QueueWorker integrates the pipeline: fetch → verify → idempotency → run one attempt → ack/nack/abandon. WorkerResult reports the outcome.
Cron Daycry\Jobs\Cron Scheduler (fluent registry of JobBuilders) and CronRunner (evaluate due definitions; enqueue or run inline).
Config Daycry\Jobs\Config Config\Jobs — handler/backend maps, security allowlists, retry/timeout settings, and init() to register scheduled jobs.
Support Daycry\Jobs\Libraries, Daycry\Jobs\Metrics Cross-cutting helpers: CircuitBreaker, RateLimiter, DeadLetterQueue, serializers, and the MetricsCollectorInterface collectors.

Core value objects

The layers talk to each other only through these small, mostly immutable objects:

  • JobDefinition (readonly): what the job is — handler key, payload, scheduling, retry policy, identity. Every withXxx() helper returns a new instance, so a definition can be shared across enqueue sites without spooky action at a distance.
  • JobEnvelope / wire payload: how the definition travels through a queue. The canonical wire object is:
{ "job", "payload", "queue", "priority", "maxRetries", "attempts", "name", "identifier", "idempotencyKey", "schedule", "_sig" }

Built by EnvelopeFactory::toWire(), identical across every backend so a message enqueued via one path can be consumed via another. - JobLease (readonly): a reservation on an in-flight message — - token: opaque backend handle needed to ack/nack later (DB primary key, Redis processing entry, Service Bus LockToken, Beanstalk job id); - ownerToken: a random token minted by the worker at fetch() so a reaped-then-reassigned message cannot be acked by the previous owner; - expiresAt: the visibility deadline. After it passes the backend may redeliver; a long-running worker can renew() it. - JobContext (readonly): what a handler seespayload, name, queue, attempt, meta. Handlers never receive scheduling or queue state. - ExecutionResult: the outcome of one handler run — success, output, error, startedAt, endedAt (and durationSeconds()), and the resolved handlerClass.


Queue pipeline (worker)

QueueWorker::processOnce($queue) drives one message end to end:

  1. fetchbackend->fetch($queue) leases one ready message, or returns null (status empty).
  2. verify — when verifyEnvelopeSignature is on and a key is configured, the HMAC _sig is verified over the canonical identity JSON with hash_equals(). A tampered/forged message is abandon()ed (status rejected). An envelope whose payload is not a valid object is also rejected.
  3. idempotency — if the envelope carries an idempotencyKey, IdempotencyGuard::firstRun() short-circuits a duplicate: ack() without running (status skipped-idempotent).
  4. run one attemptJobRuntime::run(definition, context) resolves the handler (enforcing the per-queue allowlist), acquires the single-instance lock if requested, applies the timeout, captures output, and returns an ExecutionResult. It runs once and never sleeps.
  5. decide (the retry decision lives here, not in the runtime):
  6. success → ack() (status acked),
  7. failure with retries left (attemptIndex < maxRetries) → nack($lease, $delay) where $delay comes from the RetryPolicy (status requeued); the backend requeues,
  8. retries exhausted → abandon() (status dead-lettered). abandon() routes to the backend's native dead-letter where one exists (Beanstalk bury / Service Bus after MaxDeliveryCount) and otherwise just drops the message (Database/Redis). The app-level DeadLetterQueue helper is opt-in and is not invoked by the worker.

Total runs for a job are therefore maxRetries + 1. Backoff is realised by the backend honouring nack($lease, $delaySeconds). See Retries.

                    ┌───────────────────────── QueueWorker::processOnce($queue) ─────────────────────────┐
                    │                                                                                     │
 fetch(lease) ──►  verify signature  ──►  idempotency guard  ──►  run ONE attempt (JobRuntime)           │
     │                   │                       │                        │                              │
   null              tampered                 duplicate                   │                              │
     │                   │                       │            ┌───────────┼───────────────┐              │
     ▼                   ▼                       ▼            ▼           ▼                 ▼              │
  "empty"           abandon()                 ack()        success   failure (retries)  retries exhausted │
                  "rejected"          "skipped-idempotent"   │            │                 │             │
                                                            ack()     nack(delay)        abandon()        │
                                                          "acked"   "requeued"        "dead-lettered"     │
                                                                  (backend requeues)  native DLQ or drop  │
                                                                                      (app DLQ is opt-in) │
                    └─────────────────────────────────────────────────────────────────────────────────┘

The CLI command jobs:queue:work wraps this loop with a circuit breaker, per-queue rate limits and graceful shutdown. See CLI Commands and Concurrency.


Cron pipeline

  1. jobs:cronjob:run builds a Scheduler and calls Config\Jobs::init($scheduler).
  2. CronRunner::run() walks the definitions in topological order of dependsOn() (Scheduler::getExecutionOrder() performs the sort and throws on circular/unknown dependencies).
  3. For each definition it skips disabled jobs (enabled()) and jobs outside the current environments(), then evaluates the cron expression against "now" (or a frozen -testTime).
  4. A due definition with a queue is enqueued via BackendFactory::make(...)->enqueue(); otherwise it runs inline through JobRuntime (one attempt). The runner never sleeps between jobs.

See Scheduling.


The QueueBackend contract

A single contract underpins all five backends:

interface QueueBackend
{
    /** Persist the definition into the queue. Returns the backend-assigned id. */
    public function enqueue(JobDefinition $definition): string;

    /** Lease one ready message, or null when the queue is empty (after any blocking timeout). */
    public function fetch(string $queue): ?JobLease;

    /** Mark the leased message processed; the backend removes it permanently. */
    public function ack(JobLease $lease): bool;

    /** Redeliver the message, optionally after $delaySeconds (retry backoff). */
    public function nack(JobLease $lease, ?int $delaySeconds = null): bool;

    /** Stop holding the lease without retrying (unprocessable / DLQ). */
    public function abandon(JobLease $lease): bool;

    /** Reclaim messages whose lease expired (crashed/stalled worker). Returns the count recovered. */
    public function reapExpired(string $queue, int $visibilityTimeout): int;
}

The contract is stateless with respect to the in-flight message: the worker holds the JobLease and passes it back to ack/nack/abandon, so the backend never has to remember which message a given worker is processing. This replaces the legacy split between a queue interface and a worker interface that coupled the two sides through backend instance state.

Backend Notes
SyncBackend enqueue() runs the job inline via JobRuntime and returns a synthetic sync-... id; fetch() is always null, the lease verbs are no-ops returning true. Ideal for tests/CLI.
DatabaseBackend Atomic reservation; reapExpired() returns rows past the visibility timeout to a runnable state; requeue is an in-place UPDATE (no orphan rows).
RedisBackend Reliable-queue pattern (waiting/processing lists) with owner tokens; nack(delay) uses a delayed ZSET; reapExpired() recovers stalled leases.
BeanstalkBackend nack() re-serialises the payload with attempts+1 via delete + re-put (the native release verb cannot mutate the job body); the requested delay is applied on the re-put. abandon() buries the job (native DLQ). Per-job TTR makes the server recover stalled jobs natively (reapExpired() is a no-op).
ServiceBusBackend Peek-lock with serviceBusLockTimeout; the broker redelivers on lock expiry.

reapExpired() is exercised by jobs:queue:reap for the Database and Redis backends; Beanstalk and Service Bus recover natively. See Queues & Workers.


At-least-once delivery

Every persistent backend provides at-least-once delivery, not exactly-once. A message can be delivered (and a handler can therefore run) more than once. This happens by design when:

  • a worker crashes after running a job but before ack() — the lease expires and the reaper (or the broker) makes the message eligible again;
  • a nack() requeues a failed attempt for retry;
  • a visibility timeout elapses while a slow job is still running (mitigated by setting the timeout above the maximum expected runtime, or by JobLease::renew()).

Implications for your handlers:

  1. Make handlers idempotent. Running twice should not corrupt state (use upserts, conditional writes, or dedupe on a natural key).
  2. Use idempotencyKey() for hard dedupe. When a definition carries an idempotencyKey, EnvelopeFactory::toWire() serialises it onto the envelope (as a signed identity field) and the worker, via IdempotencyGuard, skips a message whose key was already processed within idempotencyTtl — it acks without running (status skipped-idempotent). This is opt-in. Because delivery is at-least-once and the dedupe is best-effort under crash/redelivery, still keep handlers idempotent at the application level. See Idempotency in depth.
  3. Size visibility timeouts correctly. databaseVisibilityTimeout / redisProcessingVisibilityTimeout / serviceBusLockTimeout must exceed the longest job, or a live job may be reclaimed and run concurrently. See Configuration.

Warning: Owner tokens guard ack/nack so a previous owner cannot ack a reaped-and-reassigned message, but they do not prevent two runs of the same payload — that is the nature of at-least-once. Idempotency is the application-level guarantee.


Execution & resilience

  • One attempt per fetchJobRuntime runs the handler exactly once; the worker owns the retry decision. This eliminates the legacy double-retry (a coordinator loop and a requeue helper both consuming maxRetries).
  • Timeout that interruptsTimeout installs a SIGALRM handler that throws (with pcntl_async_signals(true)), so even CPU-bound code is interrupted at the deadline. Without pcntl it degrades to a documented soft check.
  • Idempotency — opt-in IdempotencyGuard deduplicates by key (TTL idempotencyTtl).
  • Single-instance lockSingleInstanceLock uses an ownership token so a release never frees a lock held by a different owner; contention surfaces as a failed result that the worker requeues.
  • Circuit breaker & rate limitjobs:queue:work skips a failing backend (circuitBreakerThreshold / circuitBreakerCooldown) and honours queueRateLimits.
  • Graceful shutdown — SIGTERM/SIGINT finish the current cycle and exit; the in-flight job is not aborted mid-run.

See Concurrency and Retries.


Security

  • Envelope signingEnvelopeFactory::toWire() signs the immutable identity fields with HMAC-SHA256 at enqueue; the worker rejects tampered/forged messages. Key resolution: Config\Jobs::$signingKeyenv('JOBS_SIGNING_KEY') → Encryption key.
  • Per-queue handler allowlistHandlerRegistry::resolveForQueue() refuses handler keys not listed for a queue (Config\Jobs::$queueHandlers).
  • ShellHandler deny-by-default — an empty $allowedShellCommands rejects everything; execution is via proc_open with an argv array (never /bin/sh -c).
  • EventHandler allowlist — only $allowedEvents may be fired (empty = deny all).
  • UrlHandler anti-SSRF — http/https only, private/reserved IPv4/IPv6 rejected, SSL verification forced, redirects disabled.

See Security.


Extending the architecture

  1. Custom handler — implement JobHandlerInterface (or extend AbstractJobHandler / TypedJobHandler) and register the key in Config\Jobs::$handlers. See Handlers.
  2. Custom backend — implement QueueBackend and register it in Config\Jobs::$backends. See Queues & Workers.
  3. Custom retry policy — implement RetryPolicy and inject it into QueueWorker. See Retries.
  4. Metrics — implement MetricsCollectorInterface and set Config\Jobs::$metricsCollector. See Configuration.