Паттерн "Transactional Outbox" — это надежный способ обеспечения согласованности данных между микросервисами и другими системами, особенно в системах с распределенными транзакциями. В Laravel этот паттерн можно реализовать следующим образом:
1. Создание таблицы для хранения событий (outbox_messages)
Первый шаг – создать миграцию, которая создаст таблицу для хранения исходящих сообщений или событий:
<?php
declare(strict_types=1);
use Illuminate\Database\Migrations\Migration;
use Illuminate\Database\Schema\Blueprint;
use Illuminate\Support\Facades\Schema;
return new class () extends Migration {
public function up(): void
{
Schema::create('outbox_messages', function (Blueprint $table): void {
$table->id();
$table->string('job_class');
$table->json('payload');
$table->integer('attempts')->default(0);
$table->longText('description')->nullable();
$table->string('status')->default(\App\Enums\OutboxMessageStatusEnum::PENDING);
$table->string('queue_type')->default(\App\Enums\QueueTypeEnum::DEFAULT);
$table->timestamps();
});
}
public function down(): void
{
Schema::dropIfExists('outbox_messages');
}
};
Таким образом, мы будем хранить класс задания (job_class), полезную нагрузку (payload), количество попыток доставки, описание ошибок, статус и тип очереди.
2. Создание модели OutboxMessage
Для удобной работы с таблицей создайте модель, которая определяет, какие поля доступны для массового заполнения и их приведение типов.
<?php
declare(strict_types=1);
namespace App\Models;
use App\Enums\OutboxMessageStatusEnum;
use App\Enums\QueueTypeEnum;
use Database\Factories\OutboxMessageFactory;
use Illuminate\Database\Eloquent\Factories\HasFactory;
use Illuminate\Database\Eloquent\Model;
class OutboxMessage extends Model
{
/** @use HasFactory<OutboxMessageFactory> */
use HasFactory;
protected $fillable = [
'job_class',
'payload',
'attempts',
'description',
'status',
'queue_type',
];
protected function casts(): array
{
return [
'payload' => 'array',
'attempts' => 'int',
'status' => OutboxMessageStatusEnum::class,
'queue_type' => QueueTypeEnum::class,
];
}
}
3. Определение Enum для статусов и типов очередей
Для контроля статуса сообщения и типа очереди удобно использовать Enum. Пример двух перечислений:
<?php
declare(strict_types=1);
namespace App\Enums;
enum OutboxMessageStatusEnum: string
{
case PENDING = 'pending';
case SENT = 'sent';
case ERROR = 'error';
}
– QueueTypeEnum:
<?php
declare(strict_types=1);
namespace App\Enums;
enum QueueTypeEnum: string
{
case DEFAULT = 'default';
case BILLY = 'invoice';
}
4. Создание DTO для сообщений Outbox
Чтобы удобно передавать данные события, создайте Data Transfer Object (DTO):
<?php
declare(strict_types=1);
namespace App\Data\OutboxMessage;
use App\Enums\OutboxMessageStatusEnum;
use App\Enums\QueueTypeEnum;
use Illuminate\Contracts\Queue\ShouldQueue;
use Spatie\LaravelData\Data;
final class OutboxMessageData extends Data
{
/**
* @param class-string<ShouldQueue> $job_class
* @param array<string, scalar> $payload
*/
public function __construct(
public string $job_class,
public array $payload,
public int $attempts = 0,
public OutboxMessageStatusEnum $status = OutboxMessageStatusEnum::PENDING,
public QueueTypeEnum $queue_type = QueueTypeEnum::DEFAULT,
public ?int $id = null,
public ?string $description = null,
) {}
/**
* @return array<string, mixed>
*/
public function toEntityArray(): array
{
$data = $this->toArray();
unset($data['id']);
return $data;
}
}
5. Action для создания сообщения Outbox
Опишем действие (Action), которое сохранит сообщение в базу:
<?php
declare(strict_types=1);
namespace App\Actions\OutboxMessage;
use App\Data\OutboxMessage\OutboxMessageData;
use App\Models\OutboxMessage;
use Lorisleiva\Actions\Concerns\AsAction;
/**
* @method static OutboxMessageData run(OutboxMessageData $outboxMessageData)
*/
final class CreateOutboxMessage
{
use AsAction;
public function handle(OutboxMessageData $message): OutboxMessageData
{
$messageModel = OutboxMessage::create($message->toEntityArray());
$message->id = $messageModel->id;
return $message;
}
}
6. Создание Job для отправки событий
Теперь создадим класс Laravel Job, который будет выполнять отправку сообщения. В данном примере событие связано, например, с обновлением счёта:
<?php
declare(strict_types=1);
namespace App\Jobs;
use App\Actions\Invoices\UpdateInvoiceFromBilly;
use App\Actions\OutboxMessage\CreateOutboxMessage;
use App\Data\OutboxMessage\OutboxMessageData;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
final class InvoiceUpdatedQueueJob implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
public function __construct(
public int $id
) {}
public static function dispatchOutbox(int $id): void
{
CreateOutboxMessage::run(new OutboxMessageData(
job_class: self::class,
payload: ['id' => $id],
));
}
public function handle(): void
{
app(UpdateInvoiceFromBilly::class)->handle($this->id);
}
}
Обратите внимание на метод dispatchOutbox – благодаря ему, при вызове InvoiceUpdatedQueueJob::dispatchOutbox($id), сообщение сохраняется в таблице outbox_messages.
7. Обработка сообщений Outbox
Чтобы отправлять события, накопленные в таблице, можно создать консольную команду. Пример команды, которая извлекает все ожидающие сообщения и отправляет их в RabbitMQ, выглядит следующим образом:
<?php
declare(strict_types=1);
namespace App\Console\Commands\OutboxMessage;
use App\Actions\OutboxMessage\DispatchOutboxMessage;
use App\Actions\OutboxMessage\GetAllPendingMessages;
use App\Data\OutboxMessage\OutboxMessageData;
use Illuminate\Console\Command;
final class OutboxProcessCommand extends Command
{
protected $signature = 'outbox:process';
protected $description = 'Send Outbox messages to Rabbit MQ';
public function handle(): void
{
$messages = GetAllPendingMessages::run();
$this->withProgressBar($messages, function (OutboxMessageData $messageData): void {
DispatchOutboxMessage::run($messageData);
});
}
}
Action DispatchOutboxMessage отвечает за попытки отправки, обработку ошибок и обновление статуса сообщения. Пример реализации DispatchOutboxMessage можно увидеть в исходном коде.
8. Дополнительные команды
Чтобы поддерживать чистоту таблицы, можно добавить команду по удалению старых сообщений:
<?php
declare(strict_types=1);
namespace App\Actions\OutboxMessage;
use App\Enums\OutboxMessageStatusEnum;
use App\Models\OutboxMessage;
use Carbon\Carbon;
use Lorisleiva\Actions\Concerns\AsAction;
/**
* @method static void run()
*/
final class DeleteOldEvents
{
use AsAction;
public const int DAYS_TO_KEEP = 60;
public function handle(): void
{
OutboxMessage::where('status', OutboxMessageStatusEnum::SENT)
->where('created_at', '<', Carbon::now()->subDays(self::DAYS_TO_KEEP))
->delete();
}
}
А также консольную команду для удаления старых сообщений:
<?php
declare(strict_types=1);
namespace App\Console\Commands\OutboxMessage;
use App\Actions\OutboxMessage\DeleteOldEvents;
use Illuminate\Console\Command;
final class MessagesDeleteOldCommand extends Command
{
protected $signature = 'messages:delete-old';
protected $description = 'Delete old outbox messages';
public function handle(): void
{
DeleteOldEvents::run();
$this->info('Messages has been deleted');
}
}
Если потребуется повторить отправку конкретного события, можно реализовать команду OutboxRetryCommand:
<?php
declare(strict_types=1);
namespace App\Console\Commands\OutboxMessage;
use App\Actions\OutboxMessage\DispatchOutboxMessage;
use App\Actions\OutboxMessage\GetOutboxMessageById;
use Illuminate\Console\Command;
final class OutboxRetryCommand extends Command
{
protected $signature = 'outbox:retry {id}';
protected $description = 'Retry event by its ID';
public function handle(): void
{
$id = (int) $this->argument('id');
$message = GetOutboxMessageById::run($id);
DispatchOutboxMessage::run($message);
}
}
Заключение
В данной статье мы рассмотрели один из подходов к реализации Transactional Outbox в Laravel. Паттерн Outbox Messages позволяет:
- Гарантировать доставку событий даже при временной недоступности брокера сообщений (RabbitMQ);
- Хранить историю событий для дальнейшего анализа и воспроизведения;
- Исключить рассинхронизацию между базой данных и очередью.
Запустите консольную команду (например, по крону или schedule) для периодической обработки накопленных сообщений. При необходимости добавьте команды по очистке истории или повторной отправке событий.
Надеюсь, эта инструкция поможет вам внедрить надежное и отказоустойчивое взаимодействие между вашими приложениями. Если у вас возникнут вопросы или предложения – пишите в комментариях или свяжитесь со мной напрямую!