Message Broker & Event-Driven Architecture
A complete guide to implementing message brokers for event-driven architecture in MStore Backend.
🎯 Overview
MStore Backend supports five message brokers, each suited to different use cases:
| Broker | Use Case | Performance | Persistence |
|---------------|-----------------------------------|-------------|-------------|
| Kafka | Event streaming, audit logs | High | Yes |
| NATS | Microservice communication | Very High | Optional |
| RabbitMQ | Task queues, workflows | Medium | Yes |
| MQTT | IoT devices, real-time updates | High | Optional |
| Redis Pub/Sub | Cache invalidation, notifications | Very High | No |
Package: `pkg/utils/message_broker/`
📊 Architecture
1️⃣ Kafka
Overview
Apache Kafka - Distributed event streaming platform
Use Cases:
- Event sourcing
- Audit logs
- Data pipeline
- Real-time analytics
Quick Start
```go
package main

import (
    "context"
    "fmt"

    "github.com/segmentio/kafka-go"

    utils_mb "gitlab.com/mushola-store/mstore_backend/pkg/utils/message_broker"
)

func main() {
    brokers := []string{"localhost:9092"}
    topic := "transactions"
    groupID := "pos-api-consumer"

    // Start consumer
    ctx := context.Background()
    utils_mb.KafkaConsumer(ctx, brokers, topic, groupID, handleMessage)
}

func handleMessage(msg kafka.Message) {
    fmt.Printf("Received: %s\n", string(msg.Value))
    // Process message
}
```
Publish Message
```go
func PublishTransactionEvent(ctx context.Context, transaction *Transaction) error {
    brokers := []string{"localhost:9092"}
    topic := "transactions"

    // Serialize payload
    payload, err := json.Marshal(transaction)
    if err != nil {
        return err
    }

    // Create message
    msg := kafka.Message{
        Key:   []byte(transaction.TransactionCode),
        Value: payload,
    }

    // Publish
    utils_mb.PublishKafkaMessage(ctx, brokers, topic, "", msg)
    return nil
}
```
Consumer with Error Handling
```go
func StartKafkaConsumer(ctx context.Context) {
    brokers := []string{"localhost:9092"}
    topic := "transactions"
    groupID := "pos-api-consumer"

    utils_mb.KafkaConsumer(ctx, brokers, topic, groupID, func(msg kafka.Message) {
        // Parse message
        var transaction Transaction
        if err := json.Unmarshal(msg.Value, &transaction); err != nil {
            log.Error("Failed to parse message", "error", err)
            return
        }

        // Process transaction
        if err := processTransaction(ctx, &transaction); err != nil {
            log.Error("Failed to process transaction", "error", err)
            // TODO: Send to DLQ (Dead Letter Queue)
            return
        }

        log.Info("Transaction processed", "id", transaction.ID)
    })
}
```
2️⃣ NATS
Overview
NATS - High-performance messaging system
Use Cases:
- Microservice RPC
- Request-reply pattern
- Service discovery
- Load balancing (see the queue group sketch after the Publish-Subscribe example below)
Quick Start
```go
import (
    "fmt"

    "github.com/nats-io/nats.go"

    utils_mb "gitlab.com/mushola-store/mstore_backend/pkg/utils/message_broker"
)

func main() {
    // Connect to NATS
    nc, err := utils_mb.ConnectNATS("nats://localhost:4222")
    if err != nil {
        panic(err)
    }
    defer nc.Close()

    // Subscribe to subject
    nc.Subscribe("orders.created", handleOrder)

    // Keep alive
    select {}
}

func handleOrder(msg *nats.Msg) {
    fmt.Printf("Received order: %s\n", string(msg.Data))

    // Reply if needed
    msg.Respond([]byte("Order received"))
}
```
Publish-Subscribe
```go
// Publisher
func PublishOrderCreated(nc *nats.Conn, order *Order) error {
    payload, _ := json.Marshal(order)
    return nc.Publish("orders.created", payload)
}

// Subscriber
func SubscribeOrderEvents(nc *nats.Conn) {
    nc.Subscribe("orders.*", func(msg *nats.Msg) {
        log.Info("Order event", "subject", msg.Subject, "data", string(msg.Data))
    })
}
```
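Load balancing, listed in the use cases above, maps to NATS queue groups: subscribers that share a queue name each receive only a portion of the messages, so multiple worker instances split the load. A minimal sketch reusing `nc *nats.Conn` from the Quick Start; the `order-workers` queue name is an illustrative assumption:

```go
// Queue group subscription: NATS delivers each "orders.created" message to
// only one member of the "order-workers" group, so running this in several
// instances spreads the work across them. The queue name is illustrative.
func SubscribeOrderWorkers(nc *nats.Conn) (*nats.Subscription, error) {
    return nc.QueueSubscribe("orders.created", "order-workers", func(msg *nats.Msg) {
        log.Info("Order event received by this worker", "subject", msg.Subject)
        // Process the order payload here
    })
}
```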
Request-Reply Pattern
```go
// Server (Responder)
func StartOrderService(nc *nats.Conn) {
    nc.Subscribe("order.get", func(msg *nats.Msg) {
        // Parse request
        var req GetOrderRequest
        json.Unmarshal(msg.Data, &req)

        // Get order
        order, err := getOrder(req.OrderID)
        if err != nil {
            msg.Respond([]byte(`{"error": "Order not found"}`))
            return
        }

        // Send response
        resp, _ := json.Marshal(order)
        msg.Respond(resp)
    })
}

// Client (Requester)
func GetOrderViaNATS(nc *nats.Conn, orderID string) (*Order, error) {
    req := GetOrderRequest{OrderID: orderID}
    payload, _ := json.Marshal(req)

    // Request with timeout
    msg, err := nc.Request("order.get", payload, 5*time.Second)
    if err != nil {
        return nil, err
    }

    // Parse response
    var order Order
    json.Unmarshal(msg.Data, &order)
    return &order, nil
}
```
3️⃣ RabbitMQ
Overview
RabbitMQ - Message queue broker
Use Cases:
- Task queues
- Work distribution
- Delayed jobs
- Priority queues (see the priority queue sketch after the Task Queue Pattern below)
Quick Start
```go
import (
    "github.com/streadway/amqp"

    utils_mb "gitlab.com/mushola-store/mstore_backend/pkg/utils/message_broker"
)

func main() {
    // Connect
    conn, err := utils_mb.ConnectRabbitMQ("amqp://guest:guest@localhost:5672/")
    if err != nil {
        panic(err)
    }
    defer conn.Close()

    // Create channel
    ch, _ := conn.Channel()
    defer ch.Close()

    // Declare queue
    q, _ := ch.QueueDeclare("tasks", true, false, false, false, nil)

    // Consume
    msgs, _ := ch.Consume(q.Name, "", false, false, false, false, nil)
    for msg := range msgs {
        processTask(msg.Body)
        msg.Ack(false)
    }
}
```
Task Queue Pattern
```go
// Producer
func EnqueueTask(ch *amqp.Channel, task *Task) error {
    payload, _ := json.Marshal(task)
    return ch.Publish(
        "",      // exchange
        "tasks", // routing key
        false,   // mandatory
        false,   // immediate
        amqp.Publishing{
            DeliveryMode: amqp.Persistent,
            ContentType:  "application/json",
            Body:         payload,
        },
    )
}

// Consumer (Worker)
func StartWorker(ch *amqp.Channel) {
    // Set prefetch count (QoS)
    ch.Qos(1, 0, false)

    msgs, _ := ch.Consume("tasks", "", false, false, false, false, nil)
    for msg := range msgs {
        var task Task
        json.Unmarshal(msg.Body, &task)

        // Process task
        if err := processTask(&task); err != nil {
            log.Error("Task failed", "error", err)
            msg.Nack(false, true) // Requeue
        } else {
            msg.Ack(false) // Acknowledge
        }
    }
}
```
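Priority queues, listed in the RabbitMQ use cases above, require the queue to be declared with an `x-max-priority` argument and each publishing to carry a `Priority` value. A minimal sketch built on the same channel as the task-queue example; the `priority_tasks` queue name and the 0-10 range are illustrative assumptions:

```go
// Declare a priority-enabled queue once, then publish with a per-message priority.
func EnqueuePriorityTask(ch *amqp.Channel, task *Task, priority uint8) error {
    // x-max-priority caps the priority levels the queue will honor (here 0-10).
    _, err := ch.QueueDeclare("priority_tasks", true, false, false, false, amqp.Table{
        "x-max-priority": int32(10),
    })
    if err != nil {
        return err
    }

    payload, err := json.Marshal(task)
    if err != nil {
        return err
    }

    return ch.Publish(
        "",               // exchange
        "priority_tasks", // routing key
        false,            // mandatory
        false,            // immediate
        amqp.Publishing{
            DeliveryMode: amqp.Persistent,
            ContentType:  "application/json",
            Priority:     priority, // higher values are delivered first
            Body:         payload,
        },
    )
}
```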
4️⃣ MQTT
Overview
MQTT - Lightweight pub/sub protocol for IoT
Use Cases:
- IoT device communication
- Real-time sensor data
- Mobile push notifications
- Live updates
Quick Start
```go
import (
    "fmt"

    mqtt "github.com/eclipse/paho.mqtt.golang"

    utils_mb "gitlab.com/mushola-store/mstore_backend/pkg/utils/message_broker"
)

func main() {
    // Connect
    client := utils_mb.ConnectMQTT("tcp://localhost:1883", "pos-api")

    // Subscribe
    client.Subscribe("devices/+/status", 0, handleDeviceStatus)

    // Keep alive
    select {}
}

func handleDeviceStatus(client mqtt.Client, msg mqtt.Message) {
    fmt.Printf("Device: %s, Status: %s\n", msg.Topic(), string(msg.Payload()))
}
```
Publish Device Events
```go
func PublishDeviceEvent(client mqtt.Client, deviceID string, event *DeviceEvent) error {
    topic := fmt.Sprintf("devices/%s/events", deviceID)
    payload, _ := json.Marshal(event)

    token := client.Publish(topic, 0, false, payload)
    token.Wait()
    return token.Error()
}
```
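For IoT devices it is common to register a Last Will and Testament (LWT) so the broker itself publishes an "offline" status when a device drops its connection unexpectedly. A minimal sketch that configures the paho client options directly rather than the `ConnectMQTT` wrapper; the broker URL, client ID, and status payloads are illustrative assumptions:

```go
// Connect a device with a Last Will: on an ungraceful disconnect the broker
// publishes the retained "offline" payload on the device's status topic.
func ConnectDeviceWithLWT(deviceID string) (mqtt.Client, error) {
    statusTopic := fmt.Sprintf("devices/%s/status", deviceID)

    opts := mqtt.NewClientOptions().
        AddBroker("tcp://localhost:1883").
        SetClientID("pos-device-" + deviceID).
        SetWill(statusTopic, `{"online": false}`, 1, true) // QoS 1, retained

    client := mqtt.NewClient(opts)
    if token := client.Connect(); token.Wait() && token.Error() != nil {
        return nil, token.Error()
    }

    // Announce that the device is online (retained so late subscribers see it).
    client.Publish(statusTopic, 1, true, `{"online": true}`)
    return client, nil
}
```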
5️⃣ Redis Pub/Sub
Overview
Redis Pub/Sub - In-memory pub/sub
Use Cases:
- Cache invalidation
- Real-time notifications
- Session updates
- Broadcast messages
Quick Start
```go
import (
    "context"

    "github.com/redis/go-redis/v9"

    utils_cache "gitlab.com/mushola-store/mstore_backend/pkg/utils/cache"
)

func main() {
    ctx := context.Background()

    // Get Redis connection (*redis.Client)
    redis := utils_cache.GetRedisConnection()

    // Subscribe
    pubsub := redis.Subscribe(ctx, "cache:invalidate")
    defer pubsub.Close()

    ch := pubsub.Channel()
    for msg := range ch {
        handleCacheInvalidation(msg.Payload)
    }
}
```
Cache Invalidation Pattern
```go
// Publisher (when data changes)
func InvalidateCache(ctx context.Context, redis *redis.Client, key string) error {
    payload := map[string]interface{}{
        "key":       key,
        "timestamp": time.Now().Unix(),
    }
    data, _ := json.Marshal(payload)
    return redis.Publish(ctx, "cache:invalidate", data).Err()
}

// Subscriber (cache manager)
func StartCacheInvalidationListener(ctx context.Context, redis *redis.Client) {
    pubsub := redis.Subscribe(ctx, "cache:invalidate")
    for msg := range pubsub.Channel() {
        var payload map[string]interface{}
        json.Unmarshal([]byte(msg.Payload), &payload)

        key := payload["key"].(string)
        redis.Del(ctx, key)
        log.Info("Cache invalidated", "key", key)
    }
}
```
🎯 Use Case Examples
1. Transaction Event Streaming (Kafka)
```go
// When transaction is created
func (s *TransactionService) CreateTransaction(ctx context.Context, payload PayloadCreateTransaction) (*Transaction, error) {
    // Create transaction
    tx, err := s.repo.Create(ctx, payload)
    if err != nil {
        return nil, err
    }

    // Publish event to Kafka
    go func() {
        event := TransactionCreatedEvent{
            TransactionID: tx.ID,
            BranchID:      tx.BranchID,
            Amount:        tx.GrandTotal,
            Timestamp:     time.Now(),
        }
        s.kafka.PublishTransactionEvent(context.Background(), &event)
    }()

    return tx, nil
}
```
2. Microservice Communication (NATS)
```go
// Inventory Service
func (s *InventoryService) ReserveStock(ctx context.Context, productID uint64, qty int) error {
    // Reserve stock logic
    // ...

    // Notify other services
    event := StockReservedEvent{
        ProductID:  productID,
        Quantity:   qty,
        ReservedAt: time.Now(),
    }
    s.nats.Publish("inventory.reserved", event)

    return nil
}

// Order Service (subscriber)
func (s *OrderService) HandleStockReserved(msg *nats.Msg) {
    var event StockReservedEvent
    json.Unmarshal(msg.Data, &event)

    // Update order status
    s.updateOrderStatus(event.ProductID, "stock_reserved")
}
```
3. Background Job Processing (RabbitMQ)
```go
// Enqueue email job
func (s *EmailService) SendTransactionReceipt(ctx context.Context, txID uint64) error {
    job := EmailJob{
        Type:          "transaction_receipt",
        TransactionID: txID,
        Priority:      1,
    }
    return s.rabbitmq.EnqueueJob("emails", &job)
}

// Worker processes jobs
func (w *EmailWorker) Start() {
    for job := range w.rabbitmq.ConsumeJobs("emails") {
        switch job.Type {
        case "transaction_receipt":
            w.sendTransactionReceipt(job.TransactionID)
        case "order_confirmation":
            w.sendOrderConfirmation(job.OrderID)
        }
    }
}
```
4. IoT Device Updates (MQTT)
```go
// POS device publishes status
func (d *POSDevice) PublishStatus() {
    status := DeviceStatus{
        DeviceID:     d.ID,
        Online:       true,
        BatteryLevel: d.GetBatteryLevel(),
        LastSeen:     time.Now(),
    }
    d.mqtt.Publish(fmt.Sprintf("devices/%s/status", d.ID), status)
}

// Server subscribes to device updates
func (s *DeviceService) MonitorDevices() {
    s.mqtt.Subscribe("devices/+/status", func(msg mqtt.Message) {
        var status DeviceStatus
        json.Unmarshal(msg.Payload(), &status)
        s.updateDeviceStatus(status.DeviceID, &status)
    })
}
```
5. Real-time Notifications (Redis Pub/Sub)
```go
// Publish notification
func (s *NotificationService) NotifyUser(ctx context.Context, userID uint64, message string) error {
    notification := Notification{
        UserID:    userID,
        Message:   message,
        Timestamp: time.Now(),
    }
    payload, _ := json.Marshal(notification)
    return s.redis.Publish(ctx, fmt.Sprintf("user:%d:notifications", userID), payload).Err()
}

// WebSocket server subscribes
func (ws *WebSocketServer) SubscribeUserNotifications(ctx context.Context, userID uint64) {
    pubsub := ws.redis.Subscribe(ctx, fmt.Sprintf("user:%d:notifications", userID))
    for msg := range pubsub.Channel() {
        // Send to WebSocket client
        ws.SendToClient(userID, msg.Payload)
    }
}
```
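When one gateway process serves many users, a single pattern subscription can replace one subscription per user. A minimal sketch using go-redis `PSubscribe` over the channel naming scheme above; parsing the user ID out of the channel name (with the standard `strings` and `strconv` packages) is an illustrative assumption:

```go
// One pattern subscription covers every user's notification channel.
func (ws *WebSocketServer) SubscribeAllNotifications(ctx context.Context) {
    pubsub := ws.redis.PSubscribe(ctx, "user:*:notifications")
    defer pubsub.Close()

    for msg := range pubsub.Channel() {
        // msg.Channel looks like "user:42:notifications"; extract the user ID.
        parts := strings.Split(msg.Channel, ":")
        if len(parts) != 3 {
            continue
        }
        userID, err := strconv.ParseUint(parts[1], 10, 64)
        if err != nil {
            continue
        }
        ws.SendToClient(userID, msg.Payload)
    }
}
```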
📚 Best Practices
1. Message Idempotency
```go
// ✅ GOOD - Idempotent message handling
func handleMessage(msg kafka.Message) {
    messageID := string(msg.Key)

    // Check if already processed
    if cache.Exists(messageID) {
        log.Info("Message already processed", "id", messageID)
        return
    }

    // Process message
    processMessage(msg)

    // Mark as processed
    cache.Set(messageID, true, 24*time.Hour)
}
```
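The check-then-set above can race: two consumers that receive the same message at nearly the same moment may both pass the `Exists` check. A sketch of an atomic variant using Redis `SETNX` via go-redis; the `rdb` client parameter and the `processed:` key prefix are illustrative assumptions:

```go
// ✅ BETTER - Atomic idempotency check: SETNX both marks the message as
// processed and reports in one round trip whether another consumer won.
func handleMessageAtomic(ctx context.Context, rdb *redis.Client, msg kafka.Message) {
    key := "processed:" + string(msg.Key)

    ok, err := rdb.SetNX(ctx, key, 1, 24*time.Hour).Result()
    if err != nil {
        log.Error("Idempotency check failed", "error", err)
        return
    }
    if !ok {
        log.Info("Message already processed", "id", string(msg.Key))
        return
    }

    // If processing can fail, consider deleting the key on error so the
    // message can be retried.
    processMessage(msg)
}
```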
2. Error Handling & Retry
```go
// ✅ GOOD - Retry with exponential backoff
func processWithRetry(msg kafka.Message, maxRetries int) error {
    for i := 0; i < maxRetries; i++ {
        err := processMessage(msg)
        if err == nil {
            return nil
        }

        // Exponential backoff
        backoff := time.Duration(math.Pow(2, float64(i))) * time.Second
        time.Sleep(backoff)
    }

    // Send to DLQ after max retries
    sendToDLQ(msg)
    return errors.New("max retries exceeded")
}
```
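`sendToDLQ` is referenced above but not shown; a minimal sketch that republishes the failed message through the same `PublishKafkaMessage` helper used earlier, assuming a hypothetical `transactions.dlq` topic:

```go
// Republish a failed message to a dead-letter topic so it can be inspected
// and replayed later. The "transactions.dlq" topic name is an assumption.
func sendToDLQ(msg kafka.Message) {
    brokers := []string{"localhost:9092"}
    dlqTopic := "transactions.dlq"

    dlqMsg := kafka.Message{
        Key:   msg.Key,
        Value: msg.Value,
    }
    utils_mb.PublishKafkaMessage(context.Background(), brokers, dlqTopic, "", dlqMsg)
}
```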
3. Graceful Shutdown
```go
// ✅ GOOD - Graceful shutdown
func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Start consumer
    go startKafkaConsumer(ctx)

    // Wait for interrupt signal
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
    <-sigChan

    // Cancel context to stop consumers
    cancel()

    // Wait for graceful shutdown
    time.Sleep(5 * time.Second)
}
```
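The fixed five-second sleep is a blunt upper bound on shutdown time. A sketch of a variant that waits exactly until the consumer goroutine exits, using `sync.WaitGroup`; it assumes `startKafkaConsumer` returns once its context is cancelled:

```go
// ✅ ALTERNATIVE - Wait for the consumer goroutine instead of sleeping.
func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        defer wg.Done()
        startKafkaConsumer(ctx) // assumed to return when ctx is cancelled
    }()

    // Wait for interrupt signal
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
    <-sigChan

    cancel()  // signal consumers to stop
    wg.Wait() // block until in-flight work has finished
}
```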
Related:
- OpenTelemetry Tracing: distributed tracing for message flows
- System Design: event-driven architecture
- Cache & Redis: Redis caching strategies
Message Brokers: choose the broker that fits the use case. Kafka for event streaming, NATS for RPC, RabbitMQ for task queues, MQTT for IoT, and Redis Pub/Sub for real-time notifications.