Storing Laravel Job Batches in Redis

In the previous post I went through the Redis command alphabet, pipelines, transactions, and Lua scripts. This post is the practical application of all of it: we're replacing Laravel's database batch repository with one backed by Redis.

The job_batches table in a busy system becomes a problem quickly. Every dispatched batch writes a row. Every job completion updates it. Finished batches pile up. On a system processing thousands of batches a day, you're looking at millions of rows within a few weeks, most of which represent completed work you'll never query again. The data is transient but the table is not.

Redis is a much better fit for batch tracking. Reads and writes are fast, keys can have TTLs, and there's no table to bloat or index to maintain. The tradeoff is you lose SQL querying, but you were never really doing complex analytical queries on job_batches anyway.

The Interface

Laravel's BatchRepository interface is the contract we need to fulfill:

interface BatchRepository
{
    public function get(int $limit, mixed $before): array;
    public function find(string $batchId): ?Batch;
    public function store(PendingBatch $batch): Batch;
    public function incrementTotalJobs(string $batchId, int $amount): void;
    public function decrementPendingJobs(string $batchId, string $jobId): UpdatedBatchJobCounts;
    public function incrementFailedJobs(string $batchId, string $jobId): UpdatedBatchJobCounts;
    public function markAsFinished(string $batchId): void;
    public function cancel(string $batchId): void;
    public function delete(string $batchId): void;
    public function transaction(Closure $callback): mixed;
}

There's also PrunableBatchRepository for cleanup:

interface PrunableBatchRepository extends BatchRepository
{
    public function prune(DateTimeInterface $before): int;
    public function pruneUnfinished(DateTimeInterface $before): int;
    public function pruneCancelled(DateTimeInterface $before): int;
}

The decrementPendingJobs and incrementFailedJobs methods return an UpdatedBatchJobCounts value object that Laravel uses to decide whether to fire the batch's then, catch, or finally callbacks. We'll need to construct that correctly.
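To make that contract concrete, here's a hypothetical sketch of the decision those counts drive. The class and function names are illustrative stand-ins, not Laravel's internals (the real bookkeeping lives in Illuminate\Bus\Batch and also consults things like allowFailures):

```php
<?php

// Illustrative stand-in for UpdatedBatchJobCounts.
final class Counts
{
    public function __construct(
        public int $pendingJobs,
        public int $failedJobs,
    ) {}
}

// Hypothetical simplification of which batch callback fires after a
// job completes, given the updated counts.
function callbackToFire(Counts $counts): string
{
    if ($counts->failedJobs === 1) {
        return 'catch';   // first failure fires catch()
    }

    if ($counts->pendingJobs === 0) {
        return 'then';    // all jobs done: then() and finally() run
    }

    return 'none';        // batch still in flight
}
```

The key point: Laravel makes this decision from the counts the repository returns, so getting them wrong means callbacks fire at the wrong time or never.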

Data Structure Design

Before writing any code, figure out the data model. Four structures cover everything we need.

A hash per batch stores the batch fields:

batch:{id} -> Hash {
    id, name, total_jobs, pending_jobs, failed_jobs,
    options, created_at, cancelled_at, finished_at
}

A sorted set scored by creation timestamp handles ordering and pagination:

batches:created_at -> ZSet { score: created_at_timestamp, member: batch_id }

A second sorted set scored by finish timestamp handles pruning:

batches:finished_at -> ZSet { score: finished_at_timestamp, member: batch_id }

A set per batch tracks which job IDs failed:

batch:{id}:failed_job_ids -> Set { job_id, job_id, ... }

This design maps cleanly to the operations we need. Finding a batch by ID is a single HGETALL. Paginating batches by creation time is a ZREVRANGEBYSCORE. Pruning old finished batches is a ZRANGEBYSCORE with a timestamp cutoff. Checking failed job IDs is SMEMBERS.
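If score-based pagination is unfamiliar, here is a plain-PHP simulation of what ZREVRANGEBYSCORE does against the created-at set. Real code issues the Redis command; this only mimics its semantics with an array:

```php
<?php

// member => score, mimicking batches:created_at.
$createdAt = [
    'batch-a' => 1700000100,
    'batch-b' => 1700000200,
    'batch-c' => 1700000300,
];

// Rough PHP equivalent of: ZREVRANGEBYSCORE key max min LIMIT offset count
function zrevrangebyscore(array $set, float $max, float $min, int $offset, int $count): array
{
    $inRange = array_filter($set, fn ($score) => $score <= $max && $score >= $min);
    arsort($inRange); // highest score first, i.e. newest batch first

    return array_slice(array_keys($inRange), $offset, $count);
}

$page = zrevrangebyscore($createdAt, INF, -INF, 0, 2);
// newest two batches: batch-c, then batch-b
```

Paging works by feeding the previous page's oldest score back in as the new max, which is exactly what get() will do below.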

The Repository Implementation

The constructor takes a BatchFactory (Laravel's own class for instantiating Batch objects), a Redis connection, and a prefix string:

<?php

namespace App\Queue;

use Carbon\Carbon;
use Closure;
use DateTimeInterface;
use Illuminate\Bus\Batch;
use Illuminate\Bus\BatchFactory;
use Illuminate\Bus\PendingBatch;
use Illuminate\Bus\PrunableBatchRepository;
use Illuminate\Bus\UpdatedBatchJobCounts;
use Illuminate\Redis\Connections\Connection;
use Illuminate\Support\Str;

class RedisBatchRepository implements PrunableBatchRepository
{
    public function __construct(
        protected BatchFactory $factory,
        protected Connection $connection,
        protected string $redisPrefix = 'job_batches',
    ) {}

Key helpers keep the key names consistent:

    protected function getBatchKey(string $batchId): string
    {
        return "{$this->redisPrefix}:batch_{$batchId}";
    }

    protected function getBatchFailedJobIdsKey(string $batchId): string
    {
        return "{$this->redisPrefix}:failed_job_ids_{$batchId}";
    }

    protected function getOrderedSetKeyForCreatedAt(): string
    {
        return "{$this->redisPrefix}:batches_by_created_at";
    }

    protected function getOrderedSetKeyForFinishedAt(): string
    {
        return "{$this->redisPrefix}:batches_by_finished_at";
    }

find() is a single hash lookup:

    public function find(string $batchId): ?Batch
    {
        $data = $this->connection->hgetall($this->getBatchKey($batchId));

        if (empty($data)) {
            return null;
        }

        return $this->redisDataToBatch($data);
    }

store() writes the initial hash and registers the batch in the created-at sorted set. Wrapped in a transaction so both writes are atomic:

    public function store(PendingBatch $batch): Batch
    {
        $id = (string) Str::orderedUuid();
        $now = Carbon::now();

        $this->connection->transaction(function ($tx) use ($id, $batch, $now) {
            $tx->zadd(
                $this->getOrderedSetKeyForCreatedAt(),
                $now->getTimestamp(),
                $id,
            );

            $tx->hmset($this->getBatchKey($id), [
                'id' => $id,
                'name' => $batch->name,
                'total_jobs' => 0,
                'pending_jobs' => 0,
                'failed_jobs' => 0,
                'options' => serialize($batch->options),
                'created_at' => $now->getTimestamp(),
                'cancelled_at' => '',
                'finished_at' => '',
            ]);
        });

        return $this->find($id);
    }

incrementTotalJobs() is called when jobs are actually dispatched into the batch. It bumps both total_jobs and pending_jobs and clears finished_at, including the batch's entry in the finished-at sorted set, so a batch that receives new jobs after finishing is not pruned as finished:

    public function incrementTotalJobs(string $batchId, int $amount): void
    {
        $this->connection->transaction(function ($tx) use ($batchId, $amount) {
            $tx->hincrby($this->getBatchKey($batchId), 'total_jobs', $amount);
            $tx->hincrby($this->getBatchKey($batchId), 'pending_jobs', $amount);
            $tx->hset($this->getBatchKey($batchId), 'finished_at', '');
            $tx->zrem($this->getOrderedSetKeyForFinishedAt(), $batchId);
        });
    }

decrementPendingJobs() removes a job from the pending count and from the failed IDs set (in case it was retried). It returns a counts object that Laravel uses to decide if the batch is done:

    public function decrementPendingJobs(string $batchId, string $jobId): UpdatedBatchJobCounts
    {
        $this->connection->transaction(function ($tx) use ($batchId, $jobId) {
            $tx->hincrby($this->getBatchKey($batchId), 'pending_jobs', -1);
            $tx->srem($this->getBatchFailedJobIdsKey($batchId), $jobId);
        });

        $counts = $this->connection->hmget(
            $this->getBatchKey($batchId),
            ['pending_jobs', 'failed_jobs'],
        );

        return new UpdatedBatchJobCounts(
            (int) ($counts[0] ?? 0),
            (int) ($counts[1] ?? 0),
        );
    }

incrementFailedJobs() bumps the failed counter and adds the job ID to the failed set:

    public function incrementFailedJobs(string $batchId, string $jobId): UpdatedBatchJobCounts
    {
        $this->connection->transaction(function ($tx) use ($batchId, $jobId) {
            $tx->hincrby($this->getBatchKey($batchId), 'failed_jobs', 1);
            $tx->sadd($this->getBatchFailedJobIdsKey($batchId), $jobId);
        });

        $counts = $this->connection->hmget(
            $this->getBatchKey($batchId),
            ['pending_jobs', 'failed_jobs'],
        );

        return new UpdatedBatchJobCounts(
            (int) ($counts[0] ?? 0),
            (int) ($counts[1] ?? 0),
        );
    }

markAsFinished() stamps the finished_at time and registers the batch in the finished-at sorted set for future pruning:

    public function markAsFinished(string $batchId): void
    {
        $now = Carbon::now();

        $this->connection->transaction(function ($tx) use ($batchId, $now) {
            $tx->hset($this->getBatchKey($batchId), 'finished_at', $now->getTimestamp());
            $tx->zadd($this->getOrderedSetKeyForFinishedAt(), $now->getTimestamp(), $batchId);
        });
    }

cancel() sets both cancelled_at and finished_at. Cancelled batches are considered finished for pruning purposes:

    public function cancel(string $batchId): void
    {
        $now = Carbon::now();

        $this->connection->transaction(function ($tx) use ($batchId, $now) {
            $tx->hset($this->getBatchKey($batchId), 'cancelled_at', $now->getTimestamp());
            $tx->hset($this->getBatchKey($batchId), 'finished_at', $now->getTimestamp());
            $tx->zadd($this->getOrderedSetKeyForFinishedAt(), $now->getTimestamp(), $batchId);
        });
    }

delete() removes the batch from every structure:

    public function delete(string $batchId): void
    {
        $this->connection->pipeline(function ($pipe) use ($batchId) {
            $pipe->del($this->getBatchKey($batchId));
            $pipe->del($this->getBatchFailedJobIdsKey($batchId));
            $pipe->zrem($this->getOrderedSetKeyForCreatedAt(), $batchId);
            $pipe->zrem($this->getOrderedSetKeyForFinishedAt(), $batchId);
        });
    }

get() is the most involved. Laravel passes the ID of the last batch on the previous page as $before, so we resolve that ID to its score in the created-at sorted set and page strictly below it, then pipeline the hash and failed job ID fetches:

    public function get(int $limit, mixed $before): array
    {
        $beforeScore = '+inf';

        if ($before) {
            $score = $this->connection->zscore(
                $this->getOrderedSetKeyForCreatedAt(),
                (string) $before,
            );

            $beforeScore = $score !== null ? "({$score}" : '+inf';
        }

        $batchIds = $this->connection->zrevrangebyscore(
            $this->getOrderedSetKeyForCreatedAt(),
            $beforeScore,
            '-inf',
            ['limit' => [0, $limit]],
        );

        if (empty($batchIds)) {
            return [];
        }

        $hashes = $this->connection->pipeline(function ($pipe) use ($batchIds) {
            foreach ($batchIds as $id) {
                $pipe->hgetall($this->getBatchKey($id));
            }
        });

        $failedJobIdSets = $this->connection->pipeline(function ($pipe) use ($batchIds) {
            foreach ($batchIds as $id) {
                $pipe->smembers($this->getBatchFailedJobIdsKey($id));
            }
        });

        $batches = [];

        foreach ($hashes as $index => $hash) {
            if (empty($hash)) {
                continue;
            }

            $batches[] = $this->redisDataToBatch(
                $hash,
                $failedJobIdSets[$index] ?? [],
            );
        }

        return $batches;
    }

transaction() is used by Laravel internally when it needs to wrap multiple repository calls together. Delegating to Redis MULTI/EXEC would break those calls: inside MULTI every command is queued and returns a status instead of a value, so a find() in the callback would come back empty. Redis offers no rollback here anyway, so just invoke the callback directly:

    public function transaction(Closure $callback): mixed
    {
        return $callback();
    }

Converting the raw Redis hash back to a Batch instance:

    protected function redisDataToBatch(array $data, array $failedJobIds = []): Batch
    {
        return $this->factory->make(
            $this,
            $data['id'],
            $data['name'],
            (int) $data['total_jobs'],
            (int) $data['pending_jobs'],
            (int) $data['failed_jobs'],
            $failedJobIds,
            unserialize($data['options']),
            Carbon::createFromTimestamp($data['created_at']),
            $data['cancelled_at'] ? Carbon::createFromTimestamp($data['cancelled_at']) : null,
            $data['finished_at'] ? Carbon::createFromTimestamp($data['finished_at']) : null,
        );
    }
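Two conversions in that method are worth a standalone sanity check: the options field round-trips through PHP's native serializer, and the empty strings we store for null timestamps must map back to null. The option keys below are made up for the example:

```php
<?php

// Options round-trip through serialize()/unserialize(), as in store()
// and redisDataToBatch(). These keys are illustrative, not Laravel's
// exact option shape.
$options = ['allowFailures' => true, 'queue' => 'default'];
$restored = unserialize(serialize($options));

// Null timestamps are stored as '' in the hash; map them back to null,
// mirroring the ternaries on cancelled_at and finished_at above.
$timestampOrNull = fn (string $field): ?int => $field !== '' ? (int) $field : null;
```

Everything Redis hands back is a string, which is why the integer and timestamp fields are explicitly cast on the way out.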

The Prune Command

Pruning uses the finished-at sorted set to find old batches by timestamp:

<?php

namespace App\Console\Commands;

use App\Queue\RedisBatchRepository;
use Carbon\Carbon;
use Illuminate\Bus\BatchRepository;
use Illuminate\Console\Command;

class PruneRedisBatches extends Command
{
    protected $signature = 'queue:prune-redis-batches
                            {--hours=24 : How many hours old a finished batch must be before pruning}
                            {--unfinished= : Prune unfinished batches older than this many hours}
                            {--cancelled= : Prune cancelled batches older than this many hours}';

    protected $description = 'Prune stale batch records from Redis.';

    public function handle(BatchRepository $repository): int
    {
        if (! $repository instanceof RedisBatchRepository) {
            $this->error('The batch repository is not a RedisBatchRepository.');
            return 1;
        }

        $pruned = $repository->prune(Carbon::now()->subHours((int) $this->option('hours')));
        $this->info("Pruned {$pruned} finished batches.");

        if ($hours = $this->option('unfinished')) {
            $pruned = $repository->pruneUnfinished(Carbon::now()->subHours((int) $hours));
            $this->info("Pruned {$pruned} unfinished batches.");
        }

        if ($hours = $this->option('cancelled')) {
            $pruned = $repository->pruneCancelled(Carbon::now()->subHours((int) $hours));
            $this->info("Pruned {$pruned} cancelled batches.");
        }

        return 0;
    }
}

The prune() method on the repository queries the finished-at sorted set for batches older than the cutoff and deletes each matching entry:

    public function prune(DateTimeInterface $before): int
    {
        $cutoff = $before->getTimestamp();

        $batchIds = $this->connection->zrangebyscore(
            $this->getOrderedSetKeyForFinishedAt(),
            '-inf',
            $cutoff,
        );

        foreach ($batchIds as $id) {
            $this->delete($id);
        }

        return count($batchIds);
    }

pruneUnfinished() and pruneCancelled() work similarly but query the created-at set and filter based on the finished_at and cancelled_at fields respectively.
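The filter those two methods apply can be sketched in plain PHP. The field names match the hash layout above; the walk over the created-at set is implied, and pruneCancelled() would swap the check to cancelled_at:

```php
<?php

// Given batch hashes fetched via the created-at set, keep the ones
// pruneUnfinished() would delete: no finished_at stamp, and created
// before the cutoff timestamp.
function unfinishedBefore(array $batches, int $cutoff): array
{
    return array_values(array_filter(
        $batches,
        fn (array $b) => $b['finished_at'] === '' && (int) $b['created_at'] < $cutoff,
    ));
}

$batches = [
    ['id' => 'a', 'created_at' => 100, 'finished_at' => ''],   // old, unfinished
    ['id' => 'b', 'created_at' => 100, 'finished_at' => 150],  // old, but finished
    ['id' => 'c', 'created_at' => 900, 'finished_at' => ''],   // unfinished, too recent
];

$stale = unfinishedBefore($batches, 500);
// only batch 'a' qualifies
```

In the real methods the created-at sorted set already bounds the scan by timestamp, so the hash fetch and filter only run over candidates, not every batch.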

Wiring It Up

Extend Laravel's BusServiceProvider and override registerBatchServices():

<?php

namespace App\Providers;

use App\Queue\RedisBatchRepository;
use Illuminate\Bus\BusServiceProvider;
use Illuminate\Bus\DatabaseBatchRepository;
use Illuminate\Contracts\Bus\Dispatcher;
use Illuminate\Support\Facades\Redis;

class QueueBusServiceProvider extends BusServiceProvider
{
    protected function registerBatchServices(): void
    {
        $driver = config('queue.batching.driver', 'database');

        if ($driver !== 'redis') {
            parent::registerBatchServices();
            return;
        }

        $this->app->singleton(RedisBatchRepository::class, function ($app) {
            return new RedisBatchRepository(
                $app->make(\Illuminate\Bus\BatchFactory::class),
                Redis::connection(config('queue.batching.redis_connection', 'default')),
                config('queue.batching.table', 'job_batches'),
            );
        });

        $this->app->alias(RedisBatchRepository::class, \Illuminate\Bus\BatchRepository::class);
        $this->app->alias(RedisBatchRepository::class, \Illuminate\Bus\PrunableBatchRepository::class);
    }
}

In config/app.php, swap BusServiceProvider for yours:

'providers' => ServiceProvider::defaultProviders()
    ->replace([
        \Illuminate\Bus\BusServiceProvider::class => \App\Providers\QueueBusServiceProvider::class,
    ])
    ->toArray(),

Configuration

Add the batching block to config/queue.php:

'batching' => [
    'driver' => env('QUEUE_BATCH_DRIVER', 'database'),
    'redis_connection' => env('QUEUE_BATCH_REDIS_CONNECTION', 'default'),
    'table' => env('QUEUE_BATCH_TABLE', 'job_batches'),
],

Then in .env:

QUEUE_BATCH_DRIVER=redis
QUEUE_BATCH_REDIS_CONNECTION=default
QUEUE_BATCH_TABLE=job_batches

The table value becomes the key prefix in Redis, so job_batches means your keys will look like laravel_database_job_batches:batch_{id}.
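As a quick composition check, here is how those pieces concatenate. The 'laravel_database_' value assumes Laravel's default client-level Redis prefix; check the options block of your config/database.php redis connection if yours differs:

```php
<?php

// How the final Redis key is assembled: the client-level prefix the
// Redis connection applies, then getBatchKey() built from the
// queue.batching.table value. The batch ID is a placeholder.
$clientPrefix = 'laravel_database_';
$repoPrefix   = 'job_batches';
$batchId      = 'example-id';

$key = $clientPrefix . "{$repoPrefix}:batch_{$batchId}";
// laravel_database_job_batches:batch_example-id
```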

When to Use This

This works well for transient batch data, which is most batch data. If you dispatch thousands of batches a day and query them mostly by ID (checking progress, running callbacks), Redis handles that faster and cheaper than a database table with millions of rows.

If you need long-term audit history of your batches, the database driver is still the right call. Redis is not an audit log. You can add TTLs to the keys if batches really pile up and you want Redis to clean up after itself, but that means you lose the data permanently once the TTL expires.

Schedule the prune command for the batches you do want to keep around for a while:

0 * * * * php /path/to/artisan queue:prune-redis-batches --hours=48

That keeps Redis from growing unbounded while giving you two days of history on completed batches.