Skip to main content

Delta Sync Architecture via MQTT

๐Ÿ“‹ Overview

Implementasi Hybrid Sync Strategy yang menggabungkan:
  1. Polling-based Sync - Full refresh setiap 5 menit untuk fallback
  2. Delta Sync via MQTT - Real-time updates untuk perubahan incremental

๐Ÿ—๏ธ Architecture

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚                    MStore Mobile App                         โ”‚
โ”‚                                                              โ”‚
โ”‚  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”  โ”‚
โ”‚  โ”‚           OfflineSyncService (Polling)               โ”‚  โ”‚
โ”‚  โ”‚  โ€ข Full refresh every 5 minutes                      โ”‚  โ”‚
โ”‚  โ”‚  โ€ข Fallback when MQTT unavailable                    โ”‚  โ”‚
โ”‚  โ”‚  โ€ข Initial data load                                 โ”‚  โ”‚
โ”‚  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜  โ”‚
โ”‚                           โ†“                                  โ”‚
โ”‚  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”  โ”‚
โ”‚  โ”‚           DeltaSyncService (MQTT)                    โ”‚  โ”‚
โ”‚  โ”‚  โ€ข Real-time delta updates                           โ”‚  โ”‚
โ”‚  โ”‚  โ€ข Product stock changes                             โ”‚  โ”‚
โ”‚  โ”‚  โ€ข Inventory quantity changes                        โ”‚  โ”‚
โ”‚  โ”‚  โ€ข Transaction notifications                         โ”‚  โ”‚
โ”‚  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜  โ”‚
โ”‚                           โ†“                                  โ”‚
โ”‚  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”  โ”‚
โ”‚  โ”‚              Local Isar Database                     โ”‚  โ”‚
โ”‚  โ”‚  โ€ข ProductEntity                                     โ”‚  โ”‚
โ”‚  โ”‚  โ€ข InventoryEntity                                   โ”‚  โ”‚
โ”‚  โ”‚  โ€ข TransactionLocalEntity                            โ”‚  โ”‚
โ”‚  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜  โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
                           โ†‘
                           โ”‚ MQTT v5
                           โ”‚
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚                    MQTT Broker (EMQX)                        โ”‚
โ”‚                                                              โ”‚
โ”‚  Topics:                                                     โ”‚
โ”‚  โ€ข {env}/{merchant}/{branch}/product/updates                โ”‚
โ”‚  โ€ข {env}/{merchant}/{branch}/inventory/updates              โ”‚
โ”‚  โ€ข {env}/{merchant}/{branch}/transaction/updates            โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
                           โ†‘
                           โ”‚ Publish
                           โ”‚
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚                    MStore Backend                            โ”‚
โ”‚                                                              โ”‚
โ”‚  Event Triggers:                                             โ”‚
โ”‚  โ€ข Product stock updated โ†’ Publish delta                     โ”‚
โ”‚  โ€ข Inventory quantity changed โ†’ Publish delta                โ”‚
โ”‚  โ€ข Transaction created/updated โ†’ Publish notification        โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

๐Ÿ“ก MQTT Topics & Payloads

1. Product Stock Updates

Topic Pattern:
{env}/{merchant_code}/{branch_code}/product/updates
Example:
dev/MCH-MST-001/BRN-MST-OSK00001/product/updates
Payload Format:
{
  "event_type": "stock_updated",
  "timestamp": "2025-10-14T21:30:00Z",
  "data": {
    "product_id": 1386,
    "product_name": "Chicken Breast sadia 2 kg",
    "old_stock": 100,
    "new_stock": 99,
    "delta": -1,
    "reason": "transaction",
    "transaction_id": 12345,
    "updated_by": "user_5"
  }
}

2. Inventory Quantity Updates

Topic Pattern:
{env}/{merchant_code}/{branch_code}/inventory/updates
Example:
dev/MCH-MST-001/BRN-MST-OSK00001/inventory/updates
Payload Format:
{
  "event_type": "quantity_updated",
  "timestamp": "2025-10-14T21:30:00Z",
  "data": {
    "inventory_id": 6486,
    "product_id": 1386,
    "product_name": "Chicken Breast sadia 2 kg",
    "old_quantity": 100,
    "new_quantity": 99,
    "delta": -1,
    "reason": "transaction",
    "transaction_id": 12345,
    "updated_by": "user_5"
  }
}

3. Transaction Notifications

Topic Pattern:
{env}/{merchant_code}/{branch_code}/transaction/updates
Example:
dev/MCH-MST-001/BRN-MST-OSK00001/transaction/updates
Payload Format:
{
  "event_type": "transaction_created",
  "timestamp": "2025-10-14T21:30:00Z",
  "data": {
    "transaction_id": 12345,
    "transaction_code": "TRX-20251014-001",
    "offline_id": "56752b9f-3cb4-4b29-8512-ce3e0112c527",
    "status": "paid",
    "grand_total": 16800.0,
    "created_by": "user_5",
    "device_id": "DEV-DBBA60CF",
    "items": [
      {
        "product_id": 1386,
        "quantity": 1,
        "price": 15000
      }
    ]
  }
}

๐Ÿ”„ Sync Flow

Initial Load (App Start)

1. App starts
2. OfflineSyncService triggers full refresh
   โ†’ GET /api/v1/master/product
   โ†’ GET /api/v1/master/inventory
3. Data saved to Isar
4. MQTT connects and subscribes to delta topics
5. DeltaSyncService ready to receive updates

Real-time Update (Delta Sync)

1. Backend detects change (e.g., stock reduced)
2. Backend publishes to MQTT topic
3. Mobile receives MQTT message
4. DeltaSyncService parses payload
5. Update local Isar database (delta only)
6. Trigger UI refresh via BLoC event
7. User sees updated stock immediately

Fallback (Polling)

1. Every 5 minutes OR app resume
2. OfflineSyncService checks TTL
3. If stale โ†’ Full refresh from API
4. Merge with local changes
5. Resolve conflicts (server wins)

๐Ÿ› ๏ธ Backend Implementation Requirements

1. Database Triggers (PostgreSQL Example)

-- Product stock update trigger
CREATE OR REPLACE FUNCTION notify_product_stock_change()
RETURNS TRIGGER AS $$
DECLARE
  payload JSON;
BEGIN
  IF OLD.stock IS DISTINCT FROM NEW.stock THEN
    payload := json_build_object(
      'event_type', 'stock_updated',
      'timestamp', NOW(),
      'data', json_build_object(
        'product_id', NEW.id,
        'product_name', NEW.name,
        'old_stock', OLD.stock,
        'new_stock', NEW.stock,
        'delta', NEW.stock - OLD.stock,
        'updated_by', current_user
      )
    );
    
    -- Publish to MQTT via pg_notify or external service
    PERFORM pg_notify(
      'product_updates',
      payload::text
    );
  END IF;
  
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER product_stock_change_trigger
AFTER UPDATE ON products
FOR EACH ROW
EXECUTE FUNCTION notify_product_stock_change();

2. MQTT Publisher Service (Go/Node.js)

// Go example
package mqtt

import (
    mqtt "github.com/eclipse/paho.mqtt.golang"
    "encoding/json"
)

type ProductUpdateEvent struct {
    EventType string    `json:"event_type"`
    Timestamp time.Time `json:"timestamp"`
    Data      struct {
        ProductID   int     `json:"product_id"`
        ProductName string  `json:"product_name"`
        OldStock    int     `json:"old_stock"`
        NewStock    int     `json:"new_stock"`
        Delta       int     `json:"delta"`
        Reason      string  `json:"reason"`
    } `json:"data"`
}

func PublishProductUpdate(client mqtt.Client, event ProductUpdateEvent) error {
    topic := fmt.Sprintf("%s/%s/%s/product/updates",
        os.Getenv("APP_ENV"),
        event.MerchantCode,
        event.BranchCode,
    )
    
    payload, err := json.Marshal(event)
    if err != nil {
        return err
    }
    
    token := client.Publish(topic, 1, false, payload)
    token.Wait()
    return token.Error()
}

3. REST API Endpoints (Fallback)

Tetap pertahankan existing endpoints untuk polling:
GET /api/v1/master/product?branch_code={code}&limit=200&last_id=0
GET /api/v1/master/inventory?branch_code={code}&limit=200&last_id=0
GET /api/v1/transaction?limit=10

๐Ÿ“Š Conflict Resolution Strategy

Priority Rules:

  1. Server Always Wins - Untuk master data (product, inventory)
  2. Offline-First - Untuk transaction (queue dan sync later)
  3. Last-Write-Wins - Untuk config/settings

Example Conflict:

Scenario:
- Device A: Stock = 100, reduces to 99 (offline)
- Device B: Stock = 100, reduces to 98 (online, triggers MQTT)
- Device A comes online

Resolution:
1. Device A receives MQTT update: stock = 98
2. Device A has local pending: stock = 99
3. Apply server value: stock = 98 (server wins)
4. Re-calculate local transaction impact if needed

๐Ÿ” Security Considerations

MQTT Authentication:

// Use per-device credentials
final connMessage = MqttConnectMessage()
    .authenticateAs(
      deviceId,           // username
      deviceSecretToken   // password from backend
    )

Topic ACL (EMQX Config):

# Allow device to subscribe only to its branch
{allow, {user, "DEV-DBBA60CF"}, subscribe, ["dev/+/BRN-MST-OSK00001/#"]}.

# Deny publish to update topics (only backend can publish)
{deny, all, publish, ["+/+/+/product/updates"]}.
{deny, all, publish, ["+/+/+/inventory/updates"]}.
{deny, all, publish, ["+/+/+/transaction/updates"]}.

๐Ÿ“ˆ Performance Metrics

Expected Improvements:

  • Latency: 5 min โ†’ < 1 sec (300x faster)
  • Bandwidth: 200 KB/5min โ†’ ~1 KB/update (200x reduction)
  • Battery: Minimal impact (MQTT keep-alive 60s)
  • Data Freshness: 99.9% real-time

Monitoring:

// Track delta sync performance
AppLog.i('delta_sync', 'Update applied', meta: {
  'event_type': 'stock_updated',
  'product_id': 1386,
  'latency_ms': receivedAt - publishedAt,
  'payload_size': payloadBytes,
});

๐Ÿงช Testing Strategy

Unit Tests:

test('DeltaSyncService handles product update', () async {
  final payload = {
    'event_type': 'stock_updated',
    'data': {'product_id': 1386, 'new_stock': 99}
  };
  
  await deltaSyncService.handleProductUpdate(payload);
  
  final product = await isar.productEntitys
      .filter()
      .apiIdEqualTo('1386')
      .findFirst();
  
  expect(product.stock, 99);
});

Integration Tests:

  1. Publish test message to MQTT
  2. Verify mobile receives and processes
  3. Check Isar database updated
  4. Verify UI reflects change

Load Tests:

  • 100 concurrent updates/sec
  • Verify no message loss
  • Check memory/CPU usage

๐Ÿš€ Deployment Checklist

Backend:

  • Setup MQTT broker (EMQX/Mosquitto)
  • Configure ACL rules
  • Implement database triggers
  • Deploy MQTT publisher service
  • Setup monitoring (Prometheus/Grafana)

Mobile:

  • Implement DeltaSyncService
  • Update MqttBloc subscriptions
  • Add conflict resolution logic
  • Update UI to handle real-time updates
  • Test offline โ†’ online scenarios

DevOps:

  • MQTT broker high availability
  • SSL/TLS certificates
  • Firewall rules (port 1883/8883)
  • Backup & disaster recovery

๐Ÿ“š References


Last Updated: 2025-10-14
Version: 1.0.0
Author: MStore Development Team