Backend Integration Guide: Delta Sync via MQTT

Implementation guide for delta sync in mstore-backend, supporting real-time updates to the mobile app.

📋 Prerequisites

  • An MQTT broker (EMQX or Mosquitto) that is already running
  • A PostgreSQL database
  • Go 1.21+ or Node.js 18+ (depending on the backend stack)
  • An MQTT client library

🏗️ Architecture Overview

┌──────────────────────────────────────────────────────────────┐
│                    MStore Backend                            │
│                                                              │
│  ┌────────────────────────────────────────────────────────┐  │
│  │              Business Logic Layer                      │  │
│  │  • ProductService.UpdateStock()                        │  │
│  │  • InventoryService.UpdateQuantity()                   │  │
│  │  • TransactionService.Create()                         │  │
│  └────────────────────────────────────────────────────────┘  │
│                           ↓                                  │
│  ┌────────────────────────────────────────────────────────┐  │
│  │              Event Publisher                           │  │
│  │  • MqttPublisher.PublishProductUpdate()                │  │
│  │  • MqttPublisher.PublishInventoryUpdate()              │  │
│  │  • MqttPublisher.PublishTransactionUpdate()            │  │
│  └────────────────────────────────────────────────────────┘  │
│                           ↓                                  │
└──────────────────────────────────────────────────────────────┘
                            ↓ MQTT Publish
┌──────────────────────────────────────────────────────────────┐
│                    MQTT Broker (EMQX)                        │
│  Topics:                                                     │
│  • {env}/{merchant}/{branch}/product/updates                 │
│  • {env}/{merchant}/{branch}/inventory/updates               │
│  • {env}/{merchant}/{branch}/transaction/updates             │
└──────────────────────────────────────────────────────────────┘
                            ↓ MQTT Subscribe
┌──────────────────────────────────────────────────────────────┐
│                    MStore Mobile App                         │
│  • DeltaSyncService handles updates                          │
│  • Update local Isar database                                │
│  • Trigger UI refresh                                        │
└──────────────────────────────────────────────────────────────┘
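
The topic layout shown in the broker box can be built in one place so that every publisher produces the same format. The sketch below is only an illustration: `BuildTopic` and its validation rule (rejecting empty segments and the MQTT characters `/`, `+`, and `#`) are hypothetical and not part of mstore-backend.

```go
package main

import (
	"fmt"
	"strings"
)

// BuildTopic returns "{env}/{merchant}/{branch}/{entity}/updates".
// Hypothetical helper: it rejects empty segments and the characters
// '/', '+', and '#', which have special meaning in MQTT topics.
func BuildTopic(env, merchant, branch, entity string) (string, error) {
	for _, seg := range []string{env, merchant, branch, entity} {
		if seg == "" || strings.ContainsAny(seg, "/+#") {
			return "", fmt.Errorf("invalid topic segment %q", seg)
		}
	}
	return fmt.Sprintf("%s/%s/%s/%s/updates", env, merchant, branch, entity), nil
}

func main() {
	topic, err := BuildTopic("dev", "MCH-MST-001", "BRN-MST-OSK00001", "product")
	if err != nil {
		panic(err)
	}
	fmt.Println(topic) // dev/MCH-MST-001/BRN-MST-OSK00001/product/updates
}
```

Validating segments before publishing keeps a malformed merchant or branch code from silently producing a topic that no device subscribes to.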

🔧 Implementation Steps

Step 1: Setup MQTT Client

Go Example:

// pkg/mqtt/client.go
package mqtt

import (
    mqtt "github.com/eclipse/paho.mqtt.golang"
    "os"
    "time"
)

type Client struct {
    client mqtt.Client
}

func NewClient() (*Client, error) {
    opts := mqtt.NewClientOptions()
    opts.AddBroker(os.Getenv("MQTT_BROKER"))
    opts.SetUsername(os.Getenv("MQTT_USERNAME"))
    opts.SetPassword(os.Getenv("MQTT_PASSWORD"))
    opts.SetClientID(os.Getenv("MQTT_CLIENT_ID"))
    opts.SetKeepAlive(60 * time.Second)
    opts.SetAutoReconnect(true)
    opts.SetConnectRetry(true)
    opts.SetMaxReconnectInterval(10 * time.Second)
    
    client := mqtt.NewClient(opts)
    if token := client.Connect(); token.Wait() && token.Error() != nil {
        return nil, token.Error()
    }
    
    return &Client{client: client}, nil
}

func (c *Client) Publish(topic string, payload []byte) error {
    token := c.client.Publish(topic, 1, false, payload)
    token.Wait()
    return token.Error()
}

Node.js Example:

// src/mqtt/client.js
const mqtt = require('mqtt');

class MqttClient {
  constructor() {
    this.client = mqtt.connect(process.env.MQTT_BROKER, {
      username: process.env.MQTT_USERNAME,
      password: process.env.MQTT_PASSWORD,
      clientId: process.env.MQTT_CLIENT_ID,
      keepalive: 60,
      reconnectPeriod: 5000,
    });

    this.client.on('connect', () => {
      console.log('MQTT connected');
    });

    this.client.on('error', (err) => {
      console.error('MQTT error:', err);
    });
  }

  publish(topic, payload) {
    return new Promise((resolve, reject) => {
      this.client.publish(topic, JSON.stringify(payload), { qos: 1 }, (err) => {
        if (err) reject(err);
        else resolve();
      });
    });
  }
}

module.exports = new MqttClient();

Step 2: Create Event Publisher Service

Go Example:

// internal/service/event_publisher.go
package service

import (
    "encoding/json"
    "fmt"
    "time"
    "your-project/pkg/mqtt"
)

type EventPublisher struct {
    mqttClient *mqtt.Client
}

func NewEventPublisher(mqttClient *mqtt.Client) *EventPublisher {
    return &EventPublisher{mqttClient: mqttClient}
}

// Product Update Event
type ProductUpdateEvent struct {
    EventType string    `json:"event_type"`
    Timestamp time.Time `json:"timestamp"`
    Data      struct {
        ProductID   int     `json:"product_id"`
        ProductName string  `json:"product_name"`
        OldStock    int     `json:"old_stock"`
        NewStock    int     `json:"new_stock"`
        Delta       int     `json:"delta"`
        Reason      string  `json:"reason"`
        TransactionID *int  `json:"transaction_id,omitempty"`
        UpdatedBy   int     `json:"updated_by"`
    } `json:"data"`
}

func (ep *EventPublisher) PublishProductStockUpdate(
    env, merchantCode, branchCode string,
    productID int,
    productName string,
    oldStock, newStock int,
    reason string,
    transactionID *int,
    updatedBy int,
) error {
    event := ProductUpdateEvent{
        EventType: "stock_updated",
        Timestamp: time.Now().UTC(),
    }
    event.Data.ProductID = productID
    event.Data.ProductName = productName
    event.Data.OldStock = oldStock
    event.Data.NewStock = newStock
    event.Data.Delta = newStock - oldStock
    event.Data.Reason = reason
    event.Data.TransactionID = transactionID
    event.Data.UpdatedBy = updatedBy

    payload, err := json.Marshal(event)
    if err != nil {
        return fmt.Errorf("failed to marshal event: %w", err)
    }

    topic := fmt.Sprintf("%s/%s/%s/product/updates", env, merchantCode, branchCode)
    return ep.mqttClient.Publish(topic, payload)
}

// Inventory Update Event
type InventoryUpdateEvent struct {
    EventType string    `json:"event_type"`
    Timestamp time.Time `json:"timestamp"`
    Data      struct {
        InventoryID int     `json:"inventory_id"`
        ProductID   int     `json:"product_id"`
        ProductName string  `json:"product_name"`
        OldQuantity int     `json:"old_quantity"`
        NewQuantity int     `json:"new_quantity"`
        Delta       int     `json:"delta"`
        Reason      string  `json:"reason"`
        TransactionID *int  `json:"transaction_id,omitempty"`
        UpdatedBy   int     `json:"updated_by"`
    } `json:"data"`
}

func (ep *EventPublisher) PublishInventoryUpdate(
    env, merchantCode, branchCode string,
    inventoryID, productID int,
    productName string,
    oldQty, newQty int,
    reason string,
    transactionID *int,
    updatedBy int,
) error {
    event := InventoryUpdateEvent{
        EventType: "quantity_updated",
        Timestamp: time.Now().UTC(),
    }
    event.Data.InventoryID = inventoryID
    event.Data.ProductID = productID
    event.Data.ProductName = productName
    event.Data.OldQuantity = oldQty
    event.Data.NewQuantity = newQty
    event.Data.Delta = newQty - oldQty
    event.Data.Reason = reason
    event.Data.TransactionID = transactionID
    event.Data.UpdatedBy = updatedBy

    payload, err := json.Marshal(event)
    if err != nil {
        return fmt.Errorf("failed to marshal event: %w", err)
    }

    topic := fmt.Sprintf("%s/%s/%s/inventory/updates", env, merchantCode, branchCode)
    return ep.mqttClient.Publish(topic, payload)
}

// Transaction Update Event
type TransactionUpdateEvent struct {
    EventType string    `json:"event_type"`
    Timestamp time.Time `json:"timestamp"`
    Data      struct {
        TransactionID   int     `json:"transaction_id"`
        TransactionCode string  `json:"transaction_code"`
        OfflineID       *string `json:"offline_id,omitempty"`
        Status          string  `json:"status"`
        GrandTotal      float64 `json:"grand_total"`
        CreatedBy       int     `json:"created_by"`
        DeviceID        string  `json:"device_id"`
    } `json:"data"`
}

func (ep *EventPublisher) PublishTransactionCreated(
    env, merchantCode, branchCode string,
    transactionID int,
    transactionCode string,
    offlineID *string,
    status string,
    grandTotal float64,
    createdBy int,
    deviceID string,
) error {
    event := TransactionUpdateEvent{
        EventType: "transaction_created",
        Timestamp: time.Now().UTC(),
    }
    event.Data.TransactionID = transactionID
    event.Data.TransactionCode = transactionCode
    event.Data.OfflineID = offlineID
    event.Data.Status = status
    event.Data.GrandTotal = grandTotal
    event.Data.CreatedBy = createdBy
    event.Data.DeviceID = deviceID

    payload, err := json.Marshal(event)
    if err != nil {
        return fmt.Errorf("failed to marshal event: %w", err)
    }

    topic := fmt.Sprintf("%s/%s/%s/transaction/updates", env, merchantCode, branchCode)
    return ep.mqttClient.Publish(topic, payload)
}

Step 3: Integrate with Business Logic

Example: Product Stock Update

// internal/service/product_service.go
package service

import (
    "context"
    "log"

    "your-project/internal/repository"
)

type ProductService struct {
    repo           *repository.ProductRepository
    eventPublisher *EventPublisher
}

func (s *ProductService) UpdateStock(
    ctx context.Context,
    productID int,
    newStock int,
    reason string,
    transactionID *int,
    updatedBy int,
) error {
    // 1. Get current product
    product, err := s.repo.GetByID(ctx, productID)
    if err != nil {
        return err
    }

    oldStock := product.Stock
    
    // 2. Update stock in database
    if err := s.repo.UpdateStock(ctx, productID, newStock); err != nil {
        return err
    }

    // 3. Publish MQTT event (async, don't block)
    go func() {
        err := s.eventPublisher.PublishProductStockUpdate(
            product.Env,
            product.MerchantCode,
            product.BranchCode,
            productID,
            product.Name,
            oldStock,
            newStock,
            reason,
            transactionID,
            updatedBy,
        )
        if err != nil {
            // Log error but don't fail the operation
            log.Printf("Failed to publish product update: %v", err)
        }
    }()

    return nil
}

Example: Transaction Creation

// internal/service/transaction_service.go
package service

import (
    "context"
    "log"
)

func (s *TransactionService) CreateOfflineTransaction(
    ctx context.Context,
    req *CreateOfflineTransactionRequest,
) (*Transaction, error) {
    // 1. Create transaction in database
    tx, err := s.repo.Create(ctx, req)
    if err != nil {
        return nil, err
    }

    // 2. Reduce stock for each item
    for _, item := range req.Items {
        if err := s.productService.UpdateStock(
            ctx,
            item.ProductID,
            item.NewStock,
            "transaction",
            &tx.ID,
            req.UserID,
        ); err != nil {
            log.Printf("Failed to update stock: %v", err)
        }
    }

    // 3. Publish transaction created event
    go func() {
        err := s.eventPublisher.PublishTransactionCreated(
            req.Env,
            req.MerchantCode,
            req.BranchCode,
            tx.ID,
            tx.TransactionCode,
            &req.OfflineID,
            tx.Status,
            tx.GrandTotal,
            req.UserID,
            req.DeviceID,
        )
        if err != nil {
            log.Printf("Failed to publish transaction: %v", err)
        }
    }()

    return tx, nil
}

To ensure that every stock change is published, including changes made outside the service layer, use database triggers:

-- PostgreSQL Example
CREATE OR REPLACE FUNCTION notify_product_stock_change()
RETURNS TRIGGER AS $$
DECLARE
  payload JSON;
BEGIN
  IF OLD.stock IS DISTINCT FROM NEW.stock THEN
    payload := json_build_object(
      'event_type', 'stock_updated',
      'timestamp', NOW(),
      'data', json_build_object(
        'product_id', NEW.id,
        'product_name', NEW.name,
        'old_stock', OLD.stock,
        'new_stock', NEW.stock,
        'delta', NEW.stock - OLD.stock,
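        'reason', 'database_trigger'
        -- updated_by is omitted: current_user is a role name (text) and would
        -- fail to unmarshal into the integer updated_by field of ProductUpdateEvent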
      )
    );
    
    -- Send to application via pg_notify
    PERFORM pg_notify('product_updates', payload::text);
  END IF;
  
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER product_stock_change_trigger
AFTER UPDATE ON products
FOR EACH ROW
EXECUTE FUNCTION notify_product_stock_change();

Then, in the application, listen for the pg_notify notifications:

// Listen to PostgreSQL notifications
func (ep *EventPublisher) ListenDatabaseNotifications(connStr string) {
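    // pq.NewListener (github.com/lib/pq) returns only a *pq.Listener, with no error value;
    // connection problems surface through Listen and the optional event callback (nil here).
    conn := pq.NewListener(connStr, 10*time.Second, time.Minute, nil)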

    if err := conn.Listen("product_updates"); err != nil {
        log.Fatal(err)
    }

    for {
        select {
        case n := <-conn.Notify:
            if n == nil {
                continue
            }
            
            // Parse payload and publish to MQTT
            var event ProductUpdateEvent
            if err := json.Unmarshal([]byte(n.Extra), &event); err != nil {
                log.Printf("Failed to parse notification: %v", err)
                continue
            }
            
            // Publish to MQTT
            // ... (the trigger payload above carries no env, merchant, or branch,
            //      so these must be looked up, e.g. by product ID, before publishing)
        }
    }
}
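
The trigger payload contains only product fields, so the listener has to resolve the topic prefix itself before it can publish. The sketch below shows one way to do that. `branchScope`, `scopeLookup`, and `routeNotification` are hypothetical names, and the lookup is assumed to be backed by something like a repository query by product ID.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// branchScope holds the topic prefix parts that the trigger payload does not carry.
type branchScope struct{ Env, MerchantCode, BranchCode string }

// scopeLookup is a hypothetical resolver, for example a repository query by product ID.
type scopeLookup func(productID int) (branchScope, error)

// routeNotification decodes a pg_notify payload and returns the MQTT topic
// on which the same payload should be published.
func routeNotification(raw string, lookup scopeLookup) (string, error) {
	var n struct {
		Data struct {
			ProductID int `json:"product_id"`
		} `json:"data"`
	}
	if err := json.Unmarshal([]byte(raw), &n); err != nil {
		return "", fmt.Errorf("parse notification: %w", err)
	}
	scope, err := lookup(n.Data.ProductID)
	if err != nil {
		return "", fmt.Errorf("resolve scope for product %d: %w", n.Data.ProductID, err)
	}
	return fmt.Sprintf("%s/%s/%s/product/updates", scope.Env, scope.MerchantCode, scope.BranchCode), nil
}

func main() {
	// Static lookup used only for this demonstration.
	lookup := func(productID int) (branchScope, error) {
		return branchScope{Env: "dev", MerchantCode: "MCH-MST-001", BranchCode: "BRN-MST-OSK00001"}, nil
	}
	raw := `{"event_type":"stock_updated","data":{"product_id":1386,"delta":-1}}`
	topic, err := routeNotification(raw, lookup)
	if err != nil {
		panic(err)
	}
	fmt.Println(topic) // dev/MCH-MST-001/BRN-MST-OSK00001/product/updates
}
```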

🔐 Security & ACL Configuration

EMQX ACL Rules

File: /etc/emqx/acl.conf
%% acl.conf uses Erlang term syntax, so comments start with %% rather than #.
%% Rules are evaluated top to bottom and the first match wins.

%% Allow backend to publish to all update topics
{allow, {user, "backend"}, publish, ["+/+/+/product/updates"]}.
{allow, {user, "backend"}, publish, ["+/+/+/inventory/updates"]}.
{allow, {user, "backend"}, publish, ["+/+/+/transaction/updates"]}.

%% Deny all devices from publishing to update topics
{deny, all, publish, ["+/+/+/product/updates"]}.
{deny, all, publish, ["+/+/+/inventory/updates"]}.
{deny, all, publish, ["+/+/+/transaction/updates"]}.

%% Allow devices to subscribe only to their branch
{allow, {user, "DEV-DBBA60CF"}, subscribe, ["dev/+/BRN-MST-OSK00001/#"]}.
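
The rules above rely on MQTT wildcards: `+` matches exactly one topic level, and `#` matches all remaining levels. The sketch below follows the general MQTT wildcard rules so that a filter can be checked against a concrete topic before it is deployed. It is not EMQX's own matcher, `matchTopic` is a hypothetical helper, and it ignores the special handling of topics that begin with `$`.

```go
package main

import (
	"fmt"
	"strings"
)

// matchTopic reports whether an MQTT topic filter matches a concrete topic.
// '+' matches exactly one level; '#' matches the remaining levels (including
// none) and is only valid as the last segment of the filter.
func matchTopic(filter, topic string) bool {
	f := strings.Split(filter, "/")
	t := strings.Split(topic, "/")
	for i, seg := range f {
		if seg == "#" {
			return i == len(f)-1
		}
		if i >= len(t) {
			return false
		}
		if seg != "+" && seg != t[i] {
			return false
		}
	}
	return len(f) == len(t)
}

func main() {
	filter := "dev/+/BRN-MST-OSK00001/#"
	fmt.Println(matchTopic(filter, "dev/MCH-MST-001/BRN-MST-OSK00001/product/updates")) // true
	fmt.Println(matchTopic(filter, "dev/MCH-MST-001/BRN-OTHER/product/updates"))        // false
	fmt.Println(matchTopic("+/+/+/product/updates", "dev/m/b/inventory/updates"))       // false
}
```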

📊 Monitoring & Logging

Log Format

{
  "timestamp": "2025-10-14T21:30:00Z",
  "level": "INFO",
  "service": "event_publisher",
  "event_type": "stock_updated",
  "topic": "dev/MCH-MST-001/BRN-MST-OSK00001/product/updates",
  "product_id": 1386,
  "delta": -1,
  "latency_ms": 5
}

Metrics to Track

  • Publish Success Rate: mqtt_publish_success_total / mqtt_publish_total
  • Publish Latency: Time from DB update to MQTT publish
  • Message Size: Average payload size
  • Subscriber Count: Active mobile devices per branch

🧪 Testing

Manual Test with an MQTT Client

# Subscribe to topic
mosquitto_sub -h localhost -p 1883 \
  -u backend -P password \
  -t "dev/MCH-MST-001/BRN-MST-OSK00001/product/updates" \
  -v

# Publish test message
mosquitto_pub -h localhost -p 1883 \
  -u backend -P password \
  -t "dev/MCH-MST-001/BRN-MST-OSK00001/product/updates" \
  -m '{
    "event_type": "stock_updated",
    "timestamp": "2025-10-14T21:30:00Z",
    "data": {
      "product_id": 1386,
      "product_name": "Test Product",
      "old_stock": 100,
      "new_stock": 99,
      "delta": -1,
      "reason": "test"
    }
  }'

Integration Test

func TestPublishProductUpdate(t *testing.T) {
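    // Setup: needs a reachable broker configured through the MQTT_* environment variables
    mqttClient, err := mqtt.NewClient()
    if err != nil {
        t.Fatalf("failed to connect to MQTT broker: %v", err)
    }
    publisher := NewEventPublisher(mqttClient)

    // Execute
    err = publisher.PublishProductStockUpdate(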
        "dev",
        "MCH-MST-001",
        "BRN-MST-OSK00001",
        1386,
        "Test Product",
        100,
        99,
        "test",
        nil,
        1,
    )

    // Assert
    assert.NoError(t, err)
}

🚀 Deployment Checklist

Backend:

  • MQTT client library installed
  • EventPublisher service implemented
  • Integrated with ProductService, InventoryService, TransactionService
  • Environment variables configured
  • Logging & monitoring setup
  • Unit tests passed

MQTT Broker:

  • EMQX/Mosquitto running
  • ACL rules configured
  • SSL/TLS enabled (production)
  • Monitoring dashboard setup

Database:

  • Triggers created (optional)
  • Indexes optimized
  • Backup strategy in place

📞 Support

If you have questions or run into an issue:
  1. Check the logs on the backend and the MQTT broker
  2. Verify the ACL rules
  3. Test with mosquitto_sub / mosquitto_pub
  4. Contact the DevOps team

Last Updated: 2025-10-14
Version: 1.0.0
Author: MStore Development Team