Backend Integration Guide: Delta Sync via MQTT
Panduan implementasi delta sync di mstore-backend untuk mendukung real-time updates ke mobile app.

Prerequisites
- MQTT Broker (EMQX/Mosquitto) sudah running
- PostgreSQL database
- Go 1.21+ atau Node.js 18+ (sesuai stack backend)
- MQTT client library
Architecture Overview
Copy
┌─────────────────────────────────────────────────────────────┐
│ MStore Backend                                              │
│                                                             │
│  ┌───────────────────────────────────────────────────────┐  │
│  │ Business Logic Layer                                  │  │
│  │ • ProductService.UpdateStock()                        │  │
│  │ • InventoryService.UpdateQuantity()                   │  │
│  │ • TransactionService.Create()                         │  │
│  └───────────────────────────────────────────────────────┘  │
│                              ↓                              │
│  ┌───────────────────────────────────────────────────────┐  │
│  │ Event Publisher                                       │  │
│  │ • EventPublisher.PublishProductStockUpdate()          │  │
│  │ • EventPublisher.PublishInventoryUpdate()             │  │
│  │ • EventPublisher.PublishTransactionCreated()          │  │
│  └───────────────────────────────────────────────────────┘  │
│                              ↓                              │
└─────────────────────────────────────────────────────────────┘
                               ↓ MQTT Publish
┌─────────────────────────────────────────────────────────────┐
│ MQTT Broker (EMQX)                                          │
│ Topics:                                                     │
│ • {env}/{merchant}/{branch}/product/updates                 │
│ • {env}/{merchant}/{branch}/inventory/updates               │
│ • {env}/{merchant}/{branch}/transaction/updates             │
└─────────────────────────────────────────────────────────────┘
                               ↓ MQTT Subscribe
┌─────────────────────────────────────────────────────────────┐
│ MStore Mobile App                                           │
│ • DeltaSyncService handles updates                          │
│ • Update local Isar database                                │
│ • Trigger UI refresh                                        │
└─────────────────────────────────────────────────────────────┘
Implementation Steps
Step 1: Setup MQTT Client
Go Example:
Copy
// pkg/mqtt/client.go
package mqtt
import (
mqtt "github.com/eclipse/paho.mqtt.golang"
"os"
"time"
)
// Client wraps the paho MQTT client that NewClient connects and Publish sends through.
type Client struct {
// client is the underlying paho client handle.
client mqtt.Client
}
// NewClient builds a paho MQTT client from the MQTT_BROKER, MQTT_USERNAME,
// MQTT_PASSWORD and MQTT_CLIENT_ID environment variables, connects it, and
// returns it wrapped in a Client.
//
// The connection is configured with a 60s keep-alive, automatic reconnect with
// at most 10s between attempts, and connect retry. If the connect token
// finishes with an error, that error is returned and no Client is created.
//
// NOTE(review): with SetConnectRetry(true) the connect token presumably does
// not complete until the broker is reachable, so this call may block at
// startup while the broker is down — confirm that is acceptable.
// NOTE(review): the client ID comes from a single environment variable; confirm
// each backend instance gets its own value.
func NewClient() (*Client, error) {
	options := mqtt.NewClientOptions().
		AddBroker(os.Getenv("MQTT_BROKER")).
		SetUsername(os.Getenv("MQTT_USERNAME")).
		SetPassword(os.Getenv("MQTT_PASSWORD")).
		SetClientID(os.Getenv("MQTT_CLIENT_ID")).
		SetKeepAlive(60 * time.Second).
		SetAutoReconnect(true).
		SetConnectRetry(true).
		SetMaxReconnectInterval(10 * time.Second)

	conn := mqtt.NewClient(options)

	// Block until the connect attempt has finished, then inspect its outcome.
	token := conn.Connect()
	token.Wait()
	if err := token.Error(); err != nil {
		return nil, err
	}

	return &Client{client: conn}, nil
}
// publishTimeout bounds how long Publish waits for the publish token to complete.
const publishTimeout = 10 * time.Second

// publishTimeoutError reports that a publish token did not complete within
// publishTimeout.
type publishTimeoutError struct {
	topic string
}

// Error implements the error interface.
func (e *publishTimeoutError) Error() string {
	return "mqtt: publish to " + e.topic + " timed out"
}

// Publish sends payload to topic with QoS 1 and the retained flag off.
//
// It waits at most publishTimeout for the token to complete. Callers invoke
// this from fire-and-forget goroutines, so an unbounded wait would let those
// goroutines pile up whenever a token never completes.
//
// It returns a timeout error when the wait expires, otherwise the token's own
// error (nil on success).
//
// NOTE(review): a timeout only stops the wait; the client may presumably still
// deliver the queued message after it reconnects — confirm consumers tolerate
// late or duplicate events.
func (c *Client) Publish(topic string, payload []byte) error {
	token := c.client.Publish(topic, 1, false, payload)
	if !token.WaitTimeout(publishTimeout) {
		return &publishTimeoutError{topic: topic}
	}
	return token.Error()
}
Node.js Example:
Copy
// src/mqtt/client.js
const mqtt = require('mqtt');
/**
 * Wrapper around an `mqtt` connection configured from the MQTT_BROKER,
 * MQTT_USERNAME, MQTT_PASSWORD and MQTT_CLIENT_ID environment variables.
 */
class MqttClient {
  /**
   * Opens the connection (60s keep-alive, 5000ms reconnect period) and
   * registers handlers that log connect and error events.
   */
  constructor() {
    const {
      MQTT_BROKER: brokerUrl,
      MQTT_USERNAME: username,
      MQTT_PASSWORD: password,
      MQTT_CLIENT_ID: clientId,
    } = process.env;

    const options = {
      username,
      password,
      clientId,
      keepalive: 60,
      reconnectPeriod: 5000,
    };

    this.client = mqtt.connect(brokerUrl, options);

    const onConnect = () => {
      console.log('MQTT connected');
    };
    const onError = (err) => {
      console.error('MQTT error:', err);
    };

    this.client.on('connect', onConnect);
    this.client.on('error', onError);
  }

  /**
   * Publishes `payload`, serialized as JSON, to `topic` with QoS 1.
   *
   * @param {string} topic - Destination topic.
   * @param {*} payload - Value passed to JSON.stringify.
   * @returns {Promise<void>} Resolves when the publish callback reports no
   *   error; rejects with the callback's error otherwise.
   */
  publish(topic, payload) {
    return new Promise((resolve, reject) => {
      // Serialize inside the executor so a stringify failure rejects the promise.
      const message = JSON.stringify(payload);
      const done = (err) => {
        if (err) {
          reject(err);
          return;
        }
        resolve();
      };
      this.client.publish(topic, message, { qos: 1 }, done);
    });
  }
}
module.exports = new MqttClient();
Step 2: Create Event Publisher Service
Go Example:
Copy
// internal/service/event_publisher.go
package service
import (
"encoding/json"
"fmt"
"time"
"your-project/pkg/mqtt"
)
// EventPublisher serializes domain events to JSON and publishes them through an MQTT client.
type EventPublisher struct {
// mqttClient is the client every Publish* method sends through.
mqttClient *mqtt.Client
}
// NewEventPublisher returns an EventPublisher that publishes through mqttClient.
// The client is stored as given; it is not checked for nil.
func NewEventPublisher(mqttClient *mqtt.Client) *EventPublisher {
	publisher := new(EventPublisher)
	publisher.mqttClient = mqttClient
	return publisher
}
// ProductUpdateEvent is the JSON message describing a product stock change.
// PublishProductStockUpdate sends it with EventType "stock_updated" and a UTC timestamp.
type ProductUpdateEvent struct {
EventType string `json:"event_type"`
Timestamp time.Time `json:"timestamp"`
// Data carries the stock change details.
Data struct {
ProductID int `json:"product_id"`
ProductName string `json:"product_name"`
OldStock int `json:"old_stock"`
NewStock int `json:"new_stock"`
// Delta is filled in as NewStock - OldStock by PublishProductStockUpdate.
Delta int `json:"delta"`
Reason string `json:"reason"`
// TransactionID is left out of the JSON when nil.
TransactionID *int `json:"transaction_id,omitempty"`
UpdatedBy int `json:"updated_by"`
} `json:"data"`
}
// PublishProductStockUpdate publishes a "stock_updated" event for one product
// to the topic "{env}/{merchantCode}/{branchCode}/product/updates".
//
// The event is stamped with the current UTC time and its delta is computed as
// newStock - oldStock. transactionID may be nil, in which case the field is
// omitted from the JSON.
//
// It returns a wrapped error if the event cannot be marshalled, otherwise
// whatever the MQTT client's Publish returns.
func (ep *EventPublisher) PublishProductStockUpdate(
	env, merchantCode, branchCode string,
	productID int,
	productName string,
	oldStock, newStock int,
	reason string,
	transactionID *int,
	updatedBy int,
) error {
	var event ProductUpdateEvent
	event.EventType = "stock_updated"
	event.Timestamp = time.Now().UTC()

	data := &event.Data
	data.ProductID = productID
	data.ProductName = productName
	data.OldStock = oldStock
	data.NewStock = newStock
	data.Delta = newStock - oldStock
	data.Reason = reason
	data.TransactionID = transactionID
	data.UpdatedBy = updatedBy

	body, err := json.Marshal(event)
	if err != nil {
		return fmt.Errorf("failed to marshal event: %w", err)
	}

	return ep.mqttClient.Publish(
		fmt.Sprintf("%s/%s/%s/product/updates", env, merchantCode, branchCode),
		body,
	)
}
// Inventory Update Event
// InventoryUpdateEvent is the JSON message describing an inventory quantity change.
// PublishInventoryUpdate sends it with EventType "quantity_updated" and a UTC timestamp.
type InventoryUpdateEvent struct {
EventType string `json:"event_type"`
Timestamp time.Time `json:"timestamp"`
// Data carries the quantity change details.
Data struct {
InventoryID int `json:"inventory_id"`
ProductID int `json:"product_id"`
ProductName string `json:"product_name"`
OldQuantity int `json:"old_quantity"`
NewQuantity int `json:"new_quantity"`
// Delta is filled in as NewQuantity - OldQuantity by PublishInventoryUpdate.
Delta int `json:"delta"`
Reason string `json:"reason"`
// TransactionID is left out of the JSON when nil.
TransactionID *int `json:"transaction_id,omitempty"`
UpdatedBy int `json:"updated_by"`
} `json:"data"`
}
// PublishInventoryUpdate publishes a "quantity_updated" event for one
// inventory row to the topic
// "{env}/{merchantCode}/{branchCode}/inventory/updates".
//
// The event is stamped with the current UTC time and its delta is computed as
// newQty - oldQty. transactionID may be nil, in which case the field is omitted
// from the JSON.
//
// It returns a wrapped error if the event cannot be marshalled, otherwise
// whatever the MQTT client's Publish returns.
func (ep *EventPublisher) PublishInventoryUpdate(
	env, merchantCode, branchCode string,
	inventoryID, productID int,
	productName string,
	oldQty, newQty int,
	reason string,
	transactionID *int,
	updatedBy int,
) error {
	var event InventoryUpdateEvent
	event.EventType = "quantity_updated"
	event.Timestamp = time.Now().UTC()

	data := &event.Data
	data.InventoryID = inventoryID
	data.ProductID = productID
	data.ProductName = productName
	data.OldQuantity = oldQty
	data.NewQuantity = newQty
	data.Delta = newQty - oldQty
	data.Reason = reason
	data.TransactionID = transactionID
	data.UpdatedBy = updatedBy

	body, err := json.Marshal(event)
	if err != nil {
		return fmt.Errorf("failed to marshal event: %w", err)
	}

	return ep.mqttClient.Publish(
		fmt.Sprintf("%s/%s/%s/inventory/updates", env, merchantCode, branchCode),
		body,
	)
}
// Transaction Update Event
// TransactionUpdateEvent is the JSON message describing a transaction.
// PublishTransactionCreated sends it with EventType "transaction_created" and a UTC timestamp.
type TransactionUpdateEvent struct {
EventType string `json:"event_type"`
Timestamp time.Time `json:"timestamp"`
// Data carries the transaction details.
Data struct {
TransactionID int `json:"transaction_id"`
TransactionCode string `json:"transaction_code"`
// OfflineID is left out of the JSON when nil.
OfflineID *string `json:"offline_id,omitempty"`
Status string `json:"status"`
// NOTE(review): float64 can lose precision for monetary totals — confirm this is acceptable.
GrandTotal float64 `json:"grand_total"`
CreatedBy int `json:"created_by"`
DeviceID string `json:"device_id"`
} `json:"data"`
}
// PublishTransactionCreated publishes a "transaction_created" event to the
// topic "{env}/{merchantCode}/{branchCode}/transaction/updates".
//
// The event is stamped with the current UTC time. offlineID may be nil, in
// which case the field is omitted from the JSON.
//
// It returns a wrapped error if the event cannot be marshalled, otherwise
// whatever the MQTT client's Publish returns.
func (ep *EventPublisher) PublishTransactionCreated(
	env, merchantCode, branchCode string,
	transactionID int,
	transactionCode string,
	offlineID *string,
	status string,
	grandTotal float64,
	createdBy int,
	deviceID string,
) error {
	var event TransactionUpdateEvent
	event.EventType = "transaction_created"
	event.Timestamp = time.Now().UTC()

	data := &event.Data
	data.TransactionID = transactionID
	data.TransactionCode = transactionCode
	data.OfflineID = offlineID
	data.Status = status
	data.GrandTotal = grandTotal
	data.CreatedBy = createdBy
	data.DeviceID = deviceID

	body, err := json.Marshal(event)
	if err != nil {
		return fmt.Errorf("failed to marshal event: %w", err)
	}

	return ep.mqttClient.Publish(
		fmt.Sprintf("%s/%s/%s/transaction/updates", env, merchantCode, branchCode),
		body,
	)
}
Step 3: Integrate dengan Business Logic
Example: Product Stock Update
Copy
// internal/service/product_service.go
package service
import (
	"context"
	"log"

	"your-project/internal/repository"
)
// ProductService updates product stock through the repository and announces
// each change through the event publisher.
type ProductService struct {
// repo is used by UpdateStock to read the product and write its new stock.
repo *repository.ProductRepository
// eventPublisher sends the stock update event after a successful write.
eventPublisher *EventPublisher
}
// UpdateStock sets the stock of productID to newStock and then announces the
// change over MQTT.
//
// The product is loaded first so its previous stock and its env, merchant and
// branch codes are available for the event. Repository errors from the load or
// the write are returned unchanged. The event is published from a separate
// goroutine; a publish failure is only logged and never fails the update.
//
// NOTE(review): the read and the write are separate repository calls, so the
// reported old stock may be stale under concurrent updates — confirm.
func (s *ProductService) UpdateStock(
	ctx context.Context,
	productID int,
	newStock int,
	reason string,
	transactionID *int,
	updatedBy int,
) error {
	current, err := s.repo.GetByID(ctx, productID)
	if err != nil {
		return err
	}
	previousStock := current.Stock

	if err := s.repo.UpdateStock(ctx, productID, newStock); err != nil {
		return err
	}

	// Publishing happens in the background so the caller is not blocked on MQTT.
	notify := func() {
		publishErr := s.eventPublisher.PublishProductStockUpdate(
			current.Env,
			current.MerchantCode,
			current.BranchCode,
			productID,
			current.Name,
			previousStock,
			newStock,
			reason,
			transactionID,
			updatedBy,
		)
		if publishErr == nil {
			return
		}
		// Best effort: the stock write already succeeded.
		log.Printf("Failed to publish product update: %v", publishErr)
	}
	go notify()

	return nil
}
Example: Transaction Creation
Copy
// internal/service/transaction_service.go
package service
// CreateOfflineTransaction stores the transaction described by req, applies the
// stock level carried by each item, and announces the new transaction over MQTT.
//
// A repository error from the create step is returned and nothing else runs.
// Stock update failures are logged and do not fail the call. The event is
// published from a separate goroutine and a publish failure is only logged.
//
// Everything the background work needs is copied before the goroutine starts,
// so it never reads req or tx after this function has returned. An empty
// req.OfflineID is published as an absent offline_id rather than an empty
// string, which is what the event's omitempty tag expects.
//
// NOTE(review): each item supplies an absolute NewStock rather than a quantity
// to subtract — confirm this is safe when several devices sell the same product.
func (s *TransactionService) CreateOfflineTransaction(
	ctx context.Context,
	req *CreateOfflineTransactionRequest,
) (*Transaction, error) {
	// 1. Create transaction in database
	tx, err := s.repo.Create(ctx, req)
	if err != nil {
		return nil, err
	}

	// Private copy of the ID: UpdateStock hands the pointer to a background
	// publisher, which must not alias the Transaction returned to the caller.
	txID := tx.ID

	// 2. Apply the new stock level for each item
	for _, item := range req.Items {
		if err := s.productService.UpdateStock(
			ctx,
			item.ProductID,
			item.NewStock,
			"transaction",
			&txID,
			req.UserID,
		); err != nil {
			log.Printf("Failed to update stock: %v", err)
		}
	}

	// 3. Snapshot the event fields, then publish in the background.
	var offlineID *string
	if req.OfflineID != "" {
		id := req.OfflineID
		offlineID = &id
	}
	env, merchantCode, branchCode := req.Env, req.MerchantCode, req.BranchCode
	userID, deviceID := req.UserID, req.DeviceID
	txCode, status, grandTotal := tx.TransactionCode, tx.Status, tx.GrandTotal

	go func() {
		err := s.eventPublisher.PublishTransactionCreated(
			env,
			merchantCode,
			branchCode,
			txID,
			txCode,
			offlineID,
			status,
			grandTotal,
			userID,
			deviceID,
		)
		if err != nil {
			log.Printf("Failed to publish transaction: %v", err)
		}
	}()

	return tx, nil
}
Step 4: Database Triggers (Optional but Recommended)
Untuk memastikan semua perubahan stock ter-publish, gunakan database triggers:
-- PostgreSQL Example
-- notify_product_stock_change: row-level trigger function that sends a
-- 'stock_updated' JSON payload on the 'product_updates' channel whenever a
-- product's stock value changes. It always returns NEW.
CREATE OR REPLACE FUNCTION notify_product_stock_change()
RETURNS TRIGGER AS $$
DECLARE
    payload JSON;
BEGIN
    -- IS DISTINCT FROM also treats a change to or from NULL as a difference.
    IF OLD.stock IS DISTINCT FROM NEW.stock THEN
        payload := json_build_object(
            'event_type', 'stock_updated',
            'timestamp', NOW(),
            'data', json_build_object(
                'product_id', NEW.id,
                'product_name', NEW.name,
                'old_stock', OLD.stock,
                'new_stock', NEW.stock,
                'delta', NEW.stock - OLD.stock,
                'reason', 'database_trigger',
                -- The Go consumer decodes updated_by into an int, so the
                -- database role name (a string) must not be sent under that
                -- key. 0 means "no application user"; the role gets its own key.
                'updated_by', 0,
                'db_user', current_user
            )
        );
        -- Send to application via pg_notify
        PERFORM pg_notify('product_updates', payload::text);
    END IF;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;
-- Drop any previous definition so this script can be re-run, just like the
-- CREATE OR REPLACE used for the trigger function.
DROP TRIGGER IF EXISTS product_stock_change_trigger ON products;

-- Call notify_product_stock_change() after each row update that changes stock.
-- The WHEN clause skips the function call for updates that leave stock untouched.
CREATE TRIGGER product_stock_change_trigger
    AFTER UPDATE ON products
    FOR EACH ROW
    WHEN (OLD.stock IS DISTINCT FROM NEW.stock)
    EXECUTE FUNCTION notify_product_stock_change();
Copy
// ListenDatabaseNotifications subscribes to the PostgreSQL "product_updates"
// channel and decodes each notification payload into a ProductUpdateEvent.
//
// The call blocks, handling notifications until the listener's Notify channel
// is closed. If subscribing to the channel fails, the process is terminated via
// log.Fatal. Payloads that cannot be decoded are logged and skipped.
//
// Forwarding the decoded event to MQTT is still a placeholder: the payload must
// first carry the env, merchant and branch needed to build the topic.
func (ep *EventPublisher) ListenDatabaseNotifications(connStr string) {
	// pq.NewListener returns only the listener (no error value); connection
	// problems surface from Listen instead.
	listener := pq.NewListener(connStr, 10*time.Second, time.Minute, nil)
	if err := listener.Listen("product_updates"); err != nil {
		log.Fatal(err)
	}

	// Ranging over the channel ends the loop once it is closed, instead of
	// spinning on the nil values a closed channel would keep yielding.
	for n := range listener.Notify {
		// A nil notification carries no payload, so there is nothing to decode.
		if n == nil {
			continue
		}

		// Parse payload and publish to MQTT
		var event ProductUpdateEvent
		if err := json.Unmarshal([]byte(n.Extra), &event); err != nil {
			log.Printf("Failed to parse notification: %v", err)
			continue
		}

		// Publish to MQTT
		// ... (extract env, merchant, branch from event)
	}
}
Security & ACL Configuration
EMQX ACL Rules
File: /etc/emqx/acl.conf
Copy
%% acl.conf is a file of Erlang terms, so comments start with %% (a leading #
%% would not parse).

%% Allow backend to publish to all update topics
{allow, {user, "backend"}, publish, ["+/+/+/product/updates"]}.
{allow, {user, "backend"}, publish, ["+/+/+/inventory/updates"]}.
{allow, {user, "backend"}, publish, ["+/+/+/transaction/updates"]}.
%% Deny all devices from publishing to update topics
{deny, all, publish, ["+/+/+/product/updates"]}.
{deny, all, publish, ["+/+/+/inventory/updates"]}.
{deny, all, publish, ["+/+/+/transaction/updates"]}.
%% Allow devices to subscribe only to their branch
%% NOTE(review): no rule here denies other subscriptions, so "only" presumably
%% depends on the broker's no-match policy being deny — confirm.
{allow, {user, "DEV-DBBA60CF"}, subscribe, ["dev/+/BRN-MST-OSK00001/#"]}.
Monitoring & Logging
Log Format
Copy
{
"timestamp": "2025-10-14T21:30:00Z",
"level": "INFO",
"service": "event_publisher",
"event_type": "stock_updated",
"topic": "dev/MCH-MST-001/BRN-MST-OSK00001/product/updates",
"product_id": 1386,
"delta": -1,
"latency_ms": 5
}
Metrics to Track
- Publish Success Rate: mqtt_publish_success_total / mqtt_publish_total
- Publish Latency: Time from DB update to MQTT publish
- Message Size: Average payload size
- Subscriber Count: Active mobile devices per branch
Testing
Manual Test dengan MQTT Client
Copy
# Subscribe to the product update topic on the local broker; -v prints the
# topic alongside each received message.
# NOTE(review): mosquitto_sub presumably keeps running until interrupted, so
# run it in its own terminal before publishing.
mosquitto_sub -h localhost -p 1883 \
-u backend -P password \
-t "dev/MCH-MST-001/BRN-MST-OSK00001/product/updates" \
-v
# Publish a sample stock_updated message to the same topic.
mosquitto_pub -h localhost -p 1883 \
-u backend -P password \
-t "dev/MCH-MST-001/BRN-MST-OSK00001/product/updates" \
-m '{
"event_type": "stock_updated",
"timestamp": "2025-10-14T21:30:00Z",
"data": {
"product_id": 1386,
"product_name": "Test Product",
"old_stock": 100,
"new_stock": 99,
"delta": -1,
"reason": "test"
}
}'
Integration Test
Copy
// TestPublishProductUpdate is an integration test: it connects with
// mqtt.NewClient and publishes one stock update, expecting no error.
func TestPublishProductUpdate(t *testing.T) {
	// Setup
	// Stop here if the connection fails; continuing with a nil client would
	// turn a connection problem into a nil-pointer panic inside Publish.
	mqttClient, err := mqtt.NewClient()
	if err != nil {
		t.Fatalf("connect to MQTT broker: %v", err)
	}
	publisher := NewEventPublisher(mqttClient)

	// Execute
	err = publisher.PublishProductStockUpdate(
		"dev",
		"MCH-MST-001",
		"BRN-MST-OSK00001",
		1386,
		"Test Product",
		100,
		99,
		"test",
		nil,
		1,
	)

	// Assert
	assert.NoError(t, err)
}
Deployment Checklist
Backend:
- MQTT client library installed
- EventPublisher service implemented
- Integrated dengan ProductService, InventoryService, TransactionService
- Environment variables configured
- Logging & monitoring setup
- Unit tests passed
MQTT Broker:
- EMQX/Mosquitto running
- ACL rules configured
- SSL/TLS enabled (production)
- Monitoring dashboard setup
Database:
- Triggers created (optional)
- Indexes optimized
- Backup strategy in place
Support
Jika ada pertanyaan atau issue:
- Check logs di backend & MQTT broker
- Verify ACL rules
- Test dengan mosquitto_sub/pub
- Contact DevOps team
Last Updated: 2025-10-14
Version: 1.0.0
Author: MStore Development Team