Hi all, I have this device installed in my home QIACHIP Upgrade Universal WIFI Ceiling Fan and I wanted to get local control over it using MQTT. Of course its a Tuya device. So, I have a NAS from QNAP which I run Container Station and created a MQTT server and used MQTT Client for local Tuya devices by Volker76 on github here. Follow the instructions there on setup. Here's the YML configuration file I used.
YML Code:
services:
tuya-mqtt:
image: volkerhaensel/tuya_mqtt.net:latest
container_name: tuya-mqtt
networks:
macvlan_net:
ipv4_address: YOUR.IP.ADDRESS.HERE # Assign a unique LAN IP for Tuya-MQTT (e.g., 192.168.1.230)
volumes:
- tuya.net_config:/app/DataDir # DataDir is case-sensitive
restart: unless-stopped
expose:
- "80/tcp" # Web UI β http://YOUR.IP.ADDRESS.HERE
- "6666/udp" # Tuya discovery (unencrypted)
- "6667/udp" # Tuya discovery (encrypted)
healthcheck:
test: ["CMD-SHELL", "wget -q -O /dev/null http://localhost:80 || exit 1"]
interval: 60s
timeout: 5s
retries: 5
start_period: 20s
mosquitto:
image: eclipse-mosquitto:2
container_name: tuya-mosquitto
networks:
macvlan_net:
ipv4_address: YOUR.IP.ADDRESS.HERE # Assign a DIFFERENT unique LAN IP for Mosquitto (e.g., 192.168.1.232)
expose:
- "1883/tcp" # Clients connect β tcp://YOUR.IP.ADDRESS.HERE:1883
volumes:
- ./mosquitto:/mosquitto # Place mosquitto.conf in ./mosquitto/config/
restart: unless-stopped
networks:
macvlan_net:
driver: macvlan
driver_opts:
parent: br0 # Change if your LAN interface is different
ipam:
config:
- subnet: 192.168.1.0/24 # Match your LAN subnet (change if needed)
gateway: 192.168.1.1 # Change to your routerβs gateway IP
volumes:
tuya.net_config:
I entered into the Tuya-MQTT, set that up as per Volker76's github page. I was able to identified the device in my network and connected to the device. Of note, you may need a Tuya IOT login for this, again, see Volker76's github page for details.
After the fact I created this Driver using our AI overlord's help. Installed the driver into Hubitat, added a Virtual Device using the driver and coifgured the device under the device's preferences. It will ask for your mqtt Mosquitto server IP address (not the MQTT-Tuya IP address) and the Device ID, IP or Name.
You initialize and then you will then be able to control your Tuya device.
Driver code:
/**
* Tuya Lean UI β MQTT Fan+Light (Local)
* ------------------------------------------------------------
* Minimal, production-ready Hubitat driver with a simplified Preferences panel.
*
* Design:
* - No βtopic styleβ toggle β inbound auto-handles DP & dps; commands publish to BOTH.
* - No identifier mode β provide ONE identifier (ID or IP). If blank, auto-learn via wildcard.
* - Hidden sane defaults for presence timeout, re-assert delay, keepalive, ghost-session avoidance.
* - DP3 (speed) never forces Switch "on" β off state persists correctly.
* - Robust MQTT: unique clientId by default, single queued reconnect with backoff+jitter,
* heartbeat keepalive, watchdog, connection guard, safe publish, retained status.
*
* Features:
* - DP1 β Switch (power)
* - DP3 β FanControl (low/medium/high)
* - DP9 β Child "Generic Component Switch" (optional)
* - Presence inferred from inbound MQTT activity
*
* Platform: Hubitat
* License: MIT
* Author: you
*/
import groovy.transform.Field
/* βββββββββ Internal Tunables (not exposed in UI) βββββββββ */
@Field static final Integer PRESENCE_TIMEOUT_SEC = 120
@Field static final Integer RESEND_DELAY_SEC = 2 // re-assert speed once after this delay
@Field static final Integer KEEPALIVE_SEC = 60 // heartbeat interval
@Field static final Integer WATCHDOG_PERIOD = 30
@Field static final Integer BACKOFF_MIN = 5
@Field static final Integer BACKOFF_MAX = 300
@Field static final Boolean ALWAYS_UNIQUE_ID = true // avoid ghost sessions
@Field static final Boolean STATUS_TOPIC_ENABLED = true // publish retained online/offline
/* βββββββββ Constants βββββββββ */
@Field static final List<String> SPEED_ORDER = ["off","low","medium","high"]
@Field static final List<Integer> DPs = [1,3,9]
@Field static final String RECONNECT_JOB = "reconnectStep"
@Field static final String BOOTSTRAP_TOPIC_KEY = "__bootstrapWildcard__"
@Field static final String CMD_SEGMENT = "command" // fixed outbound segment
metadata {
definition(name: "Tuya Lean UI", namespace: "public", author: "you") {
capability "Initialize"
capability "Switch"
capability "FanControl"
capability "Refresh"
capability "PresenceSensor"
attribute "supportedFanSpeeds", "JSON_OBJECT"
attribute "speed", "STRING"
attribute "lastSeen", "STRING"
attribute "mqttStatus", "STRING"
attribute "activeRoot", "STRING"
attribute "mqttConnected", "STRING" // "true"/"false"
}
/* βββββββββ Minimal Preferences (only whatβs necessary) βββββββββ */
preferences {
input name: "brokerHost", type: "text", title: "MQTT Broker Host", defaultValue: "192.168.1.10", required: true
input name: "brokerPort", type: "text", title: "MQTT Broker Port", defaultValue: "1883", required: true
input name: "username", type: "text", title: "MQTT Username (optional)"
input name: "password", type: "password", title: "MQTT Password (optional)"
input name: "baseTopic", type: "text", title: "Base topic (blank = none, e.g., 'tuya')", defaultValue: ""
input name: "deviceIdent", type: "text", title: "Device identifier (ID or IP). Leave blank to auto-learn.", required: false
input name: "createLightChild", type: "bool", title: "Expose DP9 as child light", defaultValue: true
input name: "logEnable", type: "bool", title: "Enable debug logging", defaultValue: true
}
}
/* ========================= Lifecycle ========================= */
def installed() { initialize() }
def updated() { initialize() }
def uninstalled() {
try { interfaces.mqtt.disconnect() } catch(e) {}
unschedule()
}
def initialize() {
unschedule()
try { interfaces.mqtt.disconnect() } catch(e) {}
state.lastSeenEpoch = 0L
state.lastMqttOkEpoch = 0L
state.activeRoot = null
state.connected = false
state.backoff = BACKOFF_MIN
state.remove("pendingSpeedRaw")
state.remove("stabilizeUntil")
validatePrefs()
connectMqtt(false)
runIn(10, "connectionGuard")
runIn(2, "subscribeTopics")
runIn(3, "publishSupportedSpeeds")
runIn(4, "ensureLightChild")
runIn(5, "startWatchdogLoop")
runEvery1Minute("presenceTick")
if (logEnable) log.debug "Initialized (Tuya Lean UI)"
}
def refresh() {
if (logEnable) log.debug "Refresh (no-op; waiting for MQTT updates)"
}
/* ========================= MQTT Core & Resilience ========================= */
private connectMqtt(Boolean forceScramble=false) {
final Integer portInt = safePort()
final String uri = "tcp://${brokerHost}:${portInt}"
final String baseId = "hubitat-tuya-fan-" + device.id
final Boolean scramble= (ALWAYS_UNIQUE_ID || (forceScramble ?: false))
final String clientId= scramble ? "${baseId}-${(now()%100000)}" : baseId
if (logEnable) log.debug "Connecting MQTT ${uri} as ${clientId} (scramble=${scramble})"
try {
sendEvent(name: "mqttStatus", value: "connecting")
interfaces.mqtt.connect(uri, clientId, username ?: null, password ?: null)
} catch (e) {
if (logEnable) log.warn "MQTT connect threw: ${e?.message ?: e}"
scheduleReconnect()
}
}
private scheduleReconnect() {
if (state.connected) return
Integer delay = (state.backoff ?: BACKOFF_MIN) as Integer
delay = Math.max(BACKOFF_MIN, Math.min(delay, BACKOFF_MAX))
Integer jitter = Math.max(1, (int)(delay * 0.2)) // Β±20%
Integer withJitter = delay + (new Random().nextInt(jitter*2+1) - jitter)
try { unschedule(RECONNECT_JOB) } catch (ignored) {}
if (logEnable) log.warn "Scheduling reconnect in ${withJitter}s (backoff=${delay}s)"
runIn(withJitter, RECONNECT_JOB)
state.backoff = Math.min(delay * 2, BACKOFF_MAX)
}
def reconnectStep() {
try { interfaces.mqtt.disconnect() } catch (ignored) {}
pauseExecution(250)
connectMqtt(true)
}
private publishMqtt(String topic, String payload, Boolean retain=false, Integer qos=0) {
try {
interfaces.mqtt.publish(topic, payload, qos, retain)
state.lastMqttOkEpoch = now()
return true
} catch (e) {
if (logEnable) log.warn "Publish failed ${topic}: ${e?.message ?: e} β scheduling reconnect"
state.connected = false
sendEvent(name: "mqttConnected", value: "false")
setStatusOnline(false)
scheduleReconnect()
return false
}
}
private setStatusOnline(Boolean online) {
if (!STATUS_TOPIC_ENABLED) return
def root = activeRootOrNull()
if (!root) return
publishMqtt("${root}/status", online ? "online" : "offline", true, 0)
}
/* ========================= Topic Helpers ========================= */
private String rootFor(String ident) {
def b = (baseTopic?.trim() ?: "")
return b ? "${b}/${ident}" : "${ident}"
}
private List<String> candidateIdentifiers() {
def ids = []
if (deviceIdent?.trim()) ids << deviceIdent.trim() // exactly one field, ID or IP
return ids.unique()
}
private String activeRootOrNull() {
if (state.activeRoot) return state.activeRoot as String
def c = candidateIdentifiers()
return (c && c.size()>0) ? rootFor(c[0]) : null
}
/* State topics (subscribe to BOTH styles) */
private String tStateDP (String root, int dp) { "${root}/DP${dp}" }
private String tStateDps(String root, int dp) { "${root}/dps/${dp}" }
/* Command topics (publish to BOTH styles) */
private String tCmdDP (String root, int dp) { "${root}/DP${dp}/${CMD_SEGMENT}" }
private String tCmdDps (String root, int dp) { "${root}/${CMD_SEGMENT}/${dp}" }
private boolean topicMatchesDp(String topic, String root, int dp) {
return topic == tStateDP(root, dp) || topic == tStateDps(root, dp)
}
/* ========================= Subscriptions ========================= */
def subscribeTopics() {
def cands = candidateIdentifiers()
if (cands && cands.size() > 0) {
def roots = cands.collect { rootFor(it) }
try {
roots.each { r -> DPs.each { dp ->
interfaces.mqtt.subscribe(tStateDP(r, dp))
interfaces.mqtt.subscribe(tStateDps(r, dp))
}}
if (logEnable) log.debug "Subscribed to DP & dps state topics under roots: ${roots}"
} catch (e) {
if (logEnable) log.warn "Subscribe failed: ${e?.message ?: e}"
scheduleReconnect()
}
} else {
// Auto-learn root: wildcard-subscribe and infer from first state message
def wild = bootstrapWildcard()
try {
interfaces.mqtt.subscribe(wild)
state[BOOTSTRAP_TOPIC_KEY] = wild
if (logEnable) log.warn "Bootstrap learn active β subscribed '${wild}'. Will auto-learn on first DP/dps state."
} catch (e) {
if (logEnable) log.warn "Bootstrap subscribe failed: ${e?.message ?: e}"
scheduleReconnect()
}
}
}
private String bootstrapWildcard() {
def b = (baseTopic?.trim() ?: "")
return b ? "${b}/#" : "#"
}
private void learnActiveRootFromTopic(String topic) {
if (state.activeRoot) return
String root = null
def m = (topic =~ /^(.*)\/DP(?:1|3|9)(?:\/.*)?$/)
if (m.matches()) {
root = m[0][1]
} else {
m = (topic =~ /^(.*)\/dps\/(?:1|3|9)$/)
if (m.matches()) root = m[0][1]
}
if (root) {
state.activeRoot = root
sendEvent(name: "activeRoot", value: root)
if (logEnable) log.warn "Active root learned: ${root}"
def wild = (state[BOOTSTRAP_TOPIC_KEY] ?: null)
if (wild) {
try { interfaces.mqtt.unsubscribe(wild) } catch (ignored) {}
state.remove(BOOTSTRAP_TOPIC_KEY)
}
try {
DPs.each { dp ->
interfaces.mqtt.subscribe(tStateDP(root, dp))
interfaces.mqtt.subscribe(tStateDps(root, dp))
}
if (logEnable) log.debug "Resubscribed narrowly to ${root} (DP & dps) for DP1/3/9"
} catch (e) {
if (logEnable) log.warn "Narrow resubscribe failed: ${e?.message ?: e}"
}
}
}
/* ========================= MQTT Callbacks ========================= */
def mqttClientStatus(String status) {
sendEvent(name: "mqttStatus", value: status)
if (logEnable) log.debug "MQTT status: ${status}"
def s = status?.toLowerCase() ?: ""
if (s.contains("succeed")) {
state.connected = true
state.backoff = BACKOFF_MIN
sendEvent(name: "mqttConnected", value: "true")
presenceMark(true)
// Post-connect bootstrap: narrow subs, liveness, heartbeat
runIn(1, "subscribeTopics")
setStatusOnline(true)
postConnectPing()
runIn(2, "heartbeatTick")
} else if (s.startsWith("error") || s.contains("lost") || s.contains("disconnected")) {
state.connected = false
sendEvent(name: "mqttConnected", value: "false")
setStatusOnline(false)
scheduleReconnect()
}
runIn(10, "connectionGuard")
}
private void postConnectPing() {
def root = activeRootOrNull()
if (!root) return
def t = "${root}/driver_boot"
def ts = new Date().format("yyyy-MM-dd'T'HH:mm:ssZ")
publishMqtt(t, ts, false, 0)
if (logEnable) log.debug "Boot ping -> ${t} ${ts}"
}
def parse(String description) {
def msg = interfaces.mqtt.parseMessage(description)
if (!msg?.topic) return
final String topic = msg.topic.toString()
final String payload = (msg.payload == null) ? "" : msg.payload.toString().trim()
state.lastMqttOkEpoch = now()
markSeen()
if (!state.activeRoot) learnActiveRootFromTopic(topic)
final String root = activeRootOrNull()
if (!root) return
// DP1 (power)
if (topicMatchesDp(topic, root, 1)) {
final boolean onNow = asBool(payload)
sendEvent(name: "switch", value: onNow ? "on" : "off")
if (!onNow) sendEvent(name: "speed", value: "off")
if (logEnable) log.debug "DP1 -> ${onNow}"
return
}
// DP3 (speed)
if (topicMatchesDp(topic, root, 3)) {
final String raw = payload.replaceAll("\"","")
final String name = (raw=="1") ? "low" : (raw=="2") ? "medium" : (raw=="3") ? "high" : "off"
// Stabilization: if we want 2/3 and observe transient 1 (low), reassert
final long nowMs = now()
if (state.pendingSpeedRaw && (state.pendingSpeedRaw in ["2","3"]) && raw == "1") {
final long until = (state.stabilizeUntil ?: 0L) as long
if (nowMs <= until) {
if (logEnable) log.debug "DP3 transient low during stabilization; reasserting ${state.pendingSpeedRaw}"
publishDP3(state.pendingSpeedRaw as String)
return
}
}
sendEvent(name: "speed", value: name)
// IMPORTANT: do not force switch=on here; DP1 governs power/UI
if (logEnable) log.debug "DP3 -> ${name} (raw ${raw})"
if (state.pendingSpeedRaw && raw == state.pendingSpeedRaw) {
state.remove("pendingSpeedRaw")
state.remove("stabilizeUntil")
}
return
}
// DP9 (light)
if (topicMatchesDp(topic, root, 9)) {
final boolean onNow = asBool(payload)
childLight()?.sendEvent(name: "switch", value: onNow ? "on":"off")
if (logEnable) log.debug "DP9 -> ${onNow}"
return
}
}
/* ========================= Capabilities ========================= */
def on() {
sendEvent(name:"switch", value:"on") // optimistic UI
publishDP1(true)
}
def off() {
publishDP1(false)
sendEvent(name: "switch", value: "off") // optimistic UI
sendEvent(name: "speed", value: "off")
state.remove("pendingSpeedRaw")
state.remove("stabilizeUntil")
}
private publishDP1(boolean val){
final String root = activeRootOrNull()
if (!root) { log.warn "No active root to publish DP1"; return }
// publish to BOTH command paths for compatibility
publishMqtt(tCmdDP(root, 1), val ? "true" : "false")
publishMqtt(tCmdDps(root,1), val ? "true" : "false")
if (logEnable) log.debug "Publish DP1 ${val} -> ${tCmdDP(root,1)} & ${tCmdDps(root,1)}"
}
def setSpeed(String speed) {
speed = (speed ?: "off").toLowerCase()
if (logEnable) log.debug "setSpeed requested: ${speed}"
switch (speed) {
case "off":
off()
return
case "low":
case "medium":
case "high":
final String v = (speed == "low") ? "1" : (speed == "medium") ? "2" : "3"
ensureFanPowerOn()
state.pendingSpeedRaw = v
state.stabilizeUntil = now() + 5000 // 5s window
runInMillis(500, 'publishInitialSpeed')
runIn(RESEND_DELAY_SEC, "reassertPendingSpeed")
sendEvent(name:"switch", value:"on")
sendEvent(name:"speed", value:speed)
return
default:
if (logEnable) log.debug "Unsupported speed '${speed}', defaulting to medium"
setSpeed("medium")
return
}
}
def cycleSpeed() {
def cur = (device.currentValue("speed") ?: "off").toLowerCase()
def idx = SPEED_ORDER.indexOf(cur); if (idx < 0) idx = 0
setSpeed(SPEED_ORDER[(idx + 1) % SPEED_ORDER.size()])
}
private ensureFanPowerOn() {
if ((device.currentValue("switch") ?: "off") != "on") publishDP1(true)
else publishDP1(true) // harmless edge bump
}
// helper invoked by runInMillis from setSpeed
def publishInitialSpeed() { def v = state.pendingSpeedRaw; if (v) publishDP3(v as String) }
def reassertPendingSpeed() {
def v = state.pendingSpeedRaw
if (!v) return
if (logEnable) log.debug "Re-asserting DP3 ${v} after delay"
publishDP3(v as String)
}
private publishDP3(String v) {
final String root = activeRootOrNull()
if (!root) { log.warn "No active root to publish DP3"; return }
final String payload = "\"${v.replaceAll('\"','')}\""
// publish to BOTH command paths
publishMqtt(tCmdDP(root, 3), payload)
publishMqtt(tCmdDps(root,3), payload)
if (logEnable) log.debug "Publish DP3 ${payload} -> ${tCmdDP(root,3)} & ${tCmdDps(root,3)}"
}
private publishSupportedSpeeds() {
sendEvent(name: "supportedFanSpeeds", value: '["off","low","medium","high"]', isStateChange: true)
}
/* ========================= Child light (DP9) ========================= */
private ensureLightChild() {
if (!createLightChild) return
if (!childLight()) {
final String dni = "${device.deviceNetworkId}-light"
try {
def cd = addChildDevice("hubitat", "Generic Component Switch", dni,
[name: "${device.displayName} Light", label: "${device.displayName} Light", isComponent: true])
if (logEnable) log.debug "Created child light: ${cd?.deviceNetworkId}"
} catch (e) {
log.warn "Cannot create child light: ${e}"
}
}
}
private childLight() { getChildDevice("${device.deviceNetworkId}-light") }
def componentOn(cd) {
final String root = activeRootOrNull(); if (!root) return
publishMqtt(tCmdDP(root, 9), "true")
publishMqtt(tCmdDps(root,9), "true")
if (logEnable) log.debug "Publish DP9 true"
}
def componentOff(cd) {
final String root = activeRootOrNull(); if (!root) return
publishMqtt(tCmdDP(root, 9), "false")
publishMqtt(tCmdDps(root,9), "false")
if (logEnable) log.debug "Publish DP9 false"
}
def componentRefresh(cd) { if (logEnable) log.debug "componentRefresh (no-op)" }
/* ========================= Presence ========================= */
private markSeen() {
state.lastSeenEpoch = now()
sendEvent(name: "lastSeen", value: new Date(state.lastSeenEpoch).format("yyyy-MM-dd HH:mm:ss"))
presenceMark(true)
}
private presenceTick() {
final long last = (state.lastSeenEpoch ?: 0L) as long
final long ageS = (now() - last) / 1000L
presenceMark(ageS <= PRESENCE_TIMEOUT_SEC)
}
private presenceMark(boolean present) {
final String want = present ? "present" : "not present"
if (device.currentValue("presence") != want) sendEvent(name: "presence", value: want)
}
/* ========================= Heartbeat, Guard & Watchdog ========================= */
def startWatchdogLoop() {
runIn(Math.max(5, WATCHDOG_PERIOD), "watchdogTick")
if (KEEPALIVE_SEC > 0) runIn(Math.max(5, KEEPALIVE_SEC), "heartbeatTick")
}
private boolean mqttIsUp() { return (state.connected == true) }
def heartbeatTick() {
if (KEEPALIVE_SEC <= 0) return
if (!mqttIsUp()) { runIn(Math.max(5, KEEPALIVE_SEC), "heartbeatTick"); return }
def root = activeRootOrNull()
if (root) {
final String topic = "${root}/heartbeat"
final String ts = new Date().format("yyyy-MM-dd'T'HH:mm:ssZ")
publishMqtt(topic, ts, false, 0)
if (logEnable) log.debug "Heartbeat -> ${topic} ${ts}"
}
runIn(Math.max(5, KEEPALIVE_SEC), "heartbeatTick")
}
def watchdogTick() {
if (!state.connected) {
if (logEnable) log.debug "Watchdog: not connected; scheduling reconnect"
scheduleReconnect()
} else {
final long idleSec = (now() - (state.lastMqttOkEpoch ?: 0L)) / 1000L
if (KEEPALIVE_SEC > 0 && idleSec > (KEEPALIVE_SEC * 2)) {
if (logEnable) log.debug "Watchdog: idle ${idleSec}s > ${KEEPALIVE_SEC*2}s, sending proactive heartbeat"
heartbeatTick()
}
}
runIn(Math.max(5, WATCHDOG_PERIOD), "watchdogTick")
}
def connectionGuard() {
if (!state.connected) {
if (logEnable) log.warn "Connection guard: still not connected; forcing reconnect"
try { interfaces.mqtt.disconnect() } catch (ignored) {}
scheduleReconnect()
}
}
/* ========================= Validation & Helpers ========================= */
private void validatePrefs() {
if (!brokerHost?.trim()) throw new IllegalArgumentException("MQTT Broker Host is required")
// sanitize port
try {
Integer p = (brokerPort?.toString()?.replaceAll("[^0-9]", "") ?: "1883") as Integer
if (p <= 0 || p > 65535) throw new Exception("out of range")
} catch (ignored) {
log.warn "Invalid broker port '${brokerPort}', defaulting to 1883"
}
// baseTopic: normalize extra slashes
if (baseTopic != null) {
def norm = baseTopic.trim().replaceAll(/^\/+|\/+$/,"")
if (norm != baseTopic) device.updateSetting("baseTopic",[value:norm,type:"text"])
}
// deviceIdent: allow blank (auto-learn). If provided, trim spaces.
if (deviceIdent != null) {
def did = deviceIdent.trim()
if (did != deviceIdent) device.updateSetting("deviceIdent",[value:did,type:"text"])
}
}
private Integer safePort() {
try {
def cleaned = (brokerPort?.toString()?.replaceAll("[^0-9]", "") ?: "1883")
Integer p = cleaned as Integer
if (p <= 0 || p > 65535) return 1883
return p
} catch (e) {
return 1883
}
}
private static boolean asBool(def v) {
def s = (v==null ? "" : v.toString().trim().toLowerCase())
return (s in ["true","on","1","\"true\"","\"on\"","\"1\""])
}
Hope this helps someone out there. Please modify as needed. Don't ask me how this works, it was generated using ChatGPT 5.