MQTT Tuya WiFi Fan/Light Driver for Local Control

Hi all, I have a QIACHIP Upgrade Universal WIFI Ceiling Fan installed in my home and I wanted to get local control over it using MQTT. Of course, it's a Tuya device. I have a QNAP NAS running Container Station, so I created an MQTT server there and used the MQTT Client for local Tuya devices by Volker76 on GitHub here. Follow the instructions there for setup. Here's the YML configuration file I used.

YML Code:
services:
  tuya-mqtt:
    image: volkerhaensel/tuya_mqtt.net:latest
    container_name: tuya-mqtt
    networks:
      macvlan_net:
        ipv4_address: YOUR.IP.ADDRESS.HERE   # Assign a unique LAN IP for Tuya-MQTT (e.g., 192.168.1.230)
    volumes:
      - tuya.net_config:/app/DataDir         # DataDir is case-sensitive
    restart: unless-stopped
    expose:
      - "80/tcp"                             # Web UI → http://YOUR.IP.ADDRESS.HERE
      - "6666/udp"                           # Tuya discovery (unencrypted)
      - "6667/udp"                           # Tuya discovery (encrypted)
    healthcheck:
      test: ["CMD-SHELL", "wget -q -O /dev/null http://localhost:80 || exit 1"]
      interval: 60s
      timeout: 5s
      retries: 5
      start_period: 20s

  mosquitto:
    image: eclipse-mosquitto:2
    container_name: tuya-mosquitto
    networks:
      macvlan_net:
        ipv4_address: YOUR.IP.ADDRESS.HERE   # Assign a DIFFERENT unique LAN IP for Mosquitto (e.g., 192.168.1.232)
    expose:
      - "1883/tcp"                           # Clients connect β†’ tcp://YOUR.IP.ADDRESS.HERE:1883
    volumes:
      - ./mosquitto:/mosquitto               # Place mosquitto.conf in ./mosquitto/config/
    restart: unless-stopped

networks:
  macvlan_net:
    driver: macvlan
    driver_opts:
      parent: br0                            # Change if your LAN interface is different
    ipam:
      config:
        - subnet: 192.168.1.0/24             # Match your LAN subnet (change if needed)
          gateway: 192.168.1.1               # Change to your router’s gateway IP

volumes:
  tuya.net_config:

I then went into Tuya-MQTT and set it up as per Volker76's GitHub page. I was able to identify the device on my network and connect to it. Of note, you may need a Tuya IoT login for this; again, see Volker76's GitHub page for details.

Afterwards, I created this driver with our AI overlord's help. I installed the driver into Hubitat, added a Virtual Device using the driver, and configured it under the device's preferences. It will ask for your MQTT Mosquitto server IP address (not the Tuya-MQTT IP address) and the device ID, IP, or name.

Click Initialize, and you will then be able to control your Tuya device.

Driver code:
/**
 * Tuya Lean UI — MQTT Fan+Light (Local)
 * ------------------------------------------------------------
 * Minimal, production-ready Hubitat driver with a simplified Preferences panel.
 *
 * Design:
 *  - No “topic style” toggle — inbound auto-handles DP & dps; commands publish to BOTH.
 *  - No identifier mode — provide ONE identifier (ID or IP). If blank, auto-learn via wildcard.
 *  - Hidden sane defaults for presence timeout, re-assert delay, keepalive, ghost-session avoidance.
 *  - DP3 (speed) never forces Switch "on" → off state persists correctly.
 *  - Robust MQTT: unique clientId by default, single queued reconnect with backoff+jitter,
 *    heartbeat keepalive, watchdog, connection guard, safe publish, retained status.
 *
 * Features:
 *  - DP1 → Switch (power)
 *  - DP3 → FanControl (low/medium/high)
 *  - DP9 → Child "Generic Component Switch" (optional)
 *  - Presence inferred from inbound MQTT activity
 *
 * Platform: Hubitat
 * License: MIT
 * Author: you
 */

import groovy.transform.Field

/* ───────── Internal Tunables (not exposed in UI) ───────── */
// Seconds without inbound MQTT traffic after which presenceTick() reports "not present".
@Field static final Integer PRESENCE_TIMEOUT_SEC = 120
@Field static final Integer RESEND_DELAY_SEC     = 2      // re-assert speed once after this delay
@Field static final Integer KEEPALIVE_SEC        = 60     // heartbeat interval
// Seconds between watchdogTick() runs.
@Field static final Integer WATCHDOG_PERIOD      = 30
// Lower and upper bounds (seconds) of the reconnect delay; scheduleReconnect() doubles it per attempt.
@Field static final Integer BACKOFF_MIN          = 5
@Field static final Integer BACKOFF_MAX          = 300
@Field static final Boolean ALWAYS_UNIQUE_ID     = true   // avoid ghost sessions
@Field static final Boolean STATUS_TOPIC_ENABLED = true   // publish retained online/offline

/* ───────── Constants ───────── */
// Cycle order used by cycleSpeed().
@Field static final List<String>  SPEED_ORDER         = ["off","low","medium","high"]
// Tuya data points this driver subscribes to: 1 = power, 3 = fan speed, 9 = light.
@Field static final List<Integer> DPs                 = [1,3,9]
// Name of the handler that scheduleReconnect() schedules and unschedules.
@Field static final String        RECONNECT_JOB       = "reconnectStep"
// state key under which the active bootstrap wildcard subscription is remembered.
@Field static final String        BOOTSTRAP_TOPIC_KEY = "__bootstrapWildcard__"
@Field static final String        CMD_SEGMENT         = "command"  // fixed outbound segment

// Driver declaration: capabilities, custom attributes and the user-facing preferences.
metadata {
  definition(name: "Tuya Lean UI", namespace: "public", author: "you") {
    capability "Initialize"
    capability "Switch"
    capability "FanControl"
    capability "Refresh"
    capability "PresenceSensor"

    // Attributes written by this driver through sendEvent().
    attribute "supportedFanSpeeds", "JSON_OBJECT"
    attribute "speed", "STRING"
    attribute "lastSeen", "STRING"
    attribute "mqttStatus", "STRING"
    attribute "activeRoot", "STRING"
    attribute "mqttConnected", "STRING" // "true"/"false"
  }

  /* ───────── Minimal Preferences (only what’s necessary) ───────── */
  preferences {
    // Broker connection; the port is entered as text and sanitized by safePort().
    input name: "brokerHost",  type: "text",      title: "MQTT Broker Host", defaultValue: "192.168.1.10", required: true
    input name: "brokerPort",  type: "text",      title: "MQTT Broker Port", defaultValue: "1883",          required: true
    input name: "username",    type: "text",      title: "MQTT Username (optional)"
    input name: "password",    type: "password",  title: "MQTT Password (optional)"
    // Topic root is "<baseTopic>/<deviceIdent>", or just "<deviceIdent>" when baseTopic is blank (see rootFor()).
    input name: "baseTopic",   type: "text",      title: "Base topic (blank = none, e.g., 'tuya')", defaultValue: ""
    input name: "deviceIdent", type: "text",      title: "Device identifier (ID or IP). Leave blank to auto-learn.", required: false
    input name: "createLightChild", type: "bool", title: "Expose DP9 as child light", defaultValue: true
    input name: "logEnable",   type: "bool",      title: "Enable debug logging", defaultValue: true
  }
}

/* ========================= Lifecycle ========================= */

/** Called when the device is first created: run the normal start-up sequence. */
def installed() {
  initialize()
}

/** Called when preferences are saved: restart with the new settings. */
def updated() {
  initialize()
}

/** Called when the device is removed: drop the MQTT session (best effort) and cancel all jobs. */
def uninstalled() {
  try {
    interfaces.mqtt.disconnect()
  } catch (e) {
    // best effort: nothing useful to do if the client was never connected
  }
  unschedule()
}

/**
 * Resets runtime state, reconnects to the broker and (re)schedules all periodic jobs.
 * Throws IllegalArgumentException via validatePrefs() when no broker host is configured.
 */
def initialize() {
  // Start from a clean slate: no jobs, no connection.
  unschedule()
  try {
    interfaces.mqtt.disconnect()
  } catch (e) {
    // ignore: there may be no open connection
  }

  state.lastSeenEpoch   = 0L
  state.lastMqttOkEpoch = 0L
  state.activeRoot      = null
  state.connected       = false
  state.backoff         = BACKOFF_MIN
  ["pendingSpeedRaw", "stabilizeUntil"].each { key -> state.remove(key) }

  validatePrefs()
  connectMqtt(false)

  // Staggered follow-up work, keyed by delay in seconds.
  [10: "connectionGuard",
   2 : "subscribeTopics",
   3 : "publishSupportedSpeeds",
   4 : "ensureLightChild",
   5 : "startWatchdogLoop"].each { delaySec, handler -> runIn(delaySec, handler) }
  runEvery1Minute("presenceTick")

  if (logEnable) log.debug "Initialized (Tuya Lean UI)"
}

/** Refresh capability: nothing to poll, state arrives through MQTT messages. */
def refresh() {
  if (!logEnable) return
  log.debug "Refresh (no-op; waiting for MQTT updates)"
}

/* ========================= MQTT Core & Resilience ========================= */

/**
 * Opens the MQTT connection to tcp://brokerHost:port.
 *
 * @param forceScramble when true, append a time-derived suffix to the client id;
 *                      the suffix is always appended while ALWAYS_UNIQUE_ID is true.
 * A connect exception is logged (debug logging only) and a reconnect is scheduled.
 */
private connectMqtt(Boolean forceScramble=false) {
  final String  uri      = "tcp://${brokerHost}:${safePort()}"
  final Boolean scramble = ALWAYS_UNIQUE_ID ? true : (forceScramble ? true : false)

  String clientId = "hubitat-tuya-fan-" + device.id
  if (scramble) {
    clientId = "${clientId}-${(now()%100000)}"
  }

  if (logEnable) log.debug "Connecting MQTT ${uri} as ${clientId} (scramble=${scramble})"
  try {
    sendEvent(name: "mqttStatus", value: "connecting")
    interfaces.mqtt.connect(uri, clientId, username ?: null, password ?: null)
  } catch (e) {
    if (logEnable) log.warn "MQTT connect threw: ${e?.message ?: e}"
    scheduleReconnect()
  }
}

/**
 * Queues a single reconnect attempt (replacing any already queued) after the current
 * backoff delay with random jitter, then doubles the stored backoff up to BACKOFF_MAX.
 * Does nothing while state.connected is set.
 */
private scheduleReconnect() {
  if (state.connected) {
    return
  }

  Integer base = (state.backoff ?: BACKOFF_MIN) as Integer
  base = Math.min(Math.max(base, BACKOFF_MIN), BACKOFF_MAX)

  // ±20% jitter, at least one second either way
  Integer spread = Math.max(1, (int)(base * 0.2))
  Integer offset = new Random().nextInt(spread*2+1) - spread
  Integer wait   = base + offset

  try {
    unschedule(RECONNECT_JOB)
  } catch (ignored) {
    // nothing was queued
  }
  if (logEnable) log.warn "Scheduling reconnect in ${wait}s (backoff=${base}s)"
  runIn(wait, RECONNECT_JOB)
  state.backoff = Math.min(base * 2, BACKOFF_MAX)
}

/** Scheduled reconnect job: drop any existing session, pause briefly, reconnect with a fresh client id. */
def reconnectStep() {
  try {
    interfaces.mqtt.disconnect()
  } catch (ignored) {
    // no session to close
  }
  pauseExecution(250)
  connectMqtt(true)
}

/**
 * Publishes one MQTT message and records the time of the last successful publish.
 *
 * @param topic   destination topic
 * @param payload message body
 * @param retain  whether the broker should retain the message
 * @param qos     MQTT quality-of-service level
 * @return true when the publish call completed, false when it threw
 *
 * On failure the driver is marked disconnected and a reconnect is scheduled.
 * The failure path deliberately does NOT try to publish an "offline" status:
 * that publish would go through this same method, fail the same way and
 * recurse without bound.
 */
private publishMqtt(String topic, String payload, Boolean retain=false, Integer qos=0) {
  try {
    interfaces.mqtt.publish(topic, payload, qos, retain)
    state.lastMqttOkEpoch = now()
    return true
  } catch (e) {
    if (logEnable) log.warn "Publish failed ${topic}: ${e?.message ?: e} -> scheduling reconnect"
    state.connected = false
    sendEvent(name: "mqttConnected", value: "false")
    scheduleReconnect()
    return false
  }
}

/**
 * Publishes a retained "online"/"offline" marker to "<root>/status".
 *
 * @param online true to publish "online", false to publish "offline"
 *
 * Skipped when the status topic is disabled, when no topic root is known, or when
 * asked to announce "offline" while the driver is already marked disconnected.
 * The last case matters because this method is called from the publish-failure
 * path; attempting another publish there would fail again and re-enter it.
 */
private setStatusOnline(Boolean online) {
  if (!STATUS_TOPIC_ENABLED) return
  if (!online && !state.connected) return
  def root = activeRootOrNull()
  if (!root) return
  publishMqtt("${root}/status", online ? "online" : "offline", true, 0)
}

/* ========================= Topic Helpers ========================= */

/** Builds the topic root for an identifier: "<baseTopic>/<ident>", or "<ident>" when baseTopic is blank. */
private String rootFor(String ident) {
  final String prefix = baseTopic?.trim() ?: ""
  if (prefix) {
    return "${prefix}/${ident}"
  }
  return "${ident}"
}

/** Returns the configured device identifier as a one-element list, or an empty list when it is blank. */
private List<String> candidateIdentifiers() {
  final String ident = deviceIdent?.trim()   // exactly one field, ID or IP
  return ident ? [ident] : []
}

/** Returns the learned topic root if there is one, else the root of the first configured identifier, else null. */
private String activeRootOrNull() {
  if (state.activeRoot) {
    return state.activeRoot as String
  }
  final List<String> idents = candidateIdentifiers()
  if (idents && idents.size() > 0) {
    return rootFor(idents[0])
  }
  return null
}

/* State topics (subscribe to BOTH styles) */
/** State topic in "DP" style: "<root>/DP<dp>". */
private String tStateDP (String root, int dp) {
  return "${root}/DP${dp}"
}
/** State topic in "dps" style: "<root>/dps/<dp>". */
private String tStateDps(String root, int dp) {
  return "${root}/dps/${dp}"
}

/* Command topics (publish to BOTH styles) */
/** Command topic in "DP" style: "<root>/DP<dp>/<CMD_SEGMENT>". */
private String tCmdDP  (String root, int dp) {
  return "${root}/DP${dp}/${CMD_SEGMENT}"
}
/** Command topic in "dps" style: "<root>/<CMD_SEGMENT>/<dp>". */
private String tCmdDps (String root, int dp) {
  return "${root}/${CMD_SEGMENT}/${dp}"
}

/** True when the topic is the state topic of the given data point under root, in either topic style. */
private boolean topicMatchesDp(String topic, String root, int dp) {
  if (topic == tStateDP(root, dp)) {
    return true
  }
  return topic == tStateDps(root, dp)
}

/* ========================= Subscriptions ========================= */

/**
 * Subscribes to the device's state topics.
 *
 * With a configured identifier, subscribes to both the "DP" and "dps" state topic
 * of every data point in DPs. Without one, subscribes to a wildcard and remembers
 * it in state so the root can be learned from the first matching message.
 *
 * initialize() schedules this handler before the connection is known to be up, and
 * mqttClientStatus() schedules it again once the broker reports success. While the
 * driver is not marked connected the call is skipped, so that a premature subscribe
 * failure does not queue a reconnect that tears down a connection in progress.
 */
def subscribeTopics() {
  if (!state.connected) {
    if (logEnable) log.debug "subscribeTopics skipped: MQTT not connected yet"
    return
  }

  def cands = candidateIdentifiers()
  if (cands && cands.size() > 0) {
    def roots = cands.collect { rootFor(it) }
    try {
      roots.each { r -> DPs.each { dp ->
        interfaces.mqtt.subscribe(tStateDP(r,  dp))
        interfaces.mqtt.subscribe(tStateDps(r, dp))
      }}
      if (logEnable) log.debug "Subscribed to DP & dps state topics under roots: ${roots}"
    } catch (e) {
      if (logEnable) log.warn "Subscribe failed: ${e?.message ?: e}"
      scheduleReconnect()
    }
  } else {
    // Auto-learn root: wildcard-subscribe and infer from first state message
    def wild = bootstrapWildcard()
    try {
      interfaces.mqtt.subscribe(wild)
      state[BOOTSTRAP_TOPIC_KEY] = wild
      if (logEnable) log.warn "Bootstrap learn active - subscribed '${wild}'. Will auto-learn on first DP/dps state."
    } catch (e) {
      if (logEnable) log.warn "Bootstrap subscribe failed: ${e?.message ?: e}"
      scheduleReconnect()
    }
  }
}

/** Wildcard used for root auto-learning: "<baseTopic>/#", or "#" when no base topic is set. */
private String bootstrapWildcard() {
  final String prefix = baseTopic?.trim() ?: ""
  if (!prefix) {
    return "#"
  }
  return "${prefix}/#"
}

/**
 * Derives the device's topic root from an inbound DP1/DP3/DP9 topic (either style)
 * when no root has been learned yet. On success the root is stored in state and
 * published as the "activeRoot" attribute, the bootstrap wildcard (if any) is
 * dropped, and narrow subscriptions for both styles are added.
 */
private void learnActiveRootFromTopic(String topic) {
  if (state.activeRoot) return

  String learned = null
  def dpStyle = (topic =~ /^(.*)\/DP(?:1|3|9)(?:\/.*)?$/)
  if (dpStyle.matches()) {
    learned = dpStyle[0][1]
  } else {
    def dpsStyle = (topic =~ /^(.*)\/dps\/(?:1|3|9)$/)
    if (dpsStyle.matches()) {
      learned = dpsStyle[0][1]
    }
  }
  if (!learned) return

  state.activeRoot = learned
  sendEvent(name: "activeRoot", value: learned)
  if (logEnable) log.warn "Active root learned: ${learned}"

  // Replace the broad bootstrap subscription with narrow ones.
  def wildcard = (state[BOOTSTRAP_TOPIC_KEY] ?: null)
  if (wildcard) {
    try {
      interfaces.mqtt.unsubscribe(wildcard)
    } catch (ignored) {
      // best effort
    }
    state.remove(BOOTSTRAP_TOPIC_KEY)
  }
  try {
    for (dp in DPs) {
      interfaces.mqtt.subscribe(tStateDP(learned,  dp))
      interfaces.mqtt.subscribe(tStateDps(learned, dp))
    }
    if (logEnable) log.debug "Resubscribed narrowly to ${learned} (DP & dps) for DP1/3/9"
  } catch (e) {
    if (logEnable) log.warn "Narrow resubscribe failed: ${e?.message ?: e}"
  }
}

/* ========================= MQTT Callbacks ========================= */

/**
 * MQTT status callback. Mirrors the raw status into "mqttStatus", then:
 *  - status containing "succeed": mark connected, reset backoff, schedule
 *    subscriptions, announce online, send a boot ping and start the heartbeat;
 *  - status starting with "error" or containing "lost"/"disconnected": mark
 *    disconnected and schedule a reconnect.
 * A connection-guard check is (re)scheduled in every case.
 */
def mqttClientStatus(String status) {
  sendEvent(name: "mqttStatus", value: status)
  if (logEnable) log.debug "MQTT status: ${status}"

  final String lowered = status?.toLowerCase() ?: ""
  final boolean cameUp   = lowered.contains("succeed")
  final boolean wentDown = !cameUp &&
      (lowered.startsWith("error") || lowered.contains("lost") || lowered.contains("disconnected"))

  if (cameUp) {
    state.connected = true
    state.backoff   = BACKOFF_MIN
    sendEvent(name: "mqttConnected", value: "true")
    presenceMark(true)

    // Post-connect bootstrap: narrow subs, liveness, heartbeat
    runIn(1, "subscribeTopics")
    setStatusOnline(true)
    postConnectPing()

    runIn(2, "heartbeatTick")
  }
  if (wentDown) {
    state.connected = false
    sendEvent(name: "mqttConnected", value: "false")
    setStatusOnline(false)
    scheduleReconnect()
  }

  runIn(10, "connectionGuard")
}

/** Publishes the current timestamp to "<root>/driver_boot" once a root is known. */
private void postConnectPing() {
  final String root = activeRootOrNull()
  if (!root) {
    return
  }
  final String pingTopic = "${root}/driver_boot"
  final String stamp     = new Date().format("yyyy-MM-dd'T'HH:mm:ssZ")
  publishMqtt(pingTopic, stamp, false, 0)
  if (logEnable) log.debug "Boot ping -> ${pingTopic} ${stamp}"
}

/**
 * Inbound MQTT message handler.
 *
 * Every message with a topic refreshes the liveness timestamps. If no root has been
 * learned yet, the topic is offered to learnActiveRootFromTopic(). The message is
 * then dispatched by data point:
 *   DP1 -> "switch" (and "speed" = off when power is off)
 *   DP3 -> "speed" (never touches "switch")
 *   DP9 -> child light "switch"
 * Topics that match none of these are ignored.
 */
def parse(String description) {
  def msg = interfaces.mqtt.parseMessage(description)
  if (!msg?.topic) return
  final String topic   = msg.topic.toString()
  final String payload = (msg.payload == null) ? "" : msg.payload.toString().trim()

  // Any inbound message counts as broker activity and as device presence.
  state.lastMqttOkEpoch = now()
  markSeen()

  if (!state.activeRoot) learnActiveRootFromTopic(topic)
  final String root = activeRootOrNull()
  if (!root) return

  // DP1 (power)
  if (topicMatchesDp(topic, root, 1)) {
    final boolean onNow = asBool(payload)
    sendEvent(name: "switch", value: onNow ? "on" : "off")
    if (!onNow) sendEvent(name: "speed", value: "off")
    if (logEnable) log.debug "DP1 -> ${onNow}"
    return
  }

  // DP3 (speed)
  if (topicMatchesDp(topic, root, 3)) {
    // Payload may arrive quoted; any value other than 1/2/3 is reported as "off".
    final String raw  = payload.replaceAll("\"","")
    final String name = (raw=="1") ? "low" : (raw=="2") ? "medium" : (raw=="3") ? "high" : "off"

    // Stabilization: if we want 2/3 and observe transient 1 (low), reassert
    final long nowMs = now()
    if (state.pendingSpeedRaw && (state.pendingSpeedRaw in ["2","3"]) && raw == "1") {
      final long until = (state.stabilizeUntil ?: 0L) as long
      if (nowMs <= until) {
        if (logEnable) log.debug "DP3 transient low during stabilization; reasserting ${state.pendingSpeedRaw}"
        publishDP3(state.pendingSpeedRaw as String)
        // Swallow the transient value: no "speed" event is sent for it.
        return
      }
    }

    sendEvent(name: "speed", value: name)
    // IMPORTANT: do not force switch=on here; DP1 governs power/UI
    if (logEnable) log.debug "DP3 -> ${name} (raw ${raw})"

    // The device confirmed the requested speed: stop re-asserting it.
    if (state.pendingSpeedRaw && raw == state.pendingSpeedRaw) {
      state.remove("pendingSpeedRaw")
      state.remove("stabilizeUntil")
    }
    return
  }

  // DP9 (light)
  if (topicMatchesDp(topic, root, 9)) {
    final boolean onNow = asBool(payload)
    // childLight() may be null when the child device does not exist; then nothing is sent.
    childLight()?.sendEvent(name: "switch", value: onNow ? "on":"off")
    if (logEnable) log.debug "DP9 -> ${onNow}"
    return
  }
}

/* ========================= Capabilities ========================= */

/** Switch capability: report "on" immediately (optimistic UI), then send DP1=true. */
def on() {
  sendEvent(name: "switch", value: "on")
  publishDP1(true)
}
/** Switch capability: send DP1=false, report switch and speed as "off", and drop any pending speed request. */
def off() {
  publishDP1(false)

  // optimistic UI
  sendEvent(name: "switch", value: "off")
  sendEvent(name: "speed",  value: "off")

  ["pendingSpeedRaw", "stabilizeUntil"].each { key -> state.remove(key) }
}

/** Publishes the power state ("true"/"false") to both command topic styles for DP1. */
private publishDP1(boolean val){
  final String root = activeRootOrNull()
  if (!root) {
    log.warn "No active root to publish DP1"
    return
  }
  final String body     = val ? "true" : "false"
  final String dpTopic  = tCmdDP(root, 1)
  final String dpsTopic = tCmdDps(root,1)
  // publish to BOTH command paths for compatibility
  publishMqtt(dpTopic, body)
  publishMqtt(dpsTopic, body)
  if (logEnable) log.debug "Publish DP1 ${val} -> ${dpTopic} & ${dpsTopic}"
}

/**
 * FanControl capability. "off" (or a null/empty value) powers the fan down;
 * low/medium/high power it on, publish the matching DP3 value after 500 ms and
 * re-assert it once after RESEND_DELAY_SEC; any other value is treated as "medium".
 */
def setSpeed(String speed) {
  speed = (speed ?: "off").toLowerCase()
  if (logEnable) log.debug "setSpeed requested: ${speed}"

  if (speed == "off") {
    off()
    return
  }

  final Map<String, String> rawBySpeed = [low: "1", medium: "2", high: "3"]
  final String raw = rawBySpeed[speed]
  if (raw == null) {
    if (logEnable) log.debug "Unsupported speed '${speed}', defaulting to medium"
    setSpeed("medium")
    return
  }

  ensureFanPowerOn()

  // Remember the target so parse() can re-assert it during the 5s stabilization window.
  state.pendingSpeedRaw = raw
  state.stabilizeUntil  = now() + 5000
  runInMillis(500, 'publishInitialSpeed')
  runIn(RESEND_DELAY_SEC, "reassertPendingSpeed")

  // optimistic UI
  sendEvent(name: "switch", value: "on")
  sendEvent(name: "speed",  value: speed)
}

/** FanControl capability: advance to the next entry of SPEED_ORDER, wrapping around; an unknown current speed counts as "off". */
def cycleSpeed() {
  final String current = (device.currentValue("speed") ?: "off").toLowerCase()
  int position = SPEED_ORDER.indexOf(current)
  if (position < 0) {
    position = 0
  }
  final int following = (position + 1) % SPEED_ORDER.size()
  setSpeed(SPEED_ORDER[following])
}

/**
 * Sends DP1=true before a speed change.
 * The command is sent regardless of the reported switch state, so there is no
 * need to read the attribute first (both branches of the old check did the same thing).
 */
private ensureFanPowerOn() {
  publishDP1(true)
}

// helper invoked by runInMillis from setSpeed
/** Scheduled by setSpeed(): publish the pending speed, if one is still pending. */
def publishInitialSpeed() {
  final pending = state.pendingSpeedRaw
  if (pending) {
    publishDP3(pending as String)
  }
}

/** Scheduled by setSpeed(): publish the pending speed once more if the device has not confirmed it. */
def reassertPendingSpeed() {
  final pending = state.pendingSpeedRaw
  if (pending) {
    if (logEnable) log.debug "Re-asserting DP3 ${pending} after delay"
    publishDP3(pending as String)
  }
}

/** Publishes a speed value, wrapped in double quotes, to both command topic styles for DP3. */
private publishDP3(String v) {
  final String root = activeRootOrNull()
  if (!root) {
    log.warn "No active root to publish DP3"
    return
  }
  // Strip any quotes the caller passed, then add exactly one pair.
  final String payload  = '"' + v.replace('"', '') + '"'
  final String dpTopic  = tCmdDP(root, 3)
  final String dpsTopic = tCmdDps(root,3)
  // publish to BOTH command paths
  publishMqtt(dpTopic, payload)
  publishMqtt(dpsTopic, payload)
  if (logEnable) log.debug "Publish DP3 ${payload} -> ${dpTopic} & ${dpsTopic}"
}

/**
 * Publishes the "supportedFanSpeeds" attribute as a JSON array string.
 * Scheduled by name from initialize(), so it is declared without `private`,
 * like the other scheduled handlers in this driver.
 */
def publishSupportedSpeeds() {
  sendEvent(name: "supportedFanSpeeds", value: '["off","low","medium","high"]', isStateChange: true)
}

/* ========================= Child light (DP9) ========================= */

/**
 * Creates the DP9 child light ("Generic Component Switch") if the preference is
 * enabled and the child does not exist yet. A creation failure is logged only.
 * Scheduled by name from initialize(), so it is declared without `private`,
 * like the other scheduled handlers in this driver.
 */
def ensureLightChild() {
  if (!createLightChild) return
  if (!childLight()) {
    final String dni = "${device.deviceNetworkId}-light"
    try {
      def cd = addChildDevice("hubitat", "Generic Component Switch", dni,
        [name: "${device.displayName} Light", label: "${device.displayName} Light", isComponent: true])
      if (logEnable) log.debug "Created child light: ${cd?.deviceNetworkId}"
    } catch (e) {
      log.warn "Cannot create child light: ${e}"
    }
  }
}
/** Looks up the child light by its network id "<parent DNI>-light"; whatever getChildDevice returns is passed through. */
private childLight() {
  final String dni = "${device.deviceNetworkId}-light"
  return getChildDevice(dni)
}

/** Child callback: turn the light on by publishing "true" to both DP9 command topics. */
def componentOn(cd) {
  final String root = activeRootOrNull()
  if (!root) return
  [tCmdDP(root, 9), tCmdDps(root,9)].each { t -> publishMqtt(t, "true") }
  if (logEnable) log.debug "Publish DP9 true"
}

/** Child callback: turn the light off by publishing "false" to both DP9 command topics. */
def componentOff(cd) {
  final String root = activeRootOrNull()
  if (!root) return
  [tCmdDP(root, 9), tCmdDps(root,9)].each { t -> publishMqtt(t, "false") }
  if (logEnable) log.debug "Publish DP9 false"
}

/** Child callback: nothing to poll. */
def componentRefresh(cd) {
  if (!logEnable) return
  log.debug "componentRefresh (no-op)"
}

/* ========================= Presence ========================= */

/** Records "now" as the last-seen time, publishes it as "lastSeen" and marks the device present. */
private markSeen() {
  final long stamp = now()
  state.lastSeenEpoch = stamp
  final String shown = new Date(stamp).format("yyyy-MM-dd HH:mm:ss")
  sendEvent(name: "lastSeen", value: shown)
  presenceMark(true)
}
/**
 * Periodic presence check: the device is present while the last inbound message
 * is at most PRESENCE_TIMEOUT_SEC seconds old.
 * Scheduled by name via runEvery1Minute() in initialize(), so it is declared
 * without `private`, like the other scheduled handlers in this driver.
 */
def presenceTick() {
  final long last = (state.lastSeenEpoch ?: 0L) as long
  final long ageS = (now() - last) / 1000L
  presenceMark(ageS <= PRESENCE_TIMEOUT_SEC)
}
/** Sends a "presence" event only when the value actually changes. */
private presenceMark(boolean present) {
  String target = "not present"
  if (present) {
    target = "present"
  }
  if (device.currentValue("presence") == target) return
  sendEvent(name: "presence", value: target)
}

/* ========================= Heartbeat, Guard & Watchdog ========================= */

/** Starts the watchdog loop and, when a keepalive interval is set, the heartbeat loop (each at least 5s out). */
def startWatchdogLoop() {
  final int watchdogDelay = Math.max(5, WATCHDOG_PERIOD)
  runIn(watchdogDelay, "watchdogTick")
  if (KEEPALIVE_SEC > 0) {
    final int heartbeatDelay = Math.max(5, KEEPALIVE_SEC)
    runIn(heartbeatDelay, "heartbeatTick")
  }
}
/** True only when state.connected is exactly true. */
private boolean mqttIsUp() {
  return state.connected == true
}

/**
 * Heartbeat loop: while connected and a root is known, publish a timestamp to
 * "<root>/heartbeat"; always reschedule itself unless the keepalive is disabled.
 */
def heartbeatTick() {
  if (KEEPALIVE_SEC <= 0) return

  if (mqttIsUp()) {
    final String root = activeRootOrNull()
    if (root) {
      final String topic = "${root}/heartbeat"
      final String ts    = new Date().format("yyyy-MM-dd'T'HH:mm:ssZ")
      publishMqtt(topic, ts, false, 0)
      if (logEnable) log.debug "Heartbeat -> ${topic} ${ts}"
    }
  }

  runIn(Math.max(5, KEEPALIVE_SEC), "heartbeatTick")
}

/**
 * Watchdog loop: schedule a reconnect when disconnected; when connected but idle
 * for more than two keepalive intervals, send a heartbeat right away. Always
 * reschedules itself.
 */
def watchdogTick() {
  if (state.connected) {
    final long idleSec = (now() - (state.lastMqttOkEpoch ?: 0L)) / 1000L
    final boolean tooQuiet = KEEPALIVE_SEC > 0 && idleSec > (KEEPALIVE_SEC * 2)
    if (tooQuiet) {
      if (logEnable) log.debug "Watchdog: idle ${idleSec}s > ${KEEPALIVE_SEC*2}s, sending proactive heartbeat"
      heartbeatTick()
    }
  } else {
    if (logEnable) log.debug "Watchdog: not connected; scheduling reconnect"
    scheduleReconnect()
  }
  runIn(Math.max(5, WATCHDOG_PERIOD), "watchdogTick")
}

/** One-shot check: if the driver is still not marked connected, drop the session and queue a reconnect. */
def connectionGuard() {
  if (state.connected) return

  if (logEnable) log.warn "Connection guard: still not connected; forcing reconnect"
  try {
    interfaces.mqtt.disconnect()
  } catch (ignored) {
    // nothing to close
  }
  scheduleReconnect()
}

/* ========================= Validation & Helpers ========================= */

/**
 * Checks and normalizes the preferences.
 *  - Throws IllegalArgumentException when the broker host is blank.
 *  - Warns (only) about a port that is not a number in 1..65535.
 *  - Strips surrounding whitespace and leading/trailing slashes from baseTopic.
 *  - Strips surrounding whitespace from deviceIdent.
 */
private void validatePrefs() {
  if (!brokerHost?.trim()) throw new IllegalArgumentException("MQTT Broker Host is required")

  // sanitize port: advisory only, the value actually used comes from safePort()
  boolean portUsable
  try {
    final String digits = brokerPort?.toString()?.replaceAll("[^0-9]", "") ?: "1883"
    final Integer parsed = digits as Integer
    portUsable = (parsed > 0 && parsed <= 65535)
  } catch (ignored) {
    portUsable = false
  }
  if (!portUsable) {
    log.warn "Invalid broker port '${brokerPort}', defaulting to 1883"
  }

  // baseTopic: normalize extra slashes
  if (baseTopic != null) {
    final String cleanedTopic = baseTopic.trim().replaceAll(/^\/+|\/+$/,"")
    if (cleanedTopic != baseTopic) {
      device.updateSetting("baseTopic", [value: cleanedTopic, type: "text"])
    }
  }

  // deviceIdent: allow blank (auto-learn). If provided, trim spaces.
  if (deviceIdent != null) {
    final String cleanedIdent = deviceIdent.trim()
    if (cleanedIdent != deviceIdent) {
      device.updateSetting("deviceIdent", [value: cleanedIdent, type: "text"])
    }
  }
}

/** Returns the configured broker port with non-digits removed, or 1883 when it is missing, unparsable or outside 1..65535. */
private Integer safePort() {
  Integer port = 1883
  try {
    final String digits = brokerPort?.toString()?.replaceAll("[^0-9]", "") ?: "1883"
    final Integer parsed = digits as Integer
    if (parsed > 0 && parsed <= 65535) {
      port = parsed
    }
  } catch (e) {
    // keep the default
  }
  return port
}

/** Interprets a payload as boolean: true for true/on/1 (optionally double-quoted, any case, surrounding whitespace ignored), false otherwise. */
private static boolean asBool(def v) {
  if (v == null) {
    return false
  }
  final String text = v.toString().trim().toLowerCase()
  return ["true","on","1","\"true\"","\"on\"","\"1\""].contains(text)
}

Hope this helps someone out there. Please modify as needed. Don't ask me how this works, it was generated using ChatGPT 5.

1 Like

This code is larger and has more options for you to change and debug.
The code in the original post is leaner and simpler.

/**
 * Tuya MQTT Fan+Light (Local) 
 * ------------------------------------------------------------
 * - Robust MQTT: single queued reconnect with backoff+jitter, heartbeat keepalive,
 *   watchdog, connection guard, safe publish, online/offline retained status.
 * - Bootstrap Learn Mode: if no identifier is set, subscribe to wildcard and
 *   auto-learn the active root from first state topic.
 * - Correct Switch UI: DP3 no longer forces "switch=on". UI stays "off" after power-off.
 * - Minimal redundancy, small helpers, optimistic UI updates for snappy tiles.
 *
 * Features:
 *  - DP1 (power)  -> Switch
 *  - DP3 (1/2/3)  -> FanControl (low/medium/high)
 *  - DP9 (light)  -> Child "Generic Component Switch"
 *  - Presence -> based on inbound MQTT activity
 *  - Topic styles: DP or dps
 *  - Identifier Mode: Auto / ID/Name / IP, with active-root learning
 *
 * Platform: Hubitat
 * License: MIT
 * Author: the family
 */

import groovy.transform.Field

// Speed names in ascending order.
@Field static final List<String> SPEED_ORDER     = ["off","low","medium","high"]
// Tuya data points this driver subscribes to.
@Field static final List<Integer> DPs            = [1,3,9]
// Bounds of the reconnect delay that scheduleReconnect() doubles per attempt.
@Field static final Integer BACKOFF_MIN          = 5     // seconds
@Field static final Integer BACKOFF_MAX          = 300   // 5 minutes max
@Field static final Integer WATCHDOG_PERIOD      = 30    // seconds
// Default value of the "keepalivePingSec" preference.
@Field static final Integer DEFAULT_KEEPALIVE    = 30
// Name of the handler that scheduleReconnect() schedules and unschedules.
@Field static final String  RECONNECT_JOB        = "reconnectStep"
// state key under which the active bootstrap wildcard subscription is remembered.
@Field static final String  BOOTSTRAP_TOPIC_KEY  = "__bootstrapWildcard__"

// Driver declaration: capabilities, custom commands/attributes and the user-facing preferences.
metadata {
  // NOTE(review): the driver name below contains a mis-encoded dash ("β€”"); it is part of the
  // driver's identity, so it is left untouched here. Confirm the intended name before changing it.
  definition(name: "Tuya MQTT Fan+Light (Local) β€” Resilient", namespace: "public", author: "you") {
    capability "Initialize"
    capability "Switch"            // DP1
    capability "FanControl"        // DP3 (1/2/3 -> low/medium/high)
    capability "Refresh"
    capability "PresenceSensor"

    // NOTE(review): the handlers for these two commands are not in the visible part of the file.
    command "testMqtt"
    command "reconnectNow"

    // Attributes written by this driver through sendEvent().
    attribute "supportedFanSpeeds", "JSON_OBJECT"
    attribute "speed", "STRING"
    attribute "lastSeen", "STRING"
    attribute "mqttStatus", "STRING"
    attribute "activeRoot", "STRING"
    attribute "mqttConnected", "STRING" // "true"/"false"
  }

  preferences {
    // MQTT
    input name: "brokerHost", type: "text", title: "MQTT Broker Host", defaultValue: "192.168.1.10", required: true
    input name: "brokerPort", type: "text", title: "MQTT Broker Port (e.g., 1883)", defaultValue: "1883", required: true
    input name: "username",   type: "text", title: "MQTT Username (optional)"
    input name: "password",   type: "password", title: "MQTT Password (optional)"

    // Topics
    input name: "baseTopic",  type: "text", title: "Base topic (blank = none, e.g., 'tuya')", defaultValue: ""
    // Selects the topic layout built by tState()/tCmd(); topicMatchesDp() accepts both layouts inbound.
    input name: "topicStyle", type: "enum", title: "Topic style", options: ["DP","dps"], defaultValue: "DP"
    input name: "commandSegment", type: "text", title: "Command segment", defaultValue: "command"

    // Identifier strategy
    // Controls which of deviceId/deviceIp candidateIdentifiers() returns.
    input name: "identifierMode", type: "enum", title: "Identifier Mode",
      options: ["Auto","ID/Name","IP"], defaultValue: "Auto"
    input name: "deviceId", type: "text", title: "Device ID or Name (as seen in MQTT)"
    input name: "deviceIp", type: "text", title: "Device IP (e.g., 192.168.1.50)"

    // Behavior
    input name: "presenceTimeoutSec", type: "number", title: "Presence timeout (seconds)", defaultValue: 120
    input name: "resendDelaySec", type: "number", title: "Re-assert speed delay (seconds)", defaultValue: 1
    input name: "createLightChild", type: "bool", title: "Expose DP9 as child light", defaultValue: true

    // Resilience
    input name: "keepalivePingSec", type: "number", title: "Heartbeat ping interval (seconds, 0 = off)", defaultValue: DEFAULT_KEEPALIVE
    input name: "statusTopicEnabled", type: "bool", title: "Publish retained online/offline status", defaultValue: true
    input name: "alwaysUniqueClientId", type: "bool", title: "Always use unique clientId (avoid ghost sessions)", defaultValue: true
    input name: "enableBootstrapLearn", type: "bool", title: "If no identifier, wildcard-subscribe and auto-learn root", defaultValue: true

    // Logging
    input name: "logEnable", type: "bool", title: "Enable debug logging", defaultValue: true
  }
}

/* ========================= Lifecycle ========================= */

/** Called when the device is first created: run the normal start-up sequence. */
def installed() {
  initialize()
}

/** Called when preferences are saved: restart with the new settings. */
def updated() {
  initialize()
}

/** Called when the device is removed: drop the MQTT session (best effort) and cancel all jobs. */
def uninstalled() {
  try {
    interfaces.mqtt.disconnect()
  } catch (e) {
    // best effort: nothing useful to do if the client was never connected
  }
  unschedule()
}

/**
 * Resets runtime state, validates preferences, reconnects to the broker and
 * (re)schedules all follow-up and periodic jobs.
 */
def initialize() {
  // Start from a clean slate: no jobs, no connection.
  unschedule()
  try {
    interfaces.mqtt.disconnect()
  } catch (e) {
    // ignore: there may be no open connection
  }

  state.lastSeenEpoch   = 0L
  state.lastMqttOkEpoch = 0L
  state.activeRoot      = null
  state.connected       = false
  state.backoff         = BACKOFF_MIN
  ["pendingSpeedRaw", "stabilizeUntil", "offGuardUntil"].each { key -> state.remove(key) }
  validatePrefs()

  connectMqtt(false)

  // Staggered follow-up work, keyed by delay in seconds.
  [10: "connectionGuard",
   2 : "subscribeTopics",
   3 : "publishSupportedSpeeds",
   4 : "ensureLightChild",
   5 : "startWatchdogLoop"].each { delaySec, handler -> runIn(delaySec, handler) }
  runEvery1Minute("presenceTick")

  if (logEnable) log.debug "Initialized"
}

/** Refresh capability: nothing to poll, state arrives through MQTT messages. */
def refresh() {
  if (!logEnable) return
  log.debug "Refresh (no-op; waiting for MQTT updates)"
}

/* ========================= MQTT Core & Resilience ========================= */

/**
 * Opens the MQTT connection to tcp://brokerHost:port.
 *
 * @param forceScramble when true, append a time-derived suffix to the client id;
 *                      the suffix is also appended when "alwaysUniqueClientId" is on.
 *
 * The port preference is free text. It is reduced to its digits and falls back to
 * 1883 when it is empty, unparsable or outside 1..65535, so a malformed value
 * (e.g. "1883 " or "1,883") can no longer abort the connect with a
 * NumberFormatException. A connect exception is logged (debug logging only) and
 * a reconnect is scheduled.
 */
private connectMqtt(Boolean forceScramble=false) {
  Integer portInt = 1883
  try {
    final String digits = brokerPort?.toString()?.replaceAll("[^0-9]", "") ?: "1883"
    final Integer parsed = digits as Integer
    if (parsed > 0 && parsed <= 65535) {
      portInt = parsed
    } else {
      log.warn "Invalid broker port '${brokerPort}', defaulting to 1883"
    }
  } catch (e) {
    log.warn "Invalid broker port '${brokerPort}', defaulting to 1883"
  }

  final String  uri     = "tcp://${brokerHost}:${portInt}"
  final String  baseId  = "hubitat-tuya-fan-" + device.id
  final Boolean scramble= (alwaysUniqueClientId ? true : false) || (forceScramble ?: false)
  final String  clientId= scramble ? "${baseId}-${(now()%100000)}" : baseId

  if (logEnable) log.debug "Connecting MQTT ${uri} as ${clientId} (scramble=${scramble})"
  try {
    sendEvent(name: "mqttStatus", value: "connecting")
    interfaces.mqtt.connect(uri, clientId, username ?: null, password ?: null)
  } catch (e) {
    if (logEnable) log.warn "MQTT connect threw: ${e?.message ?: e}"
    scheduleReconnect()
  }
}

/**
 * Queues a single reconnect attempt (replacing any already queued) after the current
 * backoff delay with random jitter, then doubles the stored backoff up to BACKOFF_MAX.
 * Does nothing while state.connected is set.
 */
private scheduleReconnect() {
  if (state.connected) {
    return
  }

  Integer base = (state.backoff ?: BACKOFF_MIN) as Integer
  base = Math.min(Math.max(base, BACKOFF_MIN), BACKOFF_MAX)

  // jitter of up to 20% either way, at least one second
  Integer spread = Math.max(1, (int)(base * 0.2))
  Integer offset = new Random().nextInt(spread*2+1) - spread
  Integer wait   = base + offset

  try {
    unschedule(RECONNECT_JOB)
  } catch (ignored) {
    // nothing was queued
  }
  if (logEnable) log.warn "Scheduling reconnect in ${wait}s (backoff=${base}s)"
  runIn(wait, RECONNECT_JOB)
  state.backoff = Math.min(base * 2, BACKOFF_MAX)
}

/** Scheduled reconnect job: drop any existing session, pause briefly, then reconnect. */
def reconnectStep() {
  try {
    interfaces.mqtt.disconnect()
  } catch (ignored) {
    // no session to close
  }
  pauseExecution(250)
  // aggressively scramble to kill ghost sessions
  connectMqtt(true)
}

/**
 * Publishes one MQTT message and records the time of the last successful publish.
 *
 * @param topic   destination topic
 * @param payload message body
 * @param retain  whether the broker should retain the message
 * @param qos     MQTT quality-of-service level
 * @return true when the publish call completed, false when it threw
 *
 * On failure the driver is marked disconnected and a reconnect is scheduled.
 * The failure path deliberately does NOT try to publish an "offline" status:
 * that publish would go through this same method, fail the same way and
 * recurse without bound.
 */
private publishMqtt(String topic, String payload, Boolean retain=false, Integer qos=0) {
  try {
    interfaces.mqtt.publish(topic, payload, qos, retain)
    state.lastMqttOkEpoch = now()
    return true
  } catch (e) {
    if (logEnable) log.warn "Publish failed ${topic}: ${e?.message ?: e} -> scheduling reconnect"
    state.connected = false
    sendEvent(name: "mqttConnected", value: "false")
    scheduleReconnect()
    return false
  }
}

/**
 * Publishes a retained "online"/"offline" marker to "<root>/status".
 *
 * @param online true to publish "online", false to publish "offline"
 *
 * Skipped when the "statusTopicEnabled" preference is off, when no topic root is
 * known, or when asked to announce "offline" while the driver is already marked
 * disconnected. The last case matters because this method is called from the
 * publish-failure path; attempting another publish there would fail again and
 * re-enter it.
 */
private setStatusOnline(Boolean online) {
  if (!statusTopicEnabled) return
  if (!online && !state.connected) return
  def root = activeRootOrNull()
  if (!root) return
  publishMqtt("${root}/status", online ? "online" : "offline", true, 0)
}

/* ========================= Topic Helpers ========================= */

/** Builds the topic root for an identifier: "<baseTopic>/<ident>", or "<ident>" when baseTopic is blank. */
private String rootFor(String ident) {
  final String prefix = baseTopic?.trim() ?: ""
  if (prefix) {
    return "${prefix}/${ident}"
  }
  return "${ident}"
}

/**
 * Returns the identifiers to try, without duplicates, according to "identifierMode":
 *   "ID/Name" -> deviceId only; "IP" -> deviceIp only; anything else (Auto) -> deviceId then deviceIp.
 * When that yields nothing, deviceId is used as a fallback if it is set.
 */
private List<String> candidateIdentifiers() {
  final String id   = deviceId?.trim()
  final String ip   = deviceIp?.trim()
  final String mode = identifierMode ?: "Auto"

  List<String> found = []
  if (mode == "ID/Name") {
    if (id) found << id
  } else if (mode == "IP") {
    if (ip) found << ip
  } else {
    // Auto
    if (id) found << id
    if (ip) found << ip
  }

  if (found.isEmpty() && id) {
    found << id
  }
  return found.unique()
}

/** Returns the learned topic root if there is one, else the root of the first candidate identifier, else null. */
private String activeRootOrNull() {
  if (state.activeRoot) {
    return state.activeRoot as String
  }
  final List<String> idents = candidateIdentifiers()
  if (idents && idents.size() > 0) {
    return rootFor(idents[0])
  }
  return null
}

/** State topic for a data point in the configured topic style: "<root>/DP<dp>" or "<root>/dps/<dp>". */
private String tState(String root, int dp) {
  if (topicStyle == "DP") {
    return "${root}/DP${dp}"
  }
  return "${root}/dps/${dp}"
}
/** Command topic for a data point in the configured topic style; the segment defaults to 'command'. */
private String tCmd(String root, int dp) {
  final String segment = commandSegment ?: 'command'
  if (topicStyle == "DP") {
    return "${root}/DP${dp}/${segment}"
  }
  return "${root}/${segment}/${dp}"
}

/** True when root is set and the topic is the state topic of the given data point under it, in either style. */
private boolean topicMatchesDp(String topic, String root, int dp) {
  if (!root) {
    return false
  }
  // Accept both styles regardless of selected outbound style
  final String dpForm  = "${root}/DP${dp}"
  final String dpsForm = "${root}/dps/${dp}"
  return topic == dpForm || topic == dpsForm
}

/* ========================= Subscriptions ========================= */

/**
 * Subscribes to the device's state topics. Sources are tried in this order:
 *   1. Configured identifiers: tState(root, dp) for every candidate root and every entry of DPs.
 *   2. A previously learned state.activeRoot: both topic layouts under that root. Without this
 *      step a reconnect would fall back to the broad bootstrap wildcard again, and because
 *      learnActiveRootFromTopic() returns immediately once a root is known, that wildcard
 *      would never be narrowed.
 *   3. Bootstrap learning (when enableBootstrapLearn is set): the wildcard from
 *      bootstrapWildcard(), remembered in state under BOOTSTRAP_TOPIC_KEY.
 * Any subscribe failure schedules a reconnect.
 */
def subscribeTopics() {
  def cands = candidateIdentifiers()
  if (cands && cands.size() > 0) {
    def roots = cands.collect { rootFor(it) }
    try {
      roots.each { r -> DPs.each { dp -> interfaces.mqtt.subscribe(tState(r, dp)) } }
      if (logEnable) log.debug "Subscribed state topics under roots: ${roots}"
    } catch (e) {
      if (logEnable) log.warn "Subscribe failed: ${e?.message ?: e}"
      scheduleReconnect()
    }
  } else if (state.activeRoot) {
    final String learned = state.activeRoot as String
    try {
      // The layout the root was learned from is not recorded, so listen on both;
      // topicMatchesDp() accepts either one.
      DPs.each { dp ->
        interfaces.mqtt.subscribe("${learned}/DP${dp}".toString())
        interfaces.mqtt.subscribe("${learned}/dps/${dp}".toString())
      }
      if (logEnable) log.debug "Subscribed state topics under learned root: ${learned}"
    } catch (e) {
      if (logEnable) log.warn "Subscribe failed: ${e?.message ?: e}"
      scheduleReconnect()
    }
  } else if (enableBootstrapLearn) {
    def wild = bootstrapWildcard()
    try {
      interfaces.mqtt.subscribe(wild)
      state[BOOTSTRAP_TOPIC_KEY] = wild
      if (logEnable) log.warn "Bootstrap learn active — subscribed '${wild}'. Will auto-learn on first DP state."
    } catch (e) {
      if (logEnable) log.warn "Bootstrap subscribe failed: ${e?.message ?: e}"
      scheduleReconnect()
    }
  } else {
    log.warn "No identifiers and bootstrap learn disabled; set Device ID/Name or Device IP."
  }
}

/**
 * Wildcard filter used while no root is known: "<baseTopic>/#" when baseTopic is non-blank,
 * otherwise "#".
 */
private String bootstrapWildcard() {
  def prefix = baseTopic?.trim() ?: ""
  if (!prefix) return "#"
  return "${prefix}/#"
}

/**
 * Learns state.activeRoot from an observed topic when none is known yet.
 *
 * The root is whatever precedes "/DP<n>" (optionally followed by further segments) or
 * "/dps/<n>", for n in 1, 3, 9. Once learned, the root is stored, an "activeRoot" event is
 * sent, and the subscription is narrowed to that root.
 *
 * Narrowing subscribes to BOTH topic layouts: the root may have been learned from a layout
 * other than the outbound topicStyle, and topicMatchesDp() accepts either. The bootstrap
 * wildcard is dropped only after the narrow subscriptions succeeded, so a failed subscribe
 * cannot leave the driver without any subscription.
 *
 * @param topic topic of the message just received; null/empty is ignored
 */
private void learnActiveRootFromTopic(String topic) {
  if (state.activeRoot || !topic) return

  String root = null
  def m = (topic =~ /^(.*)\/DP(?:1|3|9)(?:\/.*)?$/)
  if (m.matches()) {
    root = m[0][1]
  } else {
    m = (topic =~ /^(.*)\/dps\/(?:1|3|9)$/)
    if (m.matches()) root = m[0][1]
  }
  if (!root) return

  state.activeRoot = root
  sendEvent(name: "activeRoot", value: root)
  if (logEnable) log.warn "Active root learned: ${root}"

  // Narrow first, while the wildcard (if any) still delivers messages.
  boolean narrowed = false
  try {
    DPs.each { dp ->
      interfaces.mqtt.subscribe("${root}/DP${dp}".toString())
      interfaces.mqtt.subscribe("${root}/dps/${dp}".toString())
    }
    narrowed = true
    if (logEnable) log.debug "Resubscribed narrowly to ${root} DP1/3/9"
  } catch (e) {
    if (logEnable) log.warn "Narrow resubscribe failed: ${e?.message ?: e}"
  }

  // Only now is it safe to stop listening on the broad bootstrap filter.
  def wild = (state[BOOTSTRAP_TOPIC_KEY] ?: null)
  if (wild && narrowed) {
    try { interfaces.mqtt.unsubscribe(wild) } catch (ignored) {}
    state.remove(BOOTSTRAP_TOPIC_KEY)
  }
}

/* ========================= MQTT Callbacks ========================= */

/**
 * Callback receiving MQTT client status text.
 *
 * The text is mirrored into the "mqttStatus" attribute, then classified case-insensitively:
 *   contains "succeed"                                    -> mark connected, reset backoff,
 *                                                            schedule subscribe and heartbeat
 *   starts with "error" / contains "lost"/"disconnected"  -> mark disconnected, schedule reconnect
 * In every case connectionGuard is scheduled 10 seconds later.
 */
def mqttClientStatus(String status) {
  sendEvent(name: "mqttStatus", value: status)
  if (logEnable) log.debug "MQTT status: ${status}"

  final String lowered = status?.toLowerCase() ?: ""
  final boolean isUp   = lowered.contains("succeed")
  final boolean isDown = !isUp &&
      (lowered.startsWith("error") || lowered.contains("lost") || lowered.contains("disconnected"))

  if (isUp) {
    state.connected = true
    state.backoff   = BACKOFF_MIN
    sendEvent(name: "mqttConnected", value: "true")
    presenceMark(true)
    runIn(1, "subscribeTopics")
    setStatusOnline(true)
    runIn(2, "heartbeatTick")
  } else if (isDown) {
    state.connected = false
    sendEvent(name: "mqttConnected", value: "false")
    setStatusOnline(false)
    scheduleReconnect()
  }

  runIn(10, "connectionGuard")
}

/**
 * Entry point for inbound MQTT messages.
 *
 * Every message that carries a topic refreshes state.lastMqttOkEpoch and the lastSeen/presence
 * markers, and may teach the driver its active root. The message is then dispatched by data
 * point: DP1 (power), DP3 (speed), DP9 (light). Topics matching none of them are ignored.
 */
def parse(String description) {
  def message = interfaces.mqtt.parseMessage(description)
  if (!message?.topic) return

  final String topic   = message.topic.toString()
  final String payload = (message.payload != null) ? message.payload.toString().trim() : ""

  state.lastMqttOkEpoch = now()
  markSeen()

  if (!state.activeRoot) learnActiveRootFromTopic(topic)
  final String root = activeRootOrNull()
  if (!root) return

  if (topicMatchesDp(topic, root, 1)) { handlePowerReport(payload); return }
  if (topicMatchesDp(topic, root, 3)) { handleSpeedReport(payload); return }
  if (topicMatchesDp(topic, root, 9)) { handleLightReport(payload); return }
}

// DP1 (power): mirrors the reported power into "switch"; a power-off also forces speed "off".
private void handlePowerReport(String payload) {
  final boolean powered = asBool(payload)
  sendEvent(name: "switch", value: powered ? "on" : "off")
  if (!powered) sendEvent(name: "speed", value: "off")
  if (logEnable) log.debug "DP1 -> ${powered}"
}

// DP3 (speed): maps raw "1"/"2"/"3" to low/medium/high (anything else -> "off") and
// handles the stabilization window opened by setSpeed().
private void handleSpeedReport(String payload) {
  final String raw   = payload.replaceAll("\"", "")
  final String label = ["1": "low", "2": "medium", "3": "high"][raw] ?: "off"
  final long receivedAt = now()

  // Stabilization window: if we want 2/3 and see a transient 1 (low), reassert
  final boolean transientLow = state.pendingSpeedRaw && (state.pendingSpeedRaw in ["2", "3"]) && raw == "1"
  if (transientLow) {
    final long windowEnd = (state.stabilizeUntil ?: 0L) as long
    if (receivedAt <= windowEnd) {
      if (logEnable) log.debug "DP3 transient low during stabilization; reasserting ${state.pendingSpeedRaw}"
      publishDP3(state.pendingSpeedRaw as String)
      return
    }
  }

  sendEvent(name: "speed", value: label)
  // IMPORTANT: do not force switch=on here. DP1 governs power/UI.
  if (logEnable) log.debug "DP3 -> ${label} (raw ${raw})"

  // The device confirmed the requested speed: the pending request is complete.
  if (state.pendingSpeedRaw && raw == state.pendingSpeedRaw) {
    state.remove("pendingSpeedRaw")
    state.remove("stabilizeUntil")
  }
}

// DP9 (light): forwards the reported state to the child light device, if one exists.
private void handleLightReport(String payload) {
  final boolean lit = asBool(payload)
  childLight()?.sendEvent(name: "switch", value: lit ? "on" : "off")
  if (logEnable) log.debug "DP9 -> ${lit}"
}

/* ========================= Capabilities ========================= */

/**
 * Switch "on" command: reports "on" right away (optimistic UI), then publishes DP1 = true.
 * The DP1 echo handled in parse() confirms the state later.
 */
def on() {
  sendEvent(name: "switch", value: "on")
  publishDP1(true)
}
/**
 * Switch "off" command: publishes DP1 = false, reports switch and speed as "off" right away,
 * drops any pending speed request, and stamps state.offGuardUntil five seconds ahead.
 */
def off() {
  publishDP1(false)

  // Optimistic UI + clear speed to prevent stale DP3 from flipping UI back
  ["switch", "speed"].each { attr -> sendEvent(name: attr, value: "off") }

  // A pending speed request makes no sense once the fan is off
  ["pendingSpeedRaw", "stabilizeUntil"].each { key -> state.remove(key) }

  state.offGuardUntil = now() + 5000 // reserved if you later want to gate DP3->switch
}

/**
 * Publishes the power state ("true"/"false") to the DP1 command topic.
 * Warns and does nothing when no root is available; logs the publish only when it succeeded.
 */
private publishDP1(boolean val){
  final String root = activeRootOrNull()
  if (!root) {
    log.warn "No active root to publish DP1"
    return
  }
  final String cmdTopic = tCmd(root, 1)
  final boolean sent    = publishMqtt(cmdTopic, String.valueOf(val))
  if (sent && logEnable) log.debug "Publish DP1 ${val} -> ${cmdTopic}"
}

/**
 * Fan speed command.
 *
 *   "off" (or null)          -> delegates to off()
 *   "low"/"medium"/"high"    -> powers the fan on, records the wanted raw value ("1"/"2"/"3")
 *                               with a 5 s stabilization window, publishes it after 500 ms,
 *                               schedules one re-assert after resendDelaySec (minimum 1 s),
 *                               and reports the desired switch/speed immediately
 *   anything else            -> treated as "medium"
 *
 * @param speed requested speed name, case-insensitive
 */
def setSpeed(String speed) {
  speed = (speed ?: "off").toLowerCase()
  if (logEnable) log.debug "setSpeed requested: ${speed}"

  if (speed == "off") {
    off()
    return
  }

  final String rawLevel = [low: "1", medium: "2", high: "3"][speed]
  if (rawLevel == null) {
    if (logEnable) log.debug "Unsupported speed '${speed}', defaulting to medium"
    setSpeed("medium")
    return
  }

  ensureFanPowerOn()

  // Initial publish after 500ms to avoid racing default-low
  state.pendingSpeedRaw = rawLevel
  state.stabilizeUntil  = now() + 5000 // 5s stabilization window
  runInMillis(500, 'publishInitialSpeed')

  // One timed reassert
  Integer delaySec = Math.max(1, (resendDelaySec ?: 1) as Integer)
  runIn(delaySec, "reassertPendingSpeed")

  sendEvent(name:"switch", value:"on")   // desired state
  sendEvent(name:"speed",  value:speed)
  return
}

/**
 * Advances to the entry after the current speed in SPEED_ORDER, wrapping around at the end.
 * An unknown or missing current speed is treated as the first entry.
 */
def cycleSpeed() {
  final String current = (device.currentValue("speed") ?: "off").toLowerCase()
  final int position   = Math.max(0, SPEED_ORDER.indexOf(current))
  final int following  = (position + 1) % SPEED_ORDER.size()
  setSpeed(SPEED_ORDER[following])
}

/**
 * Publishes DP1 = true before a speed change.
 *
 * The publish is unconditional: some bridges gate speed by power edge, so repeating the
 * power-on while the fan is already on is intentional and harmless. (The previous version
 * branched on the current "switch" value but issued the same publish in both branches.)
 */
private ensureFanPowerOn() {
  publishDP1(true)
}

/**
 * Scheduled by setSpeed() via runInMillis: publishes the pending speed, if one is still pending.
 */
def publishInitialSpeed() {
  def pending = state.pendingSpeedRaw
  if (!pending) return
  publishDP3(pending as String)
}
/**
 * Scheduled by setSpeed() via runIn: publishes the pending speed once more, unless it has
 * been cleared in the meantime.
 */
def reassertPendingSpeed() {
  def pending = state.pendingSpeedRaw
  if (pending) {
    if (logEnable) log.debug "Re-asserting DP3 ${pending} after delay"
    publishDP3(pending as String)
  }
}

/**
 * Publishes a speed value to the DP3 command topic. The value is sent wrapped in double
 * quotes; quotes already present in v are stripped first.
 * Warns and does nothing when no root is available; logs the publish only when it succeeded.
 */
private publishDP3(String v) {
  final String root = activeRootOrNull()
  if (!root) {
    log.warn "No active root to publish DP3"
    return
  }
  final String bare     = v.replaceAll('"', '')
  final String payload  = '"' + bare + '"'
  final String cmdTopic = tCmd(root, 3)
  final boolean sent    = publishMqtt(cmdTopic, payload)
  if (sent && logEnable) log.debug "Publish DP3 ${payload} -> ${cmdTopic}"
}

/**
 * Sends the "supportedFanSpeeds" attribute as a JSON array string, forced as a state change.
 */
private publishSupportedSpeeds() {
  final String speedsJson = '["off","low","medium","high"]'
  sendEvent(name: "supportedFanSpeeds", value: speedsJson, isStateChange: true)
}

/* ========================= Child light (DP9) ========================= */

/**
 * Creates the child light device ("Generic Component Switch", DNI "<parent DNI>-light")
 * when createLightChild is enabled and the child does not exist yet.
 * A creation failure is logged as a warning and otherwise ignored.
 */
private ensureLightChild() {
  if (!createLightChild) return
  if (childLight()) return

  final String dni   = "${device.deviceNetworkId}-light"
  final String title = "${device.displayName} Light"
  try {
    def child = addChildDevice("hubitat", "Generic Component Switch", dni,
      [name: title, label: title, isComponent: true])
    if (logEnable) log.debug "Created child light: ${child?.deviceNetworkId}"
  } catch (e) {
    log.warn "Cannot create child light: ${e}"
  }
}
/** Looks up the child light device by its DNI ("<parent DNI>-light"). */
private childLight() {
  return getChildDevice("${device.deviceNetworkId}-light")
}

/**
 * Child "on" command: publishes "true" to the DP9 command topic.
 * Silently does nothing when no root is available.
 */
def componentOn(cd)  {
  final String root = activeRootOrNull()
  if (!root) return
  final String lightTopic = tCmd(root, 9)
  final boolean sent      = publishMqtt(lightTopic, "true")
  if (sent && logEnable) log.debug "Publish DP9 true -> ${lightTopic}"
}
/**
 * Child "off" command: publishes "false" to the DP9 command topic.
 * Silently does nothing when no root is available.
 */
def componentOff(cd) {
  final String root = activeRootOrNull()
  if (!root) return
  final String lightTopic = tCmd(root, 9)
  final boolean sent      = publishMqtt(lightTopic, "false")
  if (sent && logEnable) log.debug "Publish DP9 false -> ${lightTopic}"
}
/** Child "refresh" command: nothing to do beyond an optional debug line. */
def componentRefresh(cd) {
  if (logEnable) {
    log.debug "componentRefresh (no-op)"
  }
}

/* ========================= Presence ========================= */

/**
 * Records "now" as the last time the device was heard from: stores the epoch in
 * state.lastSeenEpoch, publishes it as the "lastSeen" attribute, and marks the device present.
 */
private markSeen() {
  final long seenAt = now()
  state.lastSeenEpoch = seenAt
  final String stamp = new Date(seenAt).format("yyyy-MM-dd HH:mm:ss")
  sendEvent(name: "lastSeen", value: stamp)
  presenceMark(true)
}
/**
 * Re-evaluates presence: present while the last sighting is no older than
 * presenceTimeoutSec seconds (120 when the setting is unset).
 */
private presenceTick() {
  final long lastSeen   = (state.lastSeenEpoch ?: 0L) as long
  final long elapsedSec = (now() - lastSeen) / 1000L
  final boolean fresh   = elapsedSec <= (presenceTimeoutSec ?: 120)
  presenceMark(fresh)
}
/**
 * Sets the "presence" attribute to "present"/"not present", sending an event only when the
 * value actually differs from the current one.
 */
private presenceMark(boolean present) {
  final String target = present ? "present" : "not present"
  if (device.currentValue("presence") == target) return
  sendEvent(name: "presence", value: target)
}

/* ========================= Heartbeat, Guard & Watchdog ========================= */

/**
 * Starts the periodic loops: watchdogTick after WATCHDOG_PERIOD seconds (at least 5) and,
 * when keepalivePingSec is positive, heartbeatTick after that many seconds (at least 5).
 * A keepalivePingSec value that cannot be converted to Integer counts as 0.
 */
def startWatchdogLoop() {
  runIn(Math.max(5, WATCHDOG_PERIOD), "watchdogTick")

  Integer pingSec
  try {
    pingSec = (keepalivePingSec ?: 0) as Integer
  } catch (e) {
    pingSec = 0
  }
  if (pingSec > 0) runIn(Math.max(5, pingSec), "heartbeatTick")
}
/** True only when state.connected is exactly true. */
private boolean mqttIsUp() {
  return state.connected == true
}

/**
 * Periodic keep-alive. Every keepalivePingSec seconds (at least 5) a timestamp is published
 * to "<root>/heartbeat" while the connection is up and a root is known.
 *
 * The loop stops rescheduling itself when keepalivePingSec is unset, zero, negative, or not
 * convertible to Integer. While disconnected it keeps rescheduling without publishing.
 * The debug line is written only when the publish reported success, matching the DP1/DP3/DP9
 * publishers; a failed heartbeat is no longer logged as if it had been sent.
 */
def heartbeatTick() {
  Integer ka = 0
  try { ka = ((keepalivePingSec ?: 0) as Integer) } catch(e) { ka = 0 }
  if (ka <= 0) return

  if (mqttIsUp()) {
    def root = activeRootOrNull()
    if (root) {
      final String topic = "${root}/heartbeat"
      final String ts    = new Date().format("yyyy-MM-dd'T'HH:mm:ssZ")
      def ok = publishMqtt(topic, ts, false, 0)
      if (logEnable && ok) log.debug "Heartbeat -> ${topic} ${ts}"
    }
  }

  runIn(Math.max(5, ka), "heartbeatTick")
}

/**
 * Periodic watchdog, rescheduled every WATCHDOG_PERIOD seconds (at least 5).
 *   Not connected -> schedules a reconnect.
 *   Connected     -> when keepalivePingSec is positive and nothing has succeeded on MQTT for
 *                    more than twice that long, triggers heartbeatTick() immediately.
 */
def watchdogTick() {
  if (state.connected) {
    final long idleSec = (now() - (state.lastMqttOkEpoch ?: 0L)) / 1000L
    Integer pingSec = 0
    try {
      pingSec = (keepalivePingSec ?: 0) as Integer
    } catch (e) {
      pingSec = 0
    }
    final int idleLimit = pingSec * 2
    if (pingSec > 0 && idleSec > idleLimit) {
      if (logEnable) log.debug "Watchdog: idle ${idleSec}s > ${idleLimit}s, sending proactive heartbeat"
      heartbeatTick()
    }
  } else {
    if (logEnable) log.debug "Watchdog: not connected; scheduling reconnect"
    scheduleReconnect()
  }

  runIn(Math.max(5, WATCHDOG_PERIOD), "watchdogTick")
}

/**
 * Scheduled after each status callback: if the driver is still not connected, drops the
 * client connection (errors ignored, best effort) and schedules a reconnect.
 */
def connectionGuard() {
  if (state.connected) return

  if (logEnable) log.warn "Connection guard: still not connected; forcing reconnect"
  try {
    interfaces.mqtt.disconnect()
  } catch (ignored) {
    // best effort: the client may already be disconnected
  }
  scheduleReconnect()
}

/* ========================= Utility Commands ========================= */

/**
 * Utility command: publishes "ping-<HH:mm:ss>" to "<root>/driver_test", using
 * "hubitat/test/fan" as the root when none is available, and logs the outcome.
 */
def testMqtt() {
  def base      = activeRootOrNull() ?: "hubitat/test/fan"
  def testTopic = "${base}/driver_test"
  def stamp     = new Date().format("HH:mm:ss")
  def payload   = "ping-${stamp}"
  def ok        = publishMqtt(testTopic, payload, false, 0)
  if (logEnable) log.debug "Test MQTT publish -> ${testTopic}: ${payload} (ok=${ok})"
}

/**
 * Utility command: drops the client connection (errors ignored, best effort), resets the
 * backoff to BACKOFF_MIN, marks the driver disconnected and schedules a reconnect.
 */
def reconnectNow() {
  if (logEnable) log.warn "Manual reconnect requested"
  try {
    interfaces.mqtt.disconnect()
  } catch (ignored) {
    // best effort: the client may already be disconnected
  }
  state.backoff   = BACKOFF_MIN
  state.connected = false
  scheduleReconnect()
}

/* ========================= Helpers ========================= */

/**
 * Interprets a payload as a boolean.
 * After trimming and lower-casing its string form, the value is true for
 * true / on / 1, bare or wrapped in double quotes; everything else (including null) is false.
 */
private static boolean asBool(def v) {
  if (v == null) return false
  final String text = v.toString().trim().toLowerCase()
  switch (text) {
    case "true":
    case "on":
    case "1":
    case "\"true\"":
    case "\"on\"":
    case "\"1\"":
      return true
    default:
      return false
  }
}