Skip to main content

Message Broker & Event-Driven Architecture

Panduan lengkap implementasi Message Broker untuk event-driven architecture di MStore Backend.

🎯 Overview

MStore Backend mendukung 5 message broker untuk berbagai use case:
| Broker | Use Case | Performance | Persistence |
|--------|----------|-------------|-------------|
| Kafka | Event streaming, audit logs | High | Yes |
| NATS | Microservice communication | Very High | Optional |
| RabbitMQ | Task queues, workflows | Medium | Yes |
| MQTT | IoT devices, real-time updates | High | Optional |
| Redis Pub/Sub | Cache invalidation, notifications | Very High | No |

Package: pkg/utils/message_broker/

📊 Architecture


1️⃣ Kafka

Overview

Apache Kafka - Distributed event streaming platform.

Use Cases:
  • Event sourcing
  • Audit logs
  • Data pipeline
  • Real-time analytics

Quick Start

package main

import (
    "context"
    "github.com/segmentio/kafka-go"
    utils_mb "gitlab.com/mushola-store/mstore_backend/pkg/utils/message_broker"
)

// main starts a Kafka consumer on the "transactions" topic for the
// "pos-api-consumer" group, registering handleMessage as the callback.
func main() {
    var (
        brokers = []string{"localhost:9092"}
        topic   = "transactions"
        groupID = "pos-api-consumer"
    )

    // Start consumer
    utils_mb.KafkaConsumer(context.Background(), brokers, topic, groupID, handleMessage)
}

// handleMessage is the consumer callback: it prints the value of the Kafka
// message it is given.
func handleMessage(msg kafka.Message) {
    // %s renders a byte slice as text, so no explicit conversion is needed.
    fmt.Printf("Received: %s\n", msg.Value)
    // Process message
}

Publish Message

// PublishTransactionEvent encodes transaction as JSON and publishes it to the
// "transactions" topic, using the transaction code as the message key.
// The returned error is non-nil only when JSON encoding fails.
func PublishTransactionEvent(ctx context.Context, transaction *Transaction) error {
    // Serialize payload
    body, err := json.Marshal(transaction)
    if err != nil {
        return err
    }

    // Create and publish the message in one step
    utils_mb.PublishKafkaMessage(ctx, []string{"localhost:9092"}, "transactions", "", kafka.Message{
        Key:   []byte(transaction.TransactionCode),
        Value: body,
    })
    return nil
}

Consumer with Error Handling

func StartKafkaConsumer(ctx context.Context) {
    brokers := []string{"localhost:9092"}
    topic := "transactions"
    groupID := "pos-api-consumer"
    
    utils_mb.KafkaConsumer(ctx, brokers, topic, groupID, func(msg kafka.Message) {
        // Parse message
        var transaction Transaction
        if err := json.Unmarshal(msg.Value, &transaction); err != nil {
            log.Error("Failed to parse message", "error", err)
            return
        }
        
        // Process transaction
        if err := processTransaction(ctx, &transaction); err != nil {
            log.Error("Failed to process transaction", "error", err)
            // TODO: Send to DLQ (Dead Letter Queue)
            return
        }
        
        log.Info("Transaction processed", "id", transaction.ID)
    })
}

2️⃣ NATS

Overview

NATS - High-performance messaging system.

Use Cases:
  • Microservice RPC
  • Request-reply pattern
  • Service discovery
  • Load balancing

Quick Start

import (
    "github.com/nats-io/nats.go"
    utils_mb "gitlab.com/mushola-store/mstore_backend/pkg/utils/message_broker"
)

// main connects to a local NATS server, subscribes handleOrder to the
// "orders.created" subject, and then blocks forever so the subscription
// stays active. Connection and subscription failures abort the program.
func main() {
    // Connect to NATS
    nc, err := utils_mb.ConnectNATS("nats://localhost:4222")
    if err != nil {
        panic(err)
    }
    defer nc.Close()

    // Subscribe to subject. Without this check a failed subscription would
    // leave the process idling without ever receiving a message.
    if _, err := nc.Subscribe("orders.created", handleOrder); err != nil {
        panic(err)
    }

    // Keep alive
    select {}
}

// handleOrder prints the payload of an incoming order message and, when the
// sender supplied a reply subject, answers with "Order received".
func handleOrder(msg *nats.Msg) {
    fmt.Printf("Received order: %s\n", string(msg.Data))

    // Plain publishes carry no reply subject; responding to them can only
    // fail, so reply just for request-style messages.
    if msg.Reply == "" {
        return
    }
    if err := msg.Respond([]byte("Order received")); err != nil {
        fmt.Printf("Failed to reply to order message: %v\n", err)
    }
}

Publish-Subscribe

// Publisher
//
// PublishOrderCreated encodes order as JSON and publishes it on the
// "orders.created" subject. It returns the encoding error or the publish
// error, whichever occurs first.
func PublishOrderCreated(nc *nats.Conn, order *Order) error {
    payload, err := json.Marshal(order)
    if err != nil {
        return err
    }
    return nc.Publish("orders.created", payload)
}

// Subscriber
//
// SubscribeOrderEvents logs every message received on subjects matching
// "orders.*". A failure to register the subscription is logged, since the
// function has no error return.
func SubscribeOrderEvents(nc *nats.Conn) {
    _, err := nc.Subscribe("orders.*", func(msg *nats.Msg) {
        log.Info("Order event", "subject", msg.Subject, "data", string(msg.Data))
    })
    if err != nil {
        log.Error("Failed to subscribe", "subject", "orders.*", "error", err)
    }
}

Request-Reply Pattern

// Server (Responder)
//
// StartOrderService answers requests on the "order.get" subject. Each request
// is decoded into a GetOrderRequest, the order is looked up with getOrder,
// and the JSON-encoded order is sent back. Failures are reported to the
// requester as a JSON object with an "error" field.
func StartOrderService(nc *nats.Conn) {
    _, err := nc.Subscribe("order.get", func(msg *nats.Msg) {
        // Parse request. An undecodable request must not fall through to a
        // lookup with an empty order ID.
        var req GetOrderRequest
        if err := json.Unmarshal(msg.Data, &req); err != nil {
            msg.Respond([]byte(`{"error": "Invalid request"}`))
            return
        }

        // Get order
        order, err := getOrder(req.OrderID)
        if err != nil {
            msg.Respond([]byte(`{"error": "Order not found"}`))
            return
        }

        // Send response
        resp, err := json.Marshal(order)
        if err != nil {
            msg.Respond([]byte(`{"error": "Internal error"}`))
            return
        }
        // Respond errors are ignored on purpose: there is no other channel
        // through which the requester could be told about them.
        msg.Respond(resp)
    })
    if err != nil {
        log.Error("Failed to subscribe", "subject", "order.get", "error", err)
    }
}

// Client (Requester)
func GetOrderViaNA TS(nc *nats.Conn, orderID string) (*Order, error) {
    req := GetOrderRequest{OrderID: orderID}
    payload, _ := json.Marshal(req)
    
    // Request with timeout
    msg, err := nc.Request("order.get", payload, 5*time.Second)
    if err != nil {
        return nil, err
    }
    
    // Parse response
    var order Order
    json.Unmarshal(msg.Data, &order)
    return &order, nil
}

3️⃣ RabbitMQ

Overview

RabbitMQ - Message queue broker.

Use Cases:
  • Task queues
  • Work distribution
  • Delayed jobs
  • Priority queues

Quick Start

import (
    "github.com/streadway/amqp"
    utils_mb "gitlab.com/mushola-store/mstore_backend/pkg/utils/message_broker"
)

// main connects to a local RabbitMQ server, declares the "tasks" queue and
// processes its deliveries one by one, acknowledging each after processTask
// has been called. Any setup failure aborts the program.
func main() {
    // Connect
    conn, err := utils_mb.ConnectRabbitMQ("amqp://guest:guest@localhost:5672/")
    if err != nil {
        panic(err)
    }
    defer conn.Close()

    // Create channel. Checking the error avoids a nil dereference below.
    ch, err := conn.Channel()
    if err != nil {
        panic(err)
    }
    defer ch.Close()

    // Declare queue
    q, err := ch.QueueDeclare("tasks", true, false, false, false, nil)
    if err != nil {
        panic(err)
    }

    // Consume with auto-ack off, so each delivery is acknowledged explicitly.
    msgs, err := ch.Consume(q.Name, "", false, false, false, false, nil)
    if err != nil {
        panic(err)
    }

    for msg := range msgs {
        processTask(msg.Body)
        msg.Ack(false)
    }
}

Task Queue Pattern

// Producer
//
// EnqueueTask encodes task as JSON and publishes it as a persistent message
// to the "tasks" routing key on the default exchange. It returns the encoding
// error or the publish error, whichever occurs first.
func EnqueueTask(ch *amqp.Channel, task *Task) error {
    payload, err := json.Marshal(task)
    if err != nil {
        return err
    }

    return ch.Publish(
        "",      // exchange
        "tasks", // routing key
        false,   // mandatory
        false,   // immediate
        amqp.Publishing{
            DeliveryMode: amqp.Persistent,
            ContentType:  "application/json",
            Body:         payload,
        },
    )
}

// Consumer (Worker)
//
// StartWorker consumes the "tasks" queue with a prefetch count of one. Each
// delivery is decoded into a Task and passed to processTask: success is
// acknowledged, a processing failure is requeued, and an undecodable payload
// is rejected without requeue. The function returns early if the channel
// cannot be configured or consumed from.
func StartWorker(ch *amqp.Channel) {
    // Set prefetch count (QoS)
    if err := ch.Qos(1, 0, false); err != nil {
        log.Error("Failed to set QoS", "error", err)
        return
    }

    msgs, err := ch.Consume("tasks", "", false, false, false, false, nil)
    if err != nil {
        log.Error("Failed to start consuming", "error", err)
        return
    }

    for msg := range msgs {
        var task Task
        if err := json.Unmarshal(msg.Body, &task); err != nil {
            // A malformed payload will never decode; requeueing it would
            // redeliver the same broken message forever.
            log.Error("Invalid task payload", "error", err)
            msg.Nack(false, false)
            continue
        }

        // Process task
        if err := processTask(&task); err != nil {
            log.Error("Task failed", "error", err)
            msg.Nack(false, true) // Requeue
            continue
        }
        msg.Ack(false) // Acknowledge
    }
}

4️⃣ MQTT

Overview

MQTT - Lightweight pub/sub protocol for IoT.

Use Cases:
  • IoT device communication
  • Real-time sensor data
  • Mobile push notifications
  • Live updates

Quick Start

import (
    mqtt "github.com/eclipse/paho.mqtt.golang"
    utils_mb "gitlab.com/mushola-store/mstore_backend/pkg/utils/message_broker"
)

// main connects to a local MQTT broker as client "pos-api", subscribes
// handleDeviceStatus to "devices/+/status" at QoS 0, and then blocks forever.
// A failed subscription aborts the program.
func main() {
    // Connect
    client := utils_mb.ConnectMQTT("tcp://localhost:1883", "pos-api")

    // Subscribe. The call is asynchronous: wait on the token and check its
    // error, otherwise a rejected subscription goes unnoticed.
    token := client.Subscribe("devices/+/status", 0, handleDeviceStatus)
    if token.Wait() && token.Error() != nil {
        panic(token.Error())
    }

    // Keep alive
    select {}
}

// handleDeviceStatus prints the topic and payload of a received device
// status message.
func handleDeviceStatus(client mqtt.Client, msg mqtt.Message) {
    topic, status := msg.Topic(), string(msg.Payload())
    fmt.Printf("Device: %s, Status: %s\n", topic, status)
}

Publish Device Events

// PublishDeviceEvent encodes event as JSON and publishes it to
// "devices/<deviceID>/events" with QoS 0 and no retain flag. It waits for the
// publish token to complete and returns its error, or the encoding error if
// the event cannot be serialized.
func PublishDeviceEvent(client mqtt.Client, deviceID string, event *DeviceEvent) error {
    payload, err := json.Marshal(event)
    if err != nil {
        return err
    }

    topic := fmt.Sprintf("devices/%s/events", deviceID)
    token := client.Publish(topic, 0, false, payload)
    token.Wait()
    return token.Error()
}

5️⃣ Redis Pub/Sub

Overview

Redis Pub/Sub - In-memory pub/sub.

Use Cases:
  • Cache invalidation
  • Real-time notifications
  • Session updates
  • Broadcast messages

Quick Start

import (
    "context"

    "github.com/redis/go-redis/v9"
    utils_cache "gitlab.com/mushola-store/mstore_backend/pkg/utils/cache"
)

// main subscribes to the "cache:invalidate" channel and passes the payload of
// every received message to handleCacheInvalidation.
func main() {
    // The original snippet used ctx without declaring it.
    ctx := context.Background()

    // Get Redis connection. Named rdb so it does not shadow the redis package.
    rdb := utils_cache.GetRedisConnection()

    // Subscribe
    pubsub := rdb.Subscribe(ctx, "cache:invalidate")
    defer pubsub.Close()

    for msg := range pubsub.Channel() {
        handleCacheInvalidation(msg.Payload)
    }
}

Cache Invalidation Pattern

// Publisher (when data changes)
//
// InvalidateCache publishes a JSON message with the given key and the current
// Unix timestamp on the "cache:invalidate" channel and returns the publish
// error, if any.
//
// NOTE(review): ctx is not declared in this snippet; presumably it is
// available in the enclosing package — confirm.
func InvalidateCache(redis *redis.Client, key string) error {
    // Encoding a map of a string and an int64 cannot fail, so the error is
    // discarded.
    data, _ := json.Marshal(map[string]any{
        "key":       key,
        "timestamp": time.Now().Unix(),
    })

    cmd := redis.Publish(ctx, "cache:invalidate", data)
    return cmd.Err()
}

// Subscriber (cache manager)
//
// StartCacheInvalidationListener listens on the "cache:invalidate" channel
// and deletes the Redis key named in each message. Messages that are not
// valid JSON or lack a string "key" field are logged and skipped. The
// subscription is closed when the message loop ends.
//
// NOTE(review): ctx is not declared in this snippet; presumably it is
// available in the enclosing package — confirm.
func StartCacheInvalidationListener(redis *redis.Client) {
    pubsub := redis.Subscribe(ctx, "cache:invalidate")
    defer pubsub.Close()

    for msg := range pubsub.Channel() {
        var payload map[string]interface{}
        if err := json.Unmarshal([]byte(msg.Payload), &payload); err != nil {
            log.Error("Invalid cache invalidation message", "error", err)
            continue
        }

        // Checked assertion: a missing or non-string key must not panic the
        // listener.
        key, ok := payload["key"].(string)
        if !ok {
            log.Error("Cache invalidation message without key", "payload", msg.Payload)
            continue
        }

        if err := redis.Del(ctx, key).Err(); err != nil {
            log.Error("Failed to invalidate cache", "key", key, "error", err)
            continue
        }

        log.Info("Cache invalidated", "key", key)
    }
}

🎯 Use Case Examples

1. Transaction Event Streaming (Kafka)

// When transaction is created
//
// CreateTransaction stores a new transaction through the repository and then
// publishes a TransactionCreatedEvent from a separate goroutine, so the
// caller does not wait for the publish call. The publish uses a fresh
// background context rather than ctx.
func (s *TransactionService) CreateTransaction(ctx context.Context, payload PayloadCreateTransaction) (*Transaction, error) {
    // Create transaction
    tx, err := s.repo.Create(ctx, payload)
    if err != nil {
        return nil, err
    }

    // Publish event to Kafka
    publish := func() {
        s.kafka.PublishTransactionEvent(context.Background(), &TransactionCreatedEvent{
            TransactionID: tx.ID,
            BranchID:      tx.BranchID,
            Amount:        tx.GrandTotal,
            Timestamp:     time.Now(),
        })
    }
    go publish()

    return tx, nil
}

2. Microservice Communication (NATS)

// Inventory Service
//
// ReserveStock publishes a StockReservedEvent for the given product and
// quantity on "inventory.reserved" and returns nil. The reservation logic
// itself is elided in this example.
func (s *InventoryService) ReserveStock(ctx context.Context, productID uint64, qty int) error {
    // Reserve stock logic
    // ...

    // Notify other services
    s.nats.Publish("inventory.reserved", StockReservedEvent{
        ProductID:  productID,
        Quantity:   qty,
        ReservedAt: time.Now(),
    })

    return nil
}

// Order Service (subscriber)
//
// HandleStockReserved decodes a StockReservedEvent from msg and marks the
// corresponding order as "stock_reserved". An undecodable message is logged
// and ignored, so the status is not updated for a zero-valued product ID.
func (s *OrderService) HandleStockReserved(msg *nats.Msg) {
    var event StockReservedEvent
    if err := json.Unmarshal(msg.Data, &event); err != nil {
        log.Error("Invalid stock reserved event", "subject", msg.Subject, "error", err)
        return
    }

    // Update order status
    s.updateOrderStatus(event.ProductID, "stock_reserved")
}

3. Background Job Processing (RabbitMQ)

// Enqueue email job
//
// SendTransactionReceipt enqueues a "transaction_receipt" email job with
// priority 1 for txID on the "emails" queue and returns the enqueue result.
func (s *EmailService) SendTransactionReceipt(ctx context.Context, txID uint64) error {
    receipt := &EmailJob{
        Type:          "transaction_receipt",
        TransactionID: txID,
        Priority:      1,
    }
    return s.rabbitmq.EnqueueJob("emails", receipt)
}

// Worker processes jobs
//
// Start consumes jobs from the "emails" queue and dispatches each one by its
// Type. Jobs of any other type are skipped.
func (w *EmailWorker) Start() {
    jobs := w.rabbitmq.ConsumeJobs("emails")
    for job := range jobs {
        if job.Type == "transaction_receipt" {
            w.sendTransactionReceipt(job.TransactionID)
        } else if job.Type == "order_confirmation" {
            w.sendOrderConfirmation(job.OrderID)
        }
    }
}

4. IoT Device Updates (MQTT)

// POS device publishes status
//
// PublishStatus publishes the device's current status — online, battery level
// and the current time — to "devices/<id>/status".
func (d *POSDevice) PublishStatus() {
    current := DeviceStatus{
        DeviceID:     d.ID,
        Online:       true,
        BatteryLevel: d.GetBatteryLevel(),
        LastSeen:     time.Now(),
    }

    topic := fmt.Sprintf("devices/%s/status", d.ID)
    d.mqtt.Publish(topic, current)
}

// Server subscribes to device updates
//
// MonitorDevices subscribes to "devices/+/status" and stores every decoded
// DeviceStatus through updateDeviceStatus. Payloads that fail to decode are
// logged and skipped, so no update is made with an empty device ID.
func (s *DeviceService) MonitorDevices() {
    s.mqtt.Subscribe("devices/+/status", func(msg mqtt.Message) {
        var status DeviceStatus
        if err := json.Unmarshal(msg.Payload(), &status); err != nil {
            log.Error("Invalid device status payload", "topic", msg.Topic(), "error", err)
            return
        }

        s.updateDeviceStatus(status.DeviceID, &status)
    })
}

5. Real-time Notifications (Redis Pub/Sub)

// Publish notification
//
// NotifyUser publishes a JSON-encoded Notification for userID on the channel
// "user:<userID>:notifications". It returns the encoding error or the publish
// error, whichever occurs first.
//
// NOTE(review): ctx is not declared in this snippet; presumably it is
// available in the enclosing package — confirm.
func (s *NotificationService) NotifyUser(userID uint64, message string) error {
    payload, err := json.Marshal(Notification{
        UserID:    userID,
        Message:   message,
        Timestamp: time.Now(),
    })
    if err != nil {
        return err
    }

    channel := fmt.Sprintf("user:%d:notifications", userID)
    return s.redis.Publish(ctx, channel, payload).Err()
}

// WebSocket server subscribes
//
// SubscribeUserNotifications forwards every message received on
// "user:<userID>:notifications" to the user's WebSocket client. The
// subscription is closed when the message loop ends.
//
// NOTE(review): ctx is not declared in this snippet; presumably it is
// available in the enclosing package — confirm.
func (ws *WebSocketServer) SubscribeUserNotifications(userID uint64) {
    pubsub := ws.redis.Subscribe(ctx, fmt.Sprintf("user:%d:notifications", userID))
    // Release the subscription once the loop exits.
    defer pubsub.Close()

    for msg := range pubsub.Channel() {
        // Send to WebSocket client
        ws.SendToClient(userID, msg.Payload)
    }
}

📚 Best Practices

1. Message Idempotency

// ✅ GOOD - Idempotent message handling
//
// handleMessage processes a Kafka message at most once per message key: a key
// already present in the cache is logged and skipped; otherwise the message is
// processed and its key is cached for 24 hours.
func handleMessage(msg kafka.Message) {
    id := string(msg.Key)

    if !cache.Exists(id) {
        // First time this key is seen: process, then mark as processed.
        processMessage(msg)
        cache.Set(id, true, 24*time.Hour)
        return
    }

    log.Info("Message already processed", "id", id)
}

2. Error Handling & Retry

// ✅ GOOD - Retry with exponential backoff
//
// processWithRetry calls processMessage up to maxRetries times, sleeping
// 1s, 2s, 4s, ... between attempts. It returns nil as soon as an attempt
// succeeds. When every attempt fails, the message is handed to sendToDLQ and
// an error is returned.
func processWithRetry(msg kafka.Message, maxRetries int) error {
    for attempt := 0; attempt < maxRetries; attempt++ {
        if err := processMessage(msg); err == nil {
            return nil
        }

        // Nothing follows the last attempt, so waiting would only delay the
        // hand-off to the DLQ.
        if attempt == maxRetries-1 {
            break
        }

        // Exponential backoff: one second doubled on every attempt.
        time.Sleep(time.Second << attempt)
    }

    // Send to DLQ after max retries
    sendToDLQ(msg)
    return errors.New("max retries exceeded")
}

3. Graceful Shutdown

// ✅ GOOD - Graceful shutdown
//
// main runs the Kafka consumer in a goroutine until an interrupt or SIGTERM
// arrives, then cancels the consumer's context and waits for the consumer to
// return, giving up after five seconds.
func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Start consumer; done is closed once it returns.
    // NOTE(review): assumes startKafkaConsumer returns when ctx is cancelled.
    done := make(chan struct{})
    go func() {
        defer close(done)
        startKafkaConsumer(ctx)
    }()

    // Wait for interrupt signal
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
    <-sigChan

    // Cancel context to stop consumers
    cancel()

    // Wait for the consumer to finish instead of sleeping a fixed time; the
    // timeout keeps a stuck consumer from blocking exit.
    select {
    case <-done:
    case <-time.After(5 * time.Second):
    }
}

OpenTelemetry Tracing

Distributed tracing for message flows

System Design

Event-driven architecture

Cache & Redis

Redis caching strategies

Message Brokers: Pilih broker sesuai use case. Kafka untuk event streaming, NATS untuk RPC, RabbitMQ untuk task queues, MQTT untuk IoT, Redis Pub/Sub untuk real-time notifications.