MQTT Functions
FnKit supports event-driven functions that subscribe to MQTT topics instead of listening on HTTP. Each function connects to an MQTT broker, subscribes to a topic, and processes messages as they arrive.
How It Works
MQTT Broker (e.g. Mosquitto)
│
├── topic: fnkit/my-handler → my-handler container (processes message)
├── topic: fnkit/sensor-data → sensor-data container (processes message)
└── topic: fnkit/alerts → alerts container (processes message)
Unlike HTTP functions, which listen on a port, MQTT functions are subscribers: they connect to a broker and wait for messages on their topic. The topic is {prefix}/{target} by default (e.g. fnkit/my-handler).
Supported Runtimes
| Runtime | Command | Framework |
|---|---|---|
| Node.js | fnkit node-mqtt <name> | function-framework-nodejs |
| Go | fnkit go-mqtt <name> | function-framework-go |
| .NET | fnkit dotnet-mqtt <name> | function-framework-dotnet |
These use the FnKit Function Framework (not the Google Cloud Functions Framework used by HTTP runtimes).
Quick Start
# Create an MQTT function
fnkit node-mqtt my-handler
cd my-handler
# Edit the handler function
# Then build and run
docker compose up -d
Environment Variables
MQTT functions are configured entirely via environment variables, set in the generated docker-compose.yml and Dockerfile.
Connection
| Variable | Description | Default |
|---|---|---|
| MQTT_BROKER | Broker connection URL | mqtt://localhost:1883 |
| FUNCTION_TARGET | Function name to invoke | — |
| MQTT_CLIENT_ID | Client identifier (auto-generated if empty) | — |
Topics
| Variable | Description | Default |
|---|---|---|
| MQTT_TOPIC_PREFIX | Topic prefix; the function subscribes to {prefix}/{target} | fnkit |
| MQTT_SUBSCRIBE_TOPIC | Override the full subscribe topic (e.g. v1.0/# for wildcard) | — |
| MQTT_QOS | QoS level (0, 1, or 2) | 1 |
Authentication
| Variable | Description | Default |
|---|---|---|
| MQTT_USERNAME | Broker authentication username | — |
| MQTT_PASSWORD | Broker authentication password | — |
TLS / mTLS
| Variable | Description | Default |
|---|---|---|
| MQTT_CA | Path to CA certificate | — |
| MQTT_CERT | Path to client certificate (mTLS) | — |
| MQTT_KEY | Path to client key (mTLS) | — |
| MQTT_REJECT_UNAUTHORIZED | Reject unauthorized TLS certificates | true |
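The variables in the tables above can be read into a single options object before connecting. The following is a minimal sketch of that step, not the framework's actual implementation: buildConnectionOptions and the field names it returns (brokerUrl, caPath, and so on) are hypothetical, and the rule that only the literal string "false" disables certificate verification is an assumption.

```javascript
// Sketch only: buildConnectionOptions and its field names are hypothetical,
// not the FnKit framework's internal API.
function buildConnectionOptions(env) {
  const qos = env.MQTT_QOS ? Number(env.MQTT_QOS) : 1; // documented default
  if (![0, 1, 2].includes(qos)) {
    throw new Error(`MQTT_QOS must be 0, 1, or 2, got "${env.MQTT_QOS}"`);
  }
  // mTLS needs both the client certificate and its key.
  if (Boolean(env.MQTT_CERT) !== Boolean(env.MQTT_KEY)) {
    throw new Error('MQTT_CERT and MQTT_KEY must be set together');
  }
  const options = {
    brokerUrl: env.MQTT_BROKER || 'mqtt://localhost:1883',
    qos,
    // Assumption: verification stays on unless the variable is exactly "false".
    rejectUnauthorized: env.MQTT_REJECT_UNAUTHORIZED !== 'false',
  };
  // Copy optional settings only when they are non-empty.
  const optional = {
    clientId: env.MQTT_CLIENT_ID,
    username: env.MQTT_USERNAME,
    password: env.MQTT_PASSWORD,
    caPath: env.MQTT_CA,
    certPath: env.MQTT_CERT,
    keyPath: env.MQTT_KEY,
  };
  for (const [key, value] of Object.entries(optional)) {
    if (value) options[key] = value;
  }
  return options;
}
```

Calling buildConnectionOptions(process.env) at startup surfaces a bad QoS value or a half-configured mTLS pair before the first connection attempt.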
Docker Compose
The generated docker-compose.yml includes all environment variables and an optional Mosquitto broker:
services:
  my-handler:
    build: .
    container_name: my-handler
    environment:
      - MQTT_BROKER=mqtt://mosquitto:1883
      - FUNCTION_TARGET=helloWorld
      - MQTT_TOPIC_PREFIX=fnkit
      - MQTT_QOS=1
    networks:
      - fnkit-network
    # depends_on needs the mosquitto service below to be defined;
    # remove this block when connecting to an external broker
    depends_on:
      mosquitto:
        condition: service_started

  # Uncomment to include a broker
  # mosquitto:
  #   image: eclipse-mosquitto:2
  #   container_name: mosquitto
  #   ports:
  #     - "1883:1883"
  #   networks:
  #     - fnkit-network

networks:
  fnkit-network:
    name: fnkit-network
    external: true
Running a Broker
MQTT functions need a broker to connect to. You can use any MQTT broker — here are common options:
Mosquitto (lightweight, local)
docker run -d \
--name mosquitto \
--network fnkit-network \
-p 1883:1883 \
eclipse-mosquitto:2
EMQX (production-grade)
docker run -d \
--name emqx \
--network fnkit-network \
-p 1883:1883 \
-p 18083:18083 \
emqx/emqx:5
Cloud Brokers
You can also use cloud MQTT services — just set MQTT_BROKER to the cloud endpoint:
MQTT_BROKER=mqtts://broker.hivemq.cloud:8883
MQTT_USERNAME=your-username
MQTT_PASSWORD=your-password
Testing
Publish a Test Message
With Mosquitto CLI tools:
# Install mosquitto-clients
apt install mosquitto-clients # or brew install mosquitto
# Publish to your function's topic
mosquitto_pub -h localhost -t "fnkit/my-handler" -m '{"hello": "world"}'
With Docker
docker exec mosquitto mosquitto_pub -t "fnkit/my-handler" -m '{"hello": "world"}'
Topic Patterns
Default
By default, functions subscribe to {MQTT_TOPIC_PREFIX}/{FUNCTION_TARGET}:
MQTT_TOPIC_PREFIX=fnkit + FUNCTION_TARGET=my-handler → fnkit/my-handler
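The default rule above, together with the MQTT_SUBSCRIBE_TOPIC override, can be sketched as a small function. resolveTopic is a hypothetical helper written for illustration; it mirrors the documented behaviour but is not part of the FnKit framework.

```javascript
// Sketch: derive the subscribe topic from environment variables.
// resolveTopic is a hypothetical helper, not a FnKit API.
function resolveTopic(env) {
  // An explicit override wins over the prefix/target default.
  if (env.MQTT_SUBSCRIBE_TOPIC) {
    return env.MQTT_SUBSCRIBE_TOPIC;
  }
  const prefix = env.MQTT_TOPIC_PREFIX || 'fnkit'; // documented default
  if (!env.FUNCTION_TARGET) {
    throw new Error('FUNCTION_TARGET is required when no override is set');
  }
  return `${prefix}/${env.FUNCTION_TARGET}`;
}

console.log(resolveTopic({ FUNCTION_TARGET: 'my-handler' })); // fnkit/my-handler
```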
Custom Topic
Override with MQTT_SUBSCRIBE_TOPIC for custom patterns:
# Subscribe to a specific topic
MQTT_SUBSCRIBE_TOPIC=sensors/temperature/room1
# Wildcard — all messages under a prefix
MQTT_SUBSCRIBE_TOPIC=sensors/#
# Single-level wildcard
MQTT_SUBSCRIBE_TOPIC=sensors/+/room1
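To check which concrete topics a wildcard filter will receive, the standard MQTT matching rules can be sketched in a few lines. topicMatches is a hypothetical helper for local experiments; the broker performs the real matching, and this sketch ignores the special handling of topics that start with $.

```javascript
// Sketch of MQTT topic-filter matching: "+" matches exactly one level,
// "#" matches the remaining levels (including none).
function topicMatches(filter, topic) {
  const f = filter.split('/');
  const t = topic.split('/');
  for (let i = 0; i < f.length; i++) {
    if (f[i] === '#') return true;       // multi-level wildcard: accept the rest
    if (i >= t.length) return false;     // filter is longer than the topic
    if (f[i] !== '+' && f[i] !== t[i]) return false; // literal level mismatch
  }
  // Without "#", filter and topic must have the same number of levels.
  return f.length === t.length;
}

console.log(topicMatches('sensors/#', 'sensors/temperature/room1'));       // true
console.log(topicMatches('sensors/+/room1', 'sensors/temperature/room2')); // false
```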
Publishing from a Function
MQTT functions subscribe to topics by default, but you can also publish to other topics from within your handler. This is useful for processing pipelines — e.g. subscribe to sensors/raw, transform the data, and publish to sensors/processed.
Each runtime uses a standard MQTT client library to create a publish connection alongside the framework’s subscribe connection.
Node.js
npm install mqtt
const fnkit = require('@functionkit/functions-framework');
const mqtt = require('mqtt');

const pub = mqtt.connect(process.env.MQTT_BROKER || 'mqtt://localhost:1883');

fnkit.mqtt('processRaw', (req, res) => {
  const processed = { temperature: req.body.raw * 0.1 };
  pub.publish('sensors/processed', JSON.stringify(processed));
  res.send({ status: 'published' });
});
Go
go get github.com/eclipse/paho.mqtt.golang
import (
	"encoding/json"
	"log"
	"os"

	mqtt "github.com/eclipse/paho.mqtt.golang"
	"github.com/functionkit/function-framework-go/functions"
)

// pub is the shared publish client, created once at startup.
var pub mqtt.Client

func init() {
	opts := mqtt.NewClientOptions().AddBroker(os.Getenv("MQTT_BROKER"))
	pub = mqtt.NewClient(opts)
	// Fail fast if the publish connection cannot be established.
	if token := pub.Connect(); token.Wait() && token.Error() != nil {
		log.Fatalf("publish client: %v", token.Error())
	}
	functions.MQTT("processRaw", processRaw)
}

func processRaw(req *functions.MqttRequest, res functions.MqttResponse) {
	payload, _ := json.Marshal(map[string]float64{"temperature": 22.5})
	pub.Publish("sensors/processed", 1, false, payload)
	res.Send(map[string]string{"status": "published"})
}
.NET
dotnet add package MQTTnet
using MQTTnet;
using MQTTnet.Client;

public class Publisher
{
    private static readonly Lazy<Task<IMqttClient>> _client = new(CreateClientAsync);

    private static async Task<IMqttClient> CreateClientAsync()
    {
        var factory = new MqttFactory();
        var client = factory.CreateMqttClient();
        var broker = Environment.GetEnvironmentVariable("MQTT_BROKER") ?? "mqtt://localhost:1883";
        var uri = new Uri(broker);
        var options = new MqttClientOptionsBuilder()
            .WithTcpServer(uri.Host, uri.Port)
            .Build();
        await client.ConnectAsync(options);
        return client;
    }

    public static async Task PublishAsync(string topic, object payload)
    {
        var client = await _client.Value;
        var json = System.Text.Json.JsonSerializer.Serialize(payload);
        var message = new MqttApplicationMessageBuilder()
            .WithTopic(topic)
            .WithPayload(json)
            .WithQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce)
            .Build();
        await client.PublishAsync(message);
    }
}

// Usage inside your function:
await Publisher.PublishAsync("sensors/processed", new { temperature = 22.5 });
Notes
- The publish client reuses the same MQTT_BROKER environment variable as the subscriber
- Broker authentication (MQTT_USERNAME, MQTT_PASSWORD) and TLS settings apply to publish connections too; configure them on your client as needed
- For high-throughput publishing, create the client once (as shown above) rather than per-message
QoS Levels
| Level | Name | Guarantee |
|---|---|---|
| 0 | At most once | Fire and forget — message may be lost |
| 1 | At least once | Message delivered at least once (may duplicate) |
| 2 | Exactly once | Message delivered exactly once (highest overhead) |
Default is QoS 1 (at least once), which is suitable for most use cases.
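Because QoS 1 may deliver the same message more than once, a handler with side effects usually needs to tolerate duplicates. The sketch below shows one common approach, a bounded set of recently seen message identifiers. It assumes each payload carries a unique identifier chosen by the publisher; createDeduplicator and that identifier are hypothetical, not something FnKit provides.

```javascript
// Sketch: remember the most recent message IDs and report repeats.
// Assumes the publisher puts a unique ID in every message (hypothetical).
function createDeduplicator(capacity = 1000) {
  const seen = new Set();
  return function isDuplicate(messageId) {
    if (seen.has(messageId)) return true;
    seen.add(messageId);
    if (seen.size > capacity) {
      // Sets iterate in insertion order, so the first value is the oldest.
      seen.delete(seen.values().next().value);
    }
    return false;
  };
}

const isDuplicate = createDeduplicator(2);
console.log(isDuplicate('msg-1')); // false
console.log(isDuplicate('msg-1')); // true
```

The capacity bounds memory use; an identifier that has been evicted is treated as new again, so size it to cover the broker's redelivery window.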
MQTT vs HTTP Functions
| | HTTP Functions | MQTT Functions |
|---|---|---|
| Trigger | HTTP request | MQTT message |
| Protocol | HTTP/1.1 | MQTT 3.1.1 / 5.0 |
| Pattern | Request/response | Pub/sub |
| Gateway | Routed via fnkit-gateway | Direct broker connection |
| Use cases | APIs, webhooks, web services | IoT, sensors, event processing |
| Runtimes | 9 (all languages) | 3 (Node.js, Go, .NET) |
Notes
- MQTT functions don’t use the API gateway — they connect directly to the broker
- Each function gets its own container and MQTT connection
- Functions can also use the shared cache via CACHE_URL
- The CACHE_URL=redis://fnkit-cache:6379 environment variable is set in generated Dockerfiles
OPC-UA Bridge (fnkit uns opcua)
FnKit includes a built-in OPC-UA → MQTT bridge that connects to OPC-UA servers, reads tags, and publishes data to MQTT topics. Written in Go, it compiles to a single static binary — run it as a Docker container on a server or as a standalone .exe on edge devices.
OPC-UA Server (opc.tcp://...)
│
│ Read (poll) / Subscribe (real-time)
│
▼
┌──────────────────────────────────────┐
│ opcua-bridge (Go binary) │
│ │
│ Tag Groups: │
│ ├── machine-data (subscribe) │
│ │ → legacy/mill8 │
│ ├── condition-monitoring (poll 60s) │
│ │ → condition_legacy/mill8 │
│ └── motor-torque-power (poll 10s) │
│ → motor_legacy/mill8 │
│ │
│ Status → connection/mill8 │
└──────────────────────────────────────┘
│
│ MQTT Publish (JSON)
│
▼
MQTT Broker → uns-framework → fnkit-cache → uns-log
Commands
| Command | Description |
|---|---|
| fnkit uns opcua init [name] | Scaffold OPC-UA bridge project (default: opcua-bridge/) |
| fnkit uns opcua start [name] | Build & start bridge container |
| fnkit uns opcua stop [name] | Stop bridge container |
| fnkit uns opcua build [name] | Cross-compile standalone binaries to dist/ |
Quick Start
# Create the bridge project
fnkit uns opcua init
cd opcua-bridge
# Edit tag configuration
vi tags.yaml
# Option A: Run as Docker container
cp .env.example .env
docker compose up -d
# Option B: Build standalone binaries for edge deployment
fnkit uns opcua build
# → dist/opcua-bridge-windows-amd64.exe
# → dist/opcua-bridge-linux-amd64
# → dist/opcua-bridge-linux-arm64
# → dist/opcua-bridge-darwin-arm64
Tag Configuration (tags.yaml)
Tags are organised into groups, each with its own MQTT topic and read mode:
opcua:
endpoint: "opc.tcp://192.168.1.100:4840/"
security_policy: "Basic256Sha256" # None | Basic256Sha256 | Basic256
security_mode: "SignAndEncrypt" # None | Sign | SignAndEncrypt
username: ""
password: ""
insecure: false
mqtt:
broker: "mqtt://localhost:1883"
client_id: "my-bridge"
qos: 1
status_topic: "connection/my-bridge"
groups:
- name: "machine-data"
topic: "legacy/mill8"
mode: "subscribe" # Real-time OPC-UA subscription
interval: 1000
tags:
- name: "Spindle Speed"
node_id: "ns=2;s=/Channel/Spindle/cmdSpeed"
- name: "Feed Rate"
node_id: "ns=2;s=/Channel/State/cmdFeedRateIpo"
- name: "condition-monitoring"
topic: "condition_legacy/mill8"
mode: "poll" # Read on interval
interval: 60000 # 60 seconds
tags:
- name: "X Motor Temp"
node_id: "ns=2;s=/DriveVsa/Drive/R0035[u3]"
Tag Group Modes
| Mode | Description |
|---|---|
| poll | Read all tags on a fixed interval (e.g. every 10s, 60s). Good for slow-changing data like temperatures. |
| subscribe | OPC-UA subscription: the server pushes changes in real time. Good for fast-changing data like spindle speed. |
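Before starting the bridge it can help to sanity-check each tag group once tags.yaml has been parsed into an object. The sketch below is a hypothetical pre-flight check, not the bridge's own validation: validateGroup and its error messages are illustrative, and treating every listed field as required is an assumption based on the example configuration.

```javascript
// Sketch: validate one parsed tag group from tags.yaml.
// validateGroup is hypothetical; the required fields are an assumption.
const MODES = new Set(['poll', 'subscribe']);

function validateGroup(group) {
  const errors = [];
  if (!group.name) errors.push('name is required');
  if (!group.topic) errors.push('topic is required');
  if (!MODES.has(group.mode)) {
    errors.push(`mode must be "poll" or "subscribe", got "${group.mode}"`);
  }
  // Intervals in the example configuration are given in milliseconds.
  if (!Number.isInteger(group.interval) || group.interval <= 0) {
    errors.push('interval must be a positive integer (milliseconds)');
  }
  if (!Array.isArray(group.tags) || group.tags.length === 0) {
    errors.push('at least one tag is required');
  } else {
    group.tags.forEach((tag, i) => {
      if (!tag.name || !tag.node_id) errors.push(`tags[${i}] needs name and node_id`);
    });
  }
  return errors; // an empty array means the group looks usable
}
```

Returning a list of messages instead of throwing lets a caller report every problem in the file in one pass.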
OPC-UA Security
| Security Policy | Security Mode | Use Case |
|---|---|---|
| None | None | Development / testing |
| Basic256Sha256 | Sign | Signed messages |
| Basic256Sha256 | SignAndEncrypt | Full encryption (production) |
For certificate-based auth, place certs in certs/ and set paths in tags.yaml:
opcua:
  certificate: "/certs/opcua-client.crt"
  private_key: "/certs/opcua-client.key"
MQTT Security
Supports plain, TLS, mTLS, and username/password:
mqtt:
  broker: "mqtts://broker:8883"     # TLS
  ca: "/certs/mqtt-ca.crt"
  cert: "/certs/mqtt-client.crt"    # mTLS
  key: "/certs/mqtt-client.key"
  insecure: false                   # Set true for self-signed certs
Environment Variable Overrides
All connection settings in tags.yaml can be overridden via environment variables (useful for Docker/CI):
| Variable | Description |
|---|---|
| OPCUA_ENDPOINT | OPC-UA server endpoint |
| OPCUA_SECURITY_POLICY | None / Basic256Sha256 / Basic256 |
| OPCUA_SECURITY_MODE | None / Sign / SignAndEncrypt |
| OPCUA_USERNAME / OPCUA_PASSWORD | OPC-UA authentication |
| OPCUA_INSECURE | Skip OPC-UA cert validation |
| MQTT_BROKER | MQTT broker URL |
| MQTT_USERNAME / MQTT_PASSWORD | MQTT authentication |
| MQTT_INSECURE | Skip MQTT TLS validation |
| STATUS_TOPIC | Connection status MQTT topic |
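The override behaviour can be pictured as a merge of environment values over the parsed file. The sketch below illustrates that precedence for a subset of the variables. It is not the bridge's Go implementation: applyEnvOverrides is hypothetical, the mapping of each variable to a section and key is an assumption based on the example configuration, and so is the rule that "true" or "1" enables a boolean flag.

```javascript
// Sketch: environment variables take precedence over values from tags.yaml.
// The variable-to-key mapping below is an assumption for illustration.
const parseBool = (value) => value === 'true' || value === '1';

const OVERRIDES = [
  ['OPCUA_ENDPOINT', 'opcua', 'endpoint', String],
  ['OPCUA_SECURITY_POLICY', 'opcua', 'security_policy', String],
  ['OPCUA_SECURITY_MODE', 'opcua', 'security_mode', String],
  ['OPCUA_INSECURE', 'opcua', 'insecure', parseBool],
  ['MQTT_BROKER', 'mqtt', 'broker', String],
  ['MQTT_INSECURE', 'mqtt', 'insecure', parseBool],
];

function applyEnvOverrides(config, env) {
  // Copy both sections so the caller's parsed config is left untouched.
  const result = { ...config, opcua: { ...config.opcua }, mqtt: { ...config.mqtt } };
  for (const [name, section, key, parse] of OVERRIDES) {
    // Unset or empty variables leave the file value in place.
    if (env[name] !== undefined && env[name] !== '') {
      result[section][key] = parse(env[name]);
    }
  }
  return result;
}
```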
Edge Deployment
The build command cross-compiles standalone binaries via Docker:
fnkit uns opcua build
# Copy to edge machine:
# opcua-bridge-windows-amd64.exe
# tags.yaml
# certs/ (if using TLS)
#
# Run on the edge machine:
./opcua-bridge-windows-amd64.exe
No Docker, no runtime dependencies — just the binary, config file, and optional certs.
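When a deployment script has to pick the right file from dist/, the choice can be derived from the target platform. The sketch below maps Node.js process.platform and process.arch values to the four file names listed in the Quick Start; bridgeBinaryName is a hypothetical helper and not part of the fnkit CLI.

```javascript
// Sketch: choose the dist/ binary for a platform/architecture pair.
// Only the four targets listed in this document are accepted.
const OS_NAMES = { win32: 'windows', linux: 'linux', darwin: 'darwin' };
const ARCH_NAMES = { x64: 'amd64', arm64: 'arm64' };
const BUILT_TARGETS = new Set(['windows-amd64', 'linux-amd64', 'linux-arm64', 'darwin-arm64']);

function bridgeBinaryName(platform, arch) {
  const target = `${OS_NAMES[platform]}-${ARCH_NAMES[arch]}`;
  if (!BUILT_TARGETS.has(target)) {
    throw new Error(`no prebuilt binary for ${platform}/${arch}`);
  }
  const extension = platform === 'win32' ? '.exe' : '';
  return `opcua-bridge-${target}${extension}`;
}

console.log(bridgeBinaryName('win32', 'x64')); // opcua-bridge-windows-amd64.exe
```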
Built With
- gopcua/opcua — OPC-UA client library for Go
- eclipse/paho.mqtt.golang — MQTT client
- gopkg.in/yaml.v3 — YAML configuration
UNS Plugin (fnkit uns)
FnKit includes a built-in plugin for Unified Namespace (UNS) workflows — a common industrial IoT pattern where all enterprise data flows through a hierarchical MQTT topic structure following ISA-95.
The plugin provides three pre-built functions that work together:
MQTT Broker (v1.0/#)
│
▼
uns-framework (Go MQTT function)
│ Subscribes to v1.0/#
│ Caches every message to Valkey
▼
fnkit-cache (Valkey)
│
├── uns-cache (Node.js HTTP function)
│ Reads cached topics, returns JSON with change detection
│
└── uns-log (Go HTTP function)
Logs changed values to PostgreSQL
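The hierarchical topic structure mentioned above can be split into named levels when a consumer needs to filter by site or line. The sketch below assumes the layout of the example topic used later in this document (version, enterprise, site, area, line, then the tag). parseUnsTopic, the level names, and the sample values in the usage line are hypothetical; a real deployment may use a different depth.

```javascript
// Sketch: split a UNS topic into named hierarchy levels.
// Level names follow this document's example topic and are an assumption.
const LEVELS = ['enterprise', 'site', 'area', 'line'];

function parseUnsTopic(topic) {
  const [version, ...rest] = topic.split('/');
  if (rest.length <= LEVELS.length) {
    throw new Error(`expected a version, ${LEVELS.length} levels, and a tag: ${topic}`);
  }
  const parsed = { version };
  LEVELS.forEach((level, i) => {
    parsed[level] = rest[i];
  });
  // Everything after the hierarchy levels is treated as the tag path.
  parsed.tag = rest.slice(LEVELS.length).join('/');
  return parsed;
}

// Hypothetical sample values for illustration only.
console.log(parseUnsTopic('v1.0/acme/berlin/milling/mill8/temperature').line); // mill8
```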
Commands
| Command | Description |
|---|---|
| fnkit uns uns init [name] | Scaffold UNS topic monitor (Go MQTT → Valkey) |
| fnkit uns uns start [name] | Build & start monitor container |
| fnkit uns uns stop [name] | Stop monitor container |
| fnkit uns cache init [name] | Scaffold UNS cache reader (Node.js HTTP → JSON) |
| fnkit uns cache start [name] | Build & start cache reader |
| fnkit uns cache stop [name] | Stop cache reader |
| fnkit uns log init [name] | Scaffold PostgreSQL logger (Go HTTP → Postgres) |
| fnkit uns log start [name] | Build & start logger |
| fnkit uns log stop [name] | Stop logger |
| fnkit uns status | Show status of all UNS components |
Quick Start
# 1. Start the shared cache
fnkit cache start
# 2. Create and configure the UNS monitor
fnkit uns uns init
cd uns-framework
cp .env.example .env
# Edit .env: set MQTT_BROKER, auth, TLS certs as needed
docker compose up -d
# 3. Create the cache reader (HTTP API for cached data)
cd ..
fnkit uns cache init
cd uns-cache
cp .env.example .env
docker compose up -d
# 4. (Optional) Create the PostgreSQL logger
cd ..
fnkit uns log init
cd uns-log
cp .env.example .env
# Set config in Valkey:
docker exec fnkit-cache valkey-cli SET fnkit:config:uns-log \
'{"table":"uns_log","topics":["v1.0/enterprise/site/area/line/temperature"]}'
docker compose up -d
TLS / mTLS Support
The UNS monitor (fnkit uns uns init) includes full TLS/mTLS support:
- Place certificates in the certs/ directory
- Configure paths in .env:
# TLS (server verification)
MQTT_BROKER=mqtts://broker:8883
MQTT_CA=/certs/ca.crt
# mTLS (mutual authentication)
MQTT_CERT=/certs/client.crt
MQTT_KEY=/certs/client.key
# Self-signed certificates
MQTT_REJECT_UNAUTHORIZED=false
The docker-compose.yml mounts ./certs:/certs:ro automatically.
Cache Key Layout
The UNS monitor writes to Valkey with this key structure:
| Key Pattern | Type | Description |
|---|---|---|
| uns:topics | SET | All discovered topic paths |
| uns:data:<topic> | STRING | Latest payload (raw JSON) |
| uns:prev:<topic> | STRING | Previous payload (raw JSON) |
| uns:meta:<topic> | STRING | {"last_updated", "count", "first_seen"} |
Query from CLI:
docker exec fnkit-cache valkey-cli SMEMBERS uns:topics
docker exec fnkit-cache valkey-cli GET "uns:data:v1.0/enterprise/site/area/line/temperature"
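A client reading these keys needs to build the key names for a topic and decide whether the latest payload differs from the previous one. The sketch below shows both steps. cacheKeys and hasChanged are hypothetical helpers; comparing the raw strings is an assumption about change detection, and the uns-cache function may compare payloads differently.

```javascript
// Sketch: build the Valkey key names for one topic (layout from the table above).
function cacheKeys(topic) {
  return {
    data: `uns:data:${topic}`,
    prev: `uns:prev:${topic}`,
    meta: `uns:meta:${topic}`,
  };
}

// Sketch: treat a topic as changed when the raw payload strings differ.
// Whitespace or key-order differences therefore also count as a change.
function hasChanged(latest, previous) {
  // A topic seen for the first time has no previous payload yet.
  if (previous === null || previous === undefined) return true;
  return latest !== previous;
}

console.log(cacheKeys('v1.0/enterprise/site/area/line/temperature').data);
// uns:data:v1.0/enterprise/site/area/line/temperature
```

The two strings passed to hasChanged would come from GET calls on the data and prev keys, as in the CLI queries above.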