Sitemap

Laravel ve RabbitMQ Entegrasyonunu

3 min readNov 5, 2024

Laravel ve RabbitMQ entegrasyonunu detaylıca inceleyelim:

1. RabbitMQ Nedir?

  • Açık kaynaklı bir mesaj kuyruğu (message queue) sistemidir
  • AMQP (Advanced Message Queuing Protocol) protokolünü kullanır
  • Asenkron iletişim sağlar
  • Dağıtık sistemler için güvenilir mesajlaşma altyapısı sunar

2. Ne Zaman Kullanılır?

Örnek senaryolar:

- Mikroservisler arası iletişim
- Arka plan işlemleri (background jobs)
- Real-time bildirimler
- Yük dengeleme (load balancing)
- Servisler arası veri senkronizasyonu
- Uzun süren işlemlerin yönetimi

3. Temel RabbitMQ Kavramları:

Producer    ->    Exchange    ->    Queue    ->    Consumer
(Üretici) (Değiştirici) (Kuyruk) (Tüketici)

a) Exchange Types:

// Direct Exchange
$channel->exchange_declare('direct_exchange', 'direct');

// Topic Exchange
$channel->exchange_declare('topic_exchange', 'topic');

// Fanout Exchange
$channel->exchange_declare('fanout_exchange', 'fanout');

// Headers Exchange
$channel->exchange_declare('headers_exchange', 'headers');

4. Laravel RabbitMQ Entegrasyonu:

a) Kurulum:

# RabbitMQ paketi kurulumu
composer require php-amqplib/php-amqplib

# Laravel Queue paketi (eğer yoksa)
composer require vladimir-yuldashev/laravel-queue-rabbitmq

b) Konfigürasyon:

// config/queue.php
'connections' => [
'rabbitmq' => [
'driver' => 'rabbitmq',
'host' => env('RABBITMQ_HOST', 'localhost'),
'port' => env('RABBITMQ_PORT', 5672),
'vhost' => env('RABBITMQ_VHOST', '/'),
'login' => env('RABBITMQ_LOGIN', 'guest'),
'password' => env('RABBITMQ_PASSWORD', 'guest'),
'queue' => env('RABBITMQ_QUEUE', 'default'),

// Exchange ayarları
'exchange_declare' => true,
'exchange_type' => 'direct',
'exchange_name' => env('RABBITMQ_EXCHANGE_NAME', 'default_exchange'),

// Queue ayarları
'queue_declare' => true,
'queue_declare_bind' => true,

// Diğer özellikler
'worker_sleep_time' => 3
],
],

5. Job Oluşturma ve Gönderme:

a) Job Sınıfı:

class ProcessOrder implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

private $order;

public function __construct(Order $order)
{
$this->order = $order;
}

public function handle()
{
// İş mantığı
logger()->info('Processing order: ' . $this->order->id);
}
}

b) Job’ı Kuyruğa Gönderme:

// Temel gönderim
ProcessOrder::dispatch($order)->onQueue('orders');

// Gecikmeli gönderim
ProcessOrder::dispatch($order)
->onQueue('orders')
->delay(now()->addMinutes(5));

// Öncelikli gönderim
ProcessOrder::dispatch($order)
->onQueue('high_priority_orders');

6. Worker ve Consumer Yönetimi:

a) Worker Başlatma:

# Temel worker başlatma
php artisan queue:work rabbitmq

# Belirli bir queue için worker
php artisan queue:work rabbitmq --queue=orders

# Supervisor konfigürasyonu
[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /home/forge/app.com/artisan queue:work rabbitmq --sleep=3 --tries=3 --max-time=3600
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
user=forge
numprocs=8
redirect_stderr=true
stdout_logfile=/home/forge/app.com/worker.log
stopwaitsecs=3600

7. Error Handling ve Retry Mekanizması:

class ProcessOrder implements ShouldQueue
{
public $tries = 3; // Maximum deneme sayısı
public $backoff = [60, 180, 360]; // Artan bekleme süreleri

public function handle()
{
try {
// İş mantığı
} catch (Exception $e) {
logger()->error('Order processing failed', [
'order' => $this->order->id,
'error' => $e->getMessage()
]);

throw $e;
}
}

public function failed($exception)
{
// Hata durumunda yapılacaklar
}
}

8. Advanced RabbitMQ Özellikleri:

a) Message Priority:

// Yüksek öncelikli mesaj gönderme
$job = (new ProcessOrder($order))
->onQueue('orders')
->withMessageProperties([
'priority' => 10
]);

dispatch($job);

b) Dead Letter Exchange:

// config/queue.php
'rabbitmq' => [
'dead_letter_exchange' => 'dlx',
'dead_letter_routing_key' => 'failed_orders'
]

c) Exchange ve Queue Bağlama:

// Binding oluşturma
$channel->queue_bind($queue_name, $exchange_name, $routing_key);

// Topic exchange örneği
$channel->queue_bind('error_queue', 'logs', 'error.*');
$channel->queue_bind('all_logs_queue', 'logs', '#');

9. Monitoring ve Debugging:

a) RabbitMQ Management Interface:

# Management plugin aktifleştirme
rabbitmq-plugins enable rabbitmq_management

# Web arayüzü: http://localhost:15672

b) Laravel Log Entegrasyonu:

\Log::channel('rabbitmq')->info('Message sent', [
'queue' => 'orders',
'payload' => $data
]);

10. Best Practices:

Queue İsimlendirme:

// Anlamlı queue isimleri kullanın
const QUEUE_NAMES = [
'high_priority_orders' => 'orders.high',
'normal_orders' => 'orders.normal',
'low_priority_orders' => 'orders.low'
];

Message Serialization:

class ProcessOrder implements ShouldQueue
{
public function __sleep()
{
// Sadece gerekli propertyler serialize edilsin
return ['orderId'];
}
}

Circuit Breaker Pattern:

class ProcessOrder implements ShouldQueue
{
public function handle()
{
if (!$this->isServiceHealthy()) {
$this->release(30); // 30 saniye sonra tekrar dene
return;
}

// İş mantığı
}
}

11. Testing:

class RabbitMQTest extends TestCase
{
public function test_order_processing()
{
Queue::fake();

// İşlemi tetikle
ProcessOrder::dispatch($order);

// Queue'ya gönderildiğini kontrol et
Queue::assertPushedOn('orders', ProcessOrder::class);

// Job içeriğini kontrol et
Queue::assertPushed(ProcessOrder::class, function ($job) use ($order) {
return $job->order->id === $order->id;
});
}
}

--

--

No responses yet