UNS Plugin
FnKit includes a built-in plugin for Unified Namespace (UNS) workflows — a common industrial IoT pattern where all enterprise data flows through a hierarchical MQTT topic structure following ISA-95.
The fnkit uns commands scaffold and manage three pre-built functions that work together to monitor, cache, and log UNS data.
Architecture
          MQTT Broker
               │
               │ v1.0/# (wildcard subscription)
               │
               ▼
┌──────────────────────────────┐
│        uns-framework         │
│       Go MQTT function       │
│     Caches every message     │
└──────────────┬───────────────┘
               │
               ▼
     fnkit-cache (Valkey)
               │
               ├── uns-cache (Node.js HTTP function)
               │     Reads cached topics → JSON API
               │
               └── uns-log (Go HTTP function)
                     Logs changes → PostgreSQL
The three components:
| Component | Runtime | Protocol | Purpose |
|---|---|---|---|
| uns-framework | Go | MQTT | Subscribes to v1.0/#, caches all messages to Valkey |
| uns-cache | Node.js | HTTP | Reads cached topic data, returns JSON with change detection |
| uns-log | Go | HTTP | Logs changed values to PostgreSQL |
Quick Start
# 1. Start the shared cache
fnkit cache start
# 2. Create and configure the UNS monitor
fnkit uns uns init
cd uns-framework
cp .env.example .env
# Edit .env: set MQTT_BROKER, auth, TLS certs as needed
docker compose up -d
# 3. Create the cache reader (HTTP API for cached data)
cd ..
fnkit uns cache init
cd uns-cache
cp .env.example .env
docker compose up -d
# 4. (Optional) Create the PostgreSQL logger
cd ..
fnkit uns log init
cd uns-log
cp .env.example .env
# Set config in Valkey:
docker exec fnkit-cache valkey-cli SET fnkit:config:uns-log \
'{"table":"uns_log","topics":["v1.0/enterprise/site/area/line/temperature"]}'
docker compose up -d
# 5. Check status
fnkit uns status
Commands
| Command | Description |
|---|---|
| fnkit uns uns init [name] | Scaffold UNS topic monitor (default: uns-framework/) |
| fnkit uns uns start [name] | Build & start monitor container |
| fnkit uns uns stop [name] | Stop monitor container |
| fnkit uns cache init [name] | Scaffold UNS cache reader (default: uns-cache/) |
| fnkit uns cache start [name] | Build & start cache reader |
| fnkit uns cache stop [name] | Stop cache reader |
| fnkit uns log init [name] | Scaffold PostgreSQL logger (default: uns-log/) |
| fnkit uns log start [name] | Build & start logger |
| fnkit uns log stop [name] | Stop logger |
| fnkit uns status | Show status of all UNS components |
UNS Topic Monitor (uns-framework)
A Go MQTT function that subscribes to the entire UNS namespace (v1.0/#) and writes every message to the shared Valkey cache.
What It Does
For each incoming MQTT message:
- Shifts the current cached value → previous value
- Writes the new payload as the current value
- Tracks the topic in a registry set
- Updates metadata (last_updated, count, first_seen)
Generated Files
| File | Purpose |
|---|---|
| function.go | UNS monitor handler with cache pipeline |
| cmd/main.go | Entry point: loads function, starts framework |
| go.mod | Go module with function-framework-go + go-redis |
| Dockerfile | Multi-stage build (Go builder → distroless) |
| docker-compose.yml | Service config with env_file + cert volume |
| .env.example | All configuration with documentation |
| certs/ | Directory for TLS/mTLS certificates |
| README.md | Component-specific documentation |
Configuration
All configuration is via .env (copy from .env.example):
| Variable | Default | Description |
|---|---|---|
| MQTT_BROKER | mqtt://localhost:1883 | Broker URL (mqtt:// or mqtts://) |
| FUNCTION_TARGET | unsMonitor | Function registry name |
| MQTT_TOPIC_PREFIX | v1.0 | UNS namespace version prefix |
| MQTT_SUBSCRIBE_TOPIC | v1.0/# | Wildcard subscription |
| MQTT_QOS | 1 | QoS level (0, 1, or 2) |
| MQTT_CLIENT_ID | (auto) | MQTT client identifier |
| MQTT_USERNAME | (none) | Broker authentication |
| MQTT_PASSWORD | (none) | Broker authentication |
| MQTT_CA | (none) | CA certificate path (TLS) |
| MQTT_CERT | (none) | Client certificate path (mTLS) |
| MQTT_KEY | (none) | Client private key path (mTLS) |
| MQTT_REJECT_UNAUTHORIZED | true | Set false for self-signed certs |
| CACHE_URL | redis://fnkit-cache:6379 | Valkey/Redis connection |
| CACHE_KEY_PREFIX | uns | Prefix for all cache keys |
| CACHE_TTL | 0 | TTL in seconds (0 = no expiry) |
TLS / mTLS
The monitor includes full TLS and mutual TLS support:
- Place certificates in the certs/ directory
- Configure paths in .env:
# TLS (server verification)
MQTT_BROKER=mqtts://broker:8883
MQTT_CA=/certs/ca.crt
# mTLS (mutual authentication)
MQTT_CERT=/certs/client.crt
MQTT_KEY=/certs/client.key
# Self-signed certificates
MQTT_REJECT_UNAUTHORIZED=false
The docker-compose.yml mounts ./certs:/certs:ro automatically. Certificate files are gitignored — only .gitkeep is tracked.
Cache Key Layout
The UNS monitor writes to Valkey with this key structure:
| Key Pattern | Type | Description |
|---|---|---|
| uns:topics | SET | All discovered topic paths |
| uns:data:<topic> | STRING | Latest payload (raw JSON) |
| uns:prev:<topic> | STRING | Previous payload (raw JSON) |
| uns:meta:<topic> | STRING | Metadata: {"last_updated", "count", "first_seen"} |
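For code that reads the cache directly, the key patterns in the table can be built with a small helper. This is a sketch: `keysFor` and the `keys` struct are hypothetical names, and it assumes that CACHE_KEY_PREFIX simply replaces the leading `uns` in every key.

```go
package main

import "fmt"

// keys holds the cache keys associated with one topic.
type keys struct {
	Registry string // set of all discovered topics
	Data     string // latest payload
	Prev     string // previous payload
	Meta     string // metadata JSON
}

// keysFor builds the key names for a prefix ("uns" by default) and a topic path.
func keysFor(prefix, topic string) keys {
	return keys{
		Registry: prefix + ":topics",
		Data:     prefix + ":data:" + topic,
		Prev:     prefix + ":prev:" + topic,
		Meta:     prefix + ":meta:" + topic,
	}
}

func main() {
	k := keysFor("uns", "v1.0/enterprise/site/area/line/temperature")
	fmt.Println(k.Registry)
	fmt.Println(k.Data)
	fmt.Println(k.Prev)
	fmt.Println(k.Meta)
}
```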
Querying the Cache
From CLI:
# List all discovered topics
docker exec fnkit-cache valkey-cli SMEMBERS uns:topics
# Get latest value
docker exec fnkit-cache valkey-cli GET "uns:data:v1.0/enterprise/site/area/line/temperature"
# Get previous value (for change detection)
docker exec fnkit-cache valkey-cli GET "uns:prev:v1.0/enterprise/site/area/line/temperature"
# Get metadata
docker exec fnkit-cache valkey-cli GET "uns:meta:v1.0/enterprise/site/area/line/temperature"
From a Go function:
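// Assumes the standard context package and the go-redis client (imported as redis).
ctx := context.Background()
rdb := redis.NewClient(&redis.Options{Addr: "fnkit-cache:6379"})
// Errors are discarded here for brevity; check them in real code.
topics, _ := rdb.SMembers(ctx, "uns:topics").Result() // all discovered topic paths
val, _ := rdb.Get(ctx, "uns:data:v1.0/enterprise/site/area/line/temperature").Result() // latest payload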
UNS Cache Reader (uns-cache)
A Node.js HTTP function that reads UNS topic data from the shared Valkey cache and returns JSON with change detection.
API
Get all cached topics:
curl -H "Authorization: Bearer <token>" http://localhost:8080/uns-cache
Get specific topics:
curl -H "Authorization: Bearer <token>" \
-H "Content-Type: application/json" \
-d '{"topics": ["v1.0/enterprise/site/area/line/temperature"]}' \
http://localhost:8080/uns-cache
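The same two calls can be made from Go. The sketch below only builds the requests, mirroring the curl commands: a GET with no body for all topics, and a POST with a JSON body for specific topics (curl switches to POST when -d is given). `newCacheRequest` is a hypothetical helper name, and `<token>` is the same placeholder as above.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

// newCacheRequest builds a request for the uns-cache endpoint.
// With no topics it asks for everything; otherwise it sends {"topics": [...]}.
func newCacheRequest(baseURL, token string, topics []string) (*http.Request, error) {
	var (
		req *http.Request
		err error
	)
	if len(topics) == 0 {
		req, err = http.NewRequest(http.MethodGet, baseURL+"/uns-cache", nil)
	} else {
		payload, mErr := json.Marshal(map[string][]string{"topics": topics})
		if mErr != nil {
			return nil, mErr
		}
		req, err = http.NewRequest(http.MethodPost, baseURL+"/uns-cache", bytes.NewReader(payload))
	}
	if err != nil {
		return nil, err
	}
	req.Header.Set("Authorization", "Bearer "+token)
	if len(topics) > 0 {
		req.Header.Set("Content-Type", "application/json")
	}
	return req, nil
}

func main() {
	req, err := newCacheRequest("http://localhost:8080", "<token>",
		[]string{"v1.0/enterprise/site/area/line/temperature"})
	if err != nil {
		fmt.Println("build request:", err)
		return
	}
	// Send with http.DefaultClient.Do(req) once the gateway is running.
	fmt.Println(req.Method, req.URL.String())
}
```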
Response Format
{
"topic_count": 1,
"changed_count": 1,
"topics": [
{
"topic": "v1.0/enterprise/site1/area1/line1/temperature",
"value": { "value": 23.1, "unit": "C" },
"previous_value": { "value": 22.5, "unit": "C" },
"changed": true,
"last_updated": "2026-02-20T15:10:44Z",
"count": 2,
"first_seen": "2026-02-20T15:10:42Z"
}
],
"changes": [
{
"topic": "v1.0/enterprise/site1/area1/line1/temperature",
"current": { "value": 23.1, "unit": "C" },
"previous": { "value": 22.5, "unit": "C" },
"last_updated": "2026-02-20T15:10:44Z"
}
]
}
- topics: full data for every requested topic (value, previous value, metadata, change flag)
- changes: only the topics where current ≠ previous
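A Go consumer interested only in changes could decode the response along these lines. The struct and function names are assumptions for this sketch; the JSON field names come from the response format above, and the `topics` array is left out for brevity (unknown fields are ignored by encoding/json).

```go
package main

import (
	"encoding/json"
	"fmt"
)

// change mirrors one entry of the "changes" array.
type change struct {
	Topic       string          `json:"topic"`
	Current     json.RawMessage `json:"current"`
	Previous    json.RawMessage `json:"previous"`
	LastUpdated string          `json:"last_updated"`
}

// response mirrors the top-level counters and the "changes" array.
type response struct {
	TopicCount   int      `json:"topic_count"`
	ChangedCount int      `json:"changed_count"`
	Changes      []change `json:"changes"`
}

// parseResponse decodes a uns-cache response body.
func parseResponse(body []byte) (response, error) {
	var r response
	err := json.Unmarshal(body, &r)
	return r, err
}

func main() {
	body := []byte(`{
	  "topic_count": 1,
	  "changed_count": 1,
	  "topics": [],
	  "changes": [{
	    "topic": "v1.0/enterprise/site1/area1/line1/temperature",
	    "current": {"value": 23.1, "unit": "C"},
	    "previous": {"value": 22.5, "unit": "C"},
	    "last_updated": "2026-02-20T15:10:44Z"
	  }]
	}`)
	r, err := parseResponse(body)
	if err != nil {
		fmt.Println("decode:", err)
		return
	}
	for _, c := range r.Changes {
		fmt.Printf("%s changed: %s -> %s\n", c.Topic, c.Previous, c.Current)
	}
}
```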
Configuration
| Variable | Default | Description |
|---|---|---|
| FUNCTION_TARGET | uns-cache | Function name |
| CACHE_URL | redis://fnkit-cache:6379 | Valkey/Redis connection |
| CACHE_KEY_PREFIX | uns | Must match uns-framework |
UNS PostgreSQL Logger (uns-log)
A Go HTTP function that reads UNS topic data from the cache and logs snapshot rows to PostgreSQL when any value changes.
How It Works
- Fetches config from Valkey (cached 30s)
- Reads all configured topics from cache
- Compares current values vs last logged snapshot
- If any topic changed → INSERTs a full snapshot row to PostgreSQL
- Returns JSON summary
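The comparison step can be sketched as below. It is an illustration under stated assumptions: payloads are compared as raw strings (the real logger may compare parsed JSON), `changedTopics` is a hypothetical name, and the sample values are made up.

```go
package main

import (
	"fmt"
	"sort"
)

// changedTopics returns, sorted, the topics whose current payload differs from
// the last logged snapshot. A topic absent from last counts as changed.
func changedTopics(last, current map[string]string) []string {
	var changed []string
	for topic, cur := range current {
		if prev, ok := last[topic]; !ok || prev != cur {
			changed = append(changed, topic)
		}
	}
	sort.Strings(changed)
	return changed
}

func main() {
	last := map[string]string{
		"v1.0/acme/factory1/mixing/line1/temperature": `{"value":22.5,"unit":"C"}`,
		"v1.0/acme/factory1/mixing/line1/pressure":    `{"value":1.2,"unit":"bar"}`,
	}
	current := map[string]string{
		"v1.0/acme/factory1/mixing/line1/temperature": `{"value":23.1,"unit":"C"}`,
		"v1.0/acme/factory1/mixing/line1/pressure":    `{"value":1.2,"unit":"bar"}`,
	}
	if changed := changedTopics(last, current); len(changed) > 0 {
		// A real logger would INSERT one full snapshot row here, then
		// remember current as the new last-logged snapshot.
		fmt.Println("log snapshot, changed:", changed)
	} else {
		fmt.Println("no change, nothing logged")
	}
}
```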
Config in Valkey
Config is stored in the shared Valkey cache, not in .env files. The function reads it from the key fnkit:config:<FUNCTION_TARGET>, so the default target uns-log reads fnkit:config:uns-log:
docker exec fnkit-cache valkey-cli SET fnkit:config:uns-log \
'{"table":"uns_log","topics":["v1.0/acme/factory1/mixing/line1/temperature","v1.0/acme/factory1/mixing/line1/pressure"]}'
Config format:
{
"table": "uns_log",
"topics": [
"v1.0/acme/factory1/mixing/line1/temperature",
"v1.0/acme/factory1/mixing/line1/pressure"
]
}
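In Go, this config maps onto a small struct. The sketch below decodes and sanity-checks it; the `config` type, `configKey`, and `parseConfig` are hypothetical names, and rejecting an empty table or topic list is an assumption rather than documented behaviour.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// config mirrors the JSON stored under fnkit:config:<FUNCTION_TARGET>.
type config struct {
	Table  string   `json:"table"`
	Topics []string `json:"topics"`
}

// configKey returns the Valkey key that holds the config for a function target.
func configKey(functionTarget string) string {
	return "fnkit:config:" + functionTarget
}

// parseConfig decodes the stored JSON and rejects obviously incomplete configs.
func parseConfig(raw string) (config, error) {
	var c config
	if err := json.Unmarshal([]byte(raw), &c); err != nil {
		return config{}, fmt.Errorf("decode config: %w", err)
	}
	if c.Table == "" || len(c.Topics) == 0 {
		return config{}, errors.New("config needs a table and at least one topic")
	}
	return c, nil
}

func main() {
	raw := `{"table":"uns_log","topics":["v1.0/acme/factory1/mixing/line1/temperature","v1.0/acme/factory1/mixing/line1/pressure"]}`
	c, err := parseConfig(raw)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(configKey("uns-log"), "->", c.Table, len(c.Topics), "topics")
}
```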
PostgreSQL Table (auto-created)
CREATE TABLE IF NOT EXISTS uns_log (
id BIGSERIAL PRIMARY KEY,
logged_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
enterprise TEXT NOT NULL,
site TEXT NOT NULL,
area TEXT NOT NULL,
line TEXT NOT NULL,
tag TEXT NOT NULL,
values JSONB NOT NULL,
changed TEXT[] NOT NULL
);
Every row is a complete snapshot — unchanged values are copied forward. The changed column lists which tags triggered the log.
UNS Topic Parsing
All hierarchy metadata is parsed directly from the topic path — no manual mapping needed:
v1.0/{enterprise}/{site}/{area}/{line}/{tag}
| UNS Level | Parsed From | Example |
|---|---|---|
| enterprise | parts[1] | acme |
| site | parts[2] | factory1 |
| area | parts[3] | mixing |
| line | parts[4] | line1 |
| tag | parts[5:] | temperature |
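The mapping in the table corresponds to a split on `/`. A minimal sketch follows; `unsPath` and `parseTopic` are hypothetical names, and joining a multi-level tag (parts[5:]) back together with `/` is an assumption about how the remaining levels are stored.

```go
package main

import (
	"fmt"
	"strings"
)

// unsPath holds the hierarchy levels parsed from a UNS topic.
type unsPath struct {
	Version, Enterprise, Site, Area, Line, Tag string
}

// parseTopic splits v1.0/{enterprise}/{site}/{area}/{line}/{tag} into its levels.
func parseTopic(topic string) (unsPath, error) {
	parts := strings.Split(topic, "/")
	if len(parts) < 6 {
		return unsPath{}, fmt.Errorf("topic %q has %d levels, want at least 6", topic, len(parts))
	}
	return unsPath{
		Version:    parts[0],
		Enterprise: parts[1],
		Site:       parts[2],
		Area:       parts[3],
		Line:       parts[4],
		Tag:        strings.Join(parts[5:], "/"), // everything after the line level
	}, nil
}

func main() {
	p, err := parseTopic("v1.0/acme/factory1/mixing/line1/temperature")
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Printf("%+v\n", p)
}
```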
Multiple Instances
Deploy the same image with different FUNCTION_TARGET values. Each reads its own config from Valkey:
# Instance 1 — line1 sensors
docker exec fnkit-cache valkey-cli SET fnkit:config:uns-log-line1 \
'{"table":"uns_log","topics":["v1.0/acme/factory1/mixing/line1/temperature","v1.0/acme/factory1/mixing/line1/pressure"]}'
# Instance 2 — line2 sensors
docker exec fnkit-cache valkey-cli SET fnkit:config:uns-log-line2 \
'{"table":"uns_log","topics":["v1.0/acme/factory1/mixing/line2/temperature","v1.0/acme/factory1/mixing/line2/speed"]}'
Trigger via the gateway:
curl -H "Authorization: Bearer <token>" http://localhost:8080/uns-log-line1
curl -H "Authorization: Bearer <token>" http://localhost:8080/uns-log-line2
Configuration
| Variable | Default | Description |
|---|---|---|
| FUNCTION_TARGET | uns-log | Function name = Valkey config key |
| DATABASE_URL | postgres://fnkit:fnkit@fnkit-postgres:5432/fnkit?sslmode=disable | PostgreSQL connection |
| CACHE_URL | redis://fnkit-cache:6379 | Valkey/Redis connection |
| CACHE_KEY_PREFIX | uns | Must match uns-framework |
UNS Framework
The Unified Namespace (UNS) Framework organises enterprise data in a hierarchical MQTT topic structure following ISA-95:
v1.0/{enterprise}/{site}/{area}/{line}/{cell}/...
By subscribing to v1.0/#, the UNS monitor captures the entire namespace and makes it queryable via the shared cache. This enables:
- Real-time dashboards — read current values from cache
- Change detection — compare current vs previous values
- Historical logging — log changes to PostgreSQL for analysis
- Event-driven actions — trigger functions when specific topics change
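For event-driven actions, one plausible way to decide whether a changed topic is of interest is standard MQTT topic-filter matching, the same wildcard rules behind the v1.0/# subscription. How FnKit itself wires up such triggers is not described here, so the sketch below is an assumption; `matchTopic` is a hypothetical helper and ignores the special handling of topics starting with `$`.

```go
package main

import (
	"fmt"
	"strings"
)

// matchTopic reports whether topic matches an MQTT topic filter.
// "+" matches exactly one level; "#" matches all remaining levels
// (including none) and is only valid as the last level of the filter.
func matchTopic(filter, topic string) bool {
	f := strings.Split(filter, "/")
	t := strings.Split(topic, "/")
	for i, seg := range f {
		if seg == "#" {
			return i == len(f)-1
		}
		if i >= len(t) {
			return false
		}
		if seg != "+" && seg != t[i] {
			return false
		}
	}
	return len(f) == len(t)
}

func main() {
	topic := "v1.0/acme/factory1/mixing/line1/temperature"
	for _, filter := range []string{
		"v1.0/#",
		"v1.0/acme/+/mixing/+/temperature",
		"v1.0/acme/factory2/#",
	} {
		fmt.Printf("%-35s %v\n", filter, matchTopic(filter, topic))
	}
}
```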