Custom Generic Zigbee Multi-Endpoint Switch Drivers

As I have been making dumb devices smart with the Zigbee 4-channel boards (various brands, MOES, etc), I've wanted a custom driver for these boards that I could modify into an actual device.

Example:

This is my use case for the modified drivers, with the Tower fan as an example:

I am now converting a Swamp Cooler to Zigbee using yet another one of these boards. I wanted to get out of the device <-> controller <-> virtual device setup I have been using.

I am currently rewriting my Tower Fan controller to all be in one single driver that controls the fan endpoints directly. That will be easily modified into the swamp cooler driver to control fan speed and turn the water pump on and off.

Here are some drivers AI produced for me.

This one is a drop-in replacement for the built-in driver. It creates the children in the same way, so it can literally replace the built-in driver that is already being used with a board and still use the same children. For new devices, it will create the children:

Custom Zigbee Multi-Endpoint Switch
/* Custom Zigbee Multi-Endpoint Switch

Written by Deepseek AI - 4/10/26

*/

import hubitat.device.HubAction
import hubitat.device.Protocol

// Driver definition: a parent device that carries a master "Switch" plus the
// usual housekeeping capabilities.
metadata {
    definition(name: "Custom Zigbee Multi-Endpoint Switch", namespace: "Hubitat", author: "chrisbvt") {
        capability "Actuator"
        capability "Configuration"
        capability "Refresh"
        capability "Sensor"
        capability "Switch"               // <-- parent acts as a master switch

        // Explicit command declarations; each name has a matching method in this driver.
        command "configure"
        command "refresh"
        command "on"
        command "off"
    }

    preferences {
        // Both logging toggles default to on; configure() schedules logsOff() while logEnable is set.
        input name: "logEnable", type: "bool", title: "Enable debug logging", defaultValue: true
        input name: "txtEnable", type: "bool", title: "Enable descriptionText logging", defaultValue: true
    }
}

/**
 * Switches the "logEnable" preference off and leaves a warning in the log.
 * configure() schedules this via runIn while debug logging is enabled.
 */
def logsOff() {
    log.warn "debug logging disabled..."
    Map disabledSetting = [value: "false", type: "bool"]
    device.updateSetting("logEnable", disabledSetting)
}

/**
 * Configuration handler: arms the debug-log timeout and makes sure a
 * "Generic Component Switch" child exists for endpoints 1 and 2.
 *
 * Child network ids have the form "<parent device id>-<endpoint, zero padded to 2>".
 * A child whose id already exists is left untouched.
 */
def configure() {
    if (logEnable) {
        runIn(1800, logsOff)
    }
    log.info "Configuring ${device.displayName}"

    // Create child devices for endpoints that exist on your device
    [1, 2].each { ep ->
        def paddedEp = ep.toString().padLeft(2, '0')
        def childId = "${device.id}-${paddedEp}"
        def alreadyThere = childDevices.find { it.deviceNetworkId == childId }
        if (alreadyThere) {
            return   // leaves this closure iteration only
        }
        try {
            def properties = [
                name: "${device.displayName} Switch ${ep}",
                label: "${device.displayName} Switch ${ep}",
                isComponent: false
            ]
            addChildDevice("hubitat", "Generic Component Switch", childId, properties)
        } catch (e) {
            log.error "Failed to create child for endpoint ${ep}: ${e.message}"
        }
    }
}

// Handler for the "Refresh" capability/command. It is a no-op in this driver:
// nothing is sent to the device and no events are raised.
def refresh() {
    // optional – can be left empty
}

// Parent switch commands – control all children
/**
 * Master "on": sends an On command for every child endpoint, then marks the
 * parent switch as on.
 */
def on() {
    if (logEnable) {
        log.debug "Parent on – turning on all endpoints"
    }
    for (child in childDevices) {
        componentOn(child)
    }
    // Optimistic update: the parent state is set right away, not derived from device reports
    sendEvent(name: "switch", value: "on", displayed: false)
}

/**
 * Master "off": sends an Off command for every child endpoint, then marks the
 * parent switch as off.
 */
def off() {
    if (logEnable) {
        log.debug "Parent off – turning off all endpoints"
    }
    for (child in childDevices) {
        componentOff(child)
    }
    // Optimistic update, mirroring on()
    sendEvent(name: "switch", value: "off", displayed: false)
}

/**
 * Extracts the endpoint number from a child's device network id.
 *
 * The id must be exactly "<this device's id>-<number>"; the number is parsed
 * as a decimal Integer.
 *
 * @param childDevice child device whose deviceNetworkId is inspected
 * @return the endpoint number, or null when the id does not have the expected
 *         shape, belongs to another parent, or its suffix is not numeric
 */
private Integer getEndpointFromChild(childDevice) {
    def pieces = childDevice.deviceNetworkId?.split("-")
    boolean belongsToThisParent = pieces && pieces.size() == 2 && pieces[0] == device.id.toString()
    if (!belongsToThisParent) {
        return null
    }
    try {
        return pieces[1] as Integer
    } catch (NumberFormatException ignored) {
        // Suffix is not a number, so this is not one of our endpoint children
        return null
    }
}

/**
 * Handles incoming Zigbee messages and forwards On/Off (cluster 0x0006,
 * attribute 0x0000) reports to the child device of the reporting endpoint.
 *
 * The description is decoded with zigbee.parseDescriptionAsMap, which yields
 * integer cluster/attribute ids (clusterInt / attrInt) and hex-string
 * "endpoint" and "value" fields. That is the same map shape the Swamp Cooler
 * driver in this post relies on.
 *
 * @param description raw message description handed over by the hub
 */
def parse(String description) {
    if (logEnable) log.debug "Parsing: ${description}"

    def map
    try {
        map = zigbee.parseDescriptionAsMap(description)
    } catch (e) {
        // Report the failure instead of hiding it behind a generic "ignored" message
        log.warn "Unable to parse Zigbee message '${description}': ${e.message}"
        return
    }
    if (!map) return

    boolean onOffAttribute = map.clusterInt == 0x0006 && map.attrInt == 0x0000
    if (!onOffAttribute || !map.value || !map.endpoint) {
        if (logEnable) log.trace "Ignored non‑On/Off message"
        return
    }

    Integer endpoint
    try {
        // The endpoint arrives as a hex string such as "01"
        endpoint = Integer.parseInt(map.endpoint as String, 16)
    } catch (NumberFormatException e) {
        log.warn "Unexpected endpoint '${map.endpoint}' in On/Off report"
        return
    }

    // Named switchState so it does not shadow the driver's built-in "state" map
    def switchState = (map.value == "01") ? "on" : "off"
    def child = childDevices.find { getEndpointFromChild(it) == endpoint }
    if (!child) {
        if (logEnable) log.debug "No child device for endpoint ${endpoint}"
        return
    }
    if (txtEnable) log.info "${device.displayName} endpoint ${endpoint} is ${switchState}"
    child.sendEvent(name: "switch", value: switchState, displayed: true)
}

/**
 * Component callback: turns on the endpoint that belongs to the given child.
 *
 * The child's switch event is raised first (optimistically), then a Zigbee
 * On command (cluster 0x0006, command 0x01) is sent to the endpoint.
 * Does nothing when the child cannot be mapped to an endpoint.
 *
 * @param childDevice child device requesting the change
 */
def componentOn(childDevice) {
    def endpoint = getEndpointFromChild(childDevice)
    if (endpoint == null) {
        return
    }
    childDevice.sendEvent(name: "switch", value: "on", displayed: false)
    // NOTE: the endpoint is written as zero-padded decimal after "0x"; that equals
    // the hex form only for endpoints below 10.
    def endpointField = endpoint.toString().padLeft(2, '0')
    def zigbeeCmd = "he cmd 0x${device.deviceNetworkId} 0x${endpointField} 0x0006 0x01 {}"
    sendHubCommand(new HubAction(zigbeeCmd, Protocol.ZIGBEE))
}

/**
 * Component callback: turns off the endpoint that belongs to the given child.
 *
 * The child's switch event is raised first (optimistically), then a Zigbee
 * Off command (cluster 0x0006, command 0x00) is sent to the endpoint.
 * Does nothing when the child cannot be mapped to an endpoint.
 *
 * @param childDevice child device requesting the change
 */
def componentOff(childDevice) {
    def endpoint = getEndpointFromChild(childDevice)
    if (endpoint == null) {
        return
    }
    childDevice.sendEvent(name: "switch", value: "off", displayed: false)
    // NOTE: the endpoint is written as zero-padded decimal after "0x"; that equals
    // the hex form only for endpoints below 10.
    def endpointField = endpoint.toString().padLeft(2, '0')
    def zigbeeCmd = "he cmd 0x${device.deviceNetworkId} 0x${endpointField} 0x0006 0x00 {}"
    sendHubCommand(new HubAction(zigbeeCmd, Protocol.ZIGBEE))
}

// Component callback for a child's refresh request. Intentionally empty:
// nothing is sent to the device and the child's state is left as is.
def componentRefresh(childDevice) {
    // not needed
}

This Driver does not create the children. All the endpoints are controlled from the single device driver using commands.

Custom Zigbee Multi-Endpoint Switch with no Children
/* Custom Zigbee Multi-Endpoint Switch with no Children

Written by Deepseek AI - 4/10/26

*/

import hubitat.device.HubAction
import hubitat.device.Protocol

// Driver definition for the child-less variant: every endpoint is exposed as
// commands and attributes on this single device.
metadata {
    definition(name: "Custom Zigbee Multi-Endpoint Switch No Children", namespace: "Hubitat", author: "chrisbvt") {
        capability "Actuator"
        capability "Configuration"
        capability "Refresh"
        capability "Sensor"

        command "configure"
        command "refresh"
        // Per-endpoint commands; the handlers reject endpoint numbers outside 1-4.
        command "endpointOn", [[name: "endpointNumber", type: "NUMBER", description: "Endpoint number (1-4)"]]
        command "endpointOff", [[name: "endpointNumber", type: "NUMBER", description: "Endpoint number (1-4)"]]

        // One on/off attribute per endpoint, named "switch<endpoint number>".
        attribute "switch1", "enum", ["on", "off"]
        attribute "switch2", "enum", ["on", "off"]
        attribute "switch3", "enum", ["on", "off"]
        attribute "switch4", "enum", ["on", "off"]
    }

    preferences {
        // Both logging toggles default to on; configure() schedules logsOff() while logEnable is set.
        input name: "logEnable", type: "bool", title: "Enable debug logging", defaultValue: true
        input name: "txtEnable", type: "bool", title: "Enable descriptionText logging", defaultValue: true
    }
}

/**
 * Scheduled callback that flips the "logEnable" preference to false and logs
 * a warning saying so.
 */
def logsOff() {
    log.warn "debug logging disabled..."
    def newValue = [value: "false", type: "bool"]
    device.updateSetting("logEnable", newValue)
}

/**
 * Configuration handler: arms the debug-log timeout, then, for endpoints 1-4,
 * sets up On/Off (cluster 0x0006, attribute 0x0000) reporting and reads the
 * current state.
 *
 * The endpoint is passed through the additionalParams map (destEndpoint), not
 * positionally, because the second positional argument of
 * zigbee.configureReporting / zigbee.readAttribute is the attribute id.
 */
def configure() {
    if (logEnable) runIn(1800, logsOff)
    log.info "Configuring ${device.displayName}"

    List<String> cmds = []
    // Configure endpoints 1-4 directly (most multi-endpoint switches use these)
    (1..4).each { ep ->
        Map target = [destEndpoint: ep]
        // Boolean (0x10) attribute: min interval 0, max 300s. reportableChange is null
        // because a discrete type has no change threshold.
        // NOTE(review): configureReporting is expected to emit the bind for this
        // endpoint as well - confirm against the Zigbee object documentation.
        cmds.addAll(zigbee.configureReporting(0x0006, 0x0000, 0x10, 0, 300, null, target))
        // Read current state so we know the initial value
        cmds.addAll(zigbee.readAttribute(0x0006, 0x0000, target))
    }
    sendZigbeeCommands(cmds)
    log.info "Configuration commands sent for endpoints 1-4"
}

/**
 * Refresh handler. There is no separate read path: it re-runs configure(),
 * which also re-arms the debug-log timeout.
 */
def refresh() {
    if (logEnable) {
        log.debug "Refreshing device"
    }
    configure()
}

/**
 * Handles incoming Zigbee messages and turns On/Off (cluster 0x0006,
 * attribute 0x0000) reports or read responses from endpoints 1-4 into
 * "switch<endpoint>" events.
 *
 * The description is decoded with zigbee.parseDescriptionAsMap, which yields
 * integer cluster/attribute ids (clusterInt / attrInt) and hex-string
 * "endpoint" and "value" fields. That is the same map shape the Swamp Cooler
 * driver in this post relies on.
 *
 * @param description raw message description handed over by the hub
 */
def parse(String description) {
    if (logEnable) log.debug "Parsing: ${description}"
    def map = zigbee.parseDescriptionAsMap(description)
    if (!map) return

    // Handle On/Off attribute reports or read responses
    boolean onOffAttribute = map.clusterInt == 0x0006 && map.attrInt == 0x0000
    if (onOffAttribute && map.value && map.endpoint) {
        // The endpoint arrives as a hex string such as "01"
        Integer ep = Integer.parseInt(map.endpoint as String, 16)
        if (ep < 1 || ep > 4) return  // ignore invalid endpoints
        def stateVal = (map.value == "01") ? "on" : "off"

        if (txtEnable) log.info "${device.displayName} endpoint ${ep} is ${stateVal}"
        sendEvent(name: "switch${ep}", value: stateVal)
        return
    }

    if (logEnable) log.debug "Unhandled message: ${map}"
}

// Turn on a specific endpoint (1-4)
/**
 * Turns on one endpoint and optimistically raises the matching
 * "switch<endpoint>" event.
 *
 * @param endpointNumber endpoint to switch (1-4); coerced to Integer, anything
 *                       outside 1-4 is rejected with a warning
 */
def endpointOn(endpointNumber) {
    // Coerce first: the value may not arrive as an Integer (e.g. from a rule)
    def ep = endpointNumber as Integer
    boolean outOfRange = ep < 1 || ep > 4
    if (outOfRange) {
        log.warn "endpointOn: invalid endpoint number ${endpointNumber} (must be 1-4)"
        return
    }
    final int ON = 0x01
    sendEndpointCommand(ep, ON)
    sendEvent(name: "switch${ep}", value: "on", displayed: false)
    if (logEnable) log.debug "Turned ON endpoint ${ep}"
}

// Turn off a specific endpoint (1-4)
/**
 * Turns off one endpoint and optimistically raises the matching
 * "switch<endpoint>" event.
 *
 * @param endpointNumber endpoint to switch (1-4); coerced to Integer, anything
 *                       outside 1-4 is rejected with a warning
 */
def endpointOff(endpointNumber) {
    def ep = endpointNumber as Integer
    boolean outOfRange = ep < 1 || ep > 4
    if (outOfRange) {
        log.warn "endpointOff: invalid endpoint number ${endpointNumber} (must be 1-4)"
        return
    }
    final int OFF = 0x00
    sendEndpointCommand(ep, OFF)
    sendEvent(name: "switch${ep}", value: "off", displayed: false)
    if (logEnable) log.debug "Turned OFF endpoint ${ep}"
}

// Helper to send Zigbee On/Off command to a specific endpoint
/**
 * Builds and sends an "he cmd" On/Off cluster (0x0006) command to one endpoint.
 *
 * @param endpoint target endpoint number
 * @param onOff    cluster command id; callers pass 0x01 for on and 0x00 for off
 */
private void sendEndpointCommand(int endpoint, int onOff) {
    // Both numbers are written as zero-padded decimal after "0x"
    def endpointField = endpoint.toString().padLeft(2, '0')
    def commandField = onOff.toString().padLeft(2, '0')
    def zigbeeCmd = "he cmd 0x${device.deviceNetworkId} 0x${endpointField} 0x0006 0x${commandField} {}"
    sendHubCommand(new HubAction(zigbeeCmd, Protocol.ZIGBEE))
}

// Helper to send multiple Zigbee commands
/**
 * Sends each entry of the list to the hub as its own Zigbee HubAction, in
 * list order.
 *
 * @param cmds Zigbee command strings to send
 */
private void sendZigbeeCommands(List<String> cmds) {
    for (String zigbeeCmd in cmds) {
        sendHubCommand(new HubAction(zigbeeCmd, Protocol.ZIGBEE))
    }
}
1 Like

And now I have a Swamp Cooler Device Driver for the board :sunglasses:

Speeds: Endpoints 1-3
Cool: Endpoint 4

There was an issue with the parse() method from Deepseek; I fixed it in the Swamp Cooler driver so that the board buttons or the board's RF remote can control the cooler.

Swamp Cooler Driver for 4-channel Zigbee Board
/* 
Zigbee Swamp Cooler Driver using 4-channel Zigbee Board

Zigbee board integration methods created by Deepseek AI - 4/10/26
All other code ©Christopher Burges

04/12/26:  Initial Code for Swamp Cooler Device using the Zigbee board commands

*/

import hubitat.device.HubAction
import hubitat.device.Protocol

// Driver definition for the swamp cooler: a fan (speed relays on endpoints 1-3)
// plus a "cool" output on endpoint 4, all handled by this single device.
metadata {
    definition(name: "Swamp Cooler Driver", namespace: "Hubitat", author: "chrisbvt") {
        // Each capability is declared once ("Actuator" used to be listed twice)
        capability "Actuator"
        capability "Configuration"
        capability "Refresh"
        capability "Sensor"
        capability "Fan Control"
        capability "Switch"

        attribute "speed", "ENUM"
        attribute "switch", "ENUM"
        attribute "supportedFanSpeeds", "JSON_OBJECT"
        // Custom attribute tracking the cool output (endpoint 4)
        attribute "cool", "ENUM"

        command "setSpeed", [[name:"setSpeed",type:"ENUM", description:"Set Fan Speed", constraints:["off","low","medium","high"]]]
        command "setCool", [[name:"setCool",type:"ENUM", description:"Set Fan Cool", constraints:["off","on"]]]
        command "on"
        command "off"
        command "configure"
        command "initialize"
        command "refresh"
    }

    preferences {
        // Both logging toggles default to on; debug logging is switched off by logsOff() after 1800 s
        input name: "logEnable", type: "bool", title: "Enable debug logging", defaultValue: true
        input name: "txtEnable", type: "bool", title: "Enable descriptionText logging", defaultValue: true
    }
}

/**
 * Lifecycle hook run when the device is first installed: publishes the default
 * attribute values, configures the board, then runs the same steps as a
 * preferences update.
 */
def installed() {
    log.warn "installed..."

    // initialize() sets state.speed and raises the same four default events
    // (speed, switch, supportedFanSpeeds, cool) that used to be spelled out here
    initialize()
    configure()

    updated()
}

/**
 * Lifecycle hook run after preferences are saved: logs the debug-logging state
 * and, while debug logging is on, schedules logsOff() in 1800 seconds.
 */
def updated() {
    log.info "updated..."
    boolean debugOn = (logEnable == true)
    log.warn "debug logging is: ${debugOn}"
    if (logEnable) {
        runIn(1800, logsOff)
    }
}

/**
 * Refresh handler. There is no separate read path: it re-runs configure(),
 * which also re-arms the debug-log timeout.
 */
def refresh() {
    if (logEnable) {
        log.debug "Refreshing device"
    }
    configure()
}

/**
 * Resets the driver-side view of the cooler to "everything off": clears the
 * remembered speed and publishes default values for speed, switch,
 * supportedFanSpeeds and cool. No Zigbee command is sent.
 */
def initialize() {
    state.speed = "off"

    // Insertion order is kept, so the events go out in the order listed
    def defaults = [
        "speed"             : "off",
        "switch"            : "off",
        "supportedFanSpeeds": '["off","low","medium","high"]',
        "cool"              : "off"
    ]
    for (entry in defaults) {
        sendEvent(name: entry.key, value: entry.value)
    }
}


// ************************************ Methods for Fan Driver ****************************
/**
 * Sets the "cool" attribute and drives the cool output accordingly.
 *
 * The attribute event is always raised. The output is switched on only while
 * the driver's "switch" attribute is "on"; "off" always switches it off.
 *
 * @param status "on" or "off"; other values only update the attribute
 */
def setCool(status) {
    logDebug("setCool(${status}) called")
    def text = getDescriptionText("cool set to ${status}")
    sendEvent(name: "cool", value: status, descriptionText: text)

    if (status == "off") {
        coolOff()
    } else if (status == "on" && device.currentValue("switch") == "on") {
        coolOn()
    }
}

/**
 * Turns the fan on at the remembered speed.
 *
 * state.speed can legitimately be "off" or unset (initialize() sets it to
 * "off", and a board-side power-off does too). Resuming at "off" would report
 * the switch as on while leaving every relay off, so in that case the fan
 * falls back to "low".
 */
def on() {
    def resumeSpeed = state?.speed
    boolean usable = (resumeSpeed == "low" || resumeSpeed == "medium" || resumeSpeed == "high")
    if (!usable) {
        resumeSpeed = "low"
        state.speed = resumeSpeed
    }
    sendEvent(name: "switch", value: "on", descriptionText: getDescriptionText("switch turned on"))
    sendEvent(name: "speed", value: resumeSpeed, descriptionText: getDescriptionText("speed set to ${resumeSpeed}"))
    fanSpeedHandler(resumeSpeed)
}

/**
 * Turns the cooler off: publishes switch "off" and speed "off", then switches
 * the relays off through fanSpeedHandler. state.speed is left untouched.
 */
def off() {
    def switchText = getDescriptionText("switch turned off")
    sendEvent(name: "switch", value: "off", descriptionText: switchText)
    def speedText = getDescriptionText("speed set to off")
    sendEvent(name: "speed", value: "off", descriptionText: speedText)
    fanSpeedHandler("off")
}

/**
 * Fan Control speed command.
 *
 * "off" / "on" delegate to off() / on(). "low", "medium" and "high" publish
 * the speed, remember it in state.speed, turn the switch attribute on if it
 * was off, and drive the relays. "auto" does not change the speed; it toggles
 * the cool output instead. Any other value is ignored.
 *
 * @param speed requested speed name
 */
def setSpeed(speed) {
    logDebug "setSpeed(${speed}) was called"
    if (speed == "off") {
        off()
    } else if (speed == "on") {
        on()
    } else if (speed == "low" || speed == "medium" || speed == "high") {
        sendEvent(name: "speed", value: speed, descriptionText: getDescriptionText("speed set to ${speed}"))
        // Remembered once; the second, trailing assignment was redundant and
        // also made the speed string the method's return value
        state.speed = speed
        if (device.currentValue("switch") == "off") {
            sendEvent(name: "switch", value: "on", descriptionText: getDescriptionText("switch turned on"))
        }
        fanSpeedHandler(speed)  // Zigbee Call for low, medium, high switches
    } else if (speed == "auto") {   // toggle cool with auto fan mode
        // Read once so the toggle cannot flip twice
        def cool = device.currentValue("cool")
        if (cool == "on") {
            setCool("off")
        } else if (cool == "off") {
            setCool("on")
        }
    }
}

// update driver states only when command came from the board itself (or rf remote via board)
/**
 * Publishes an attribute change without sending any Zigbee command.
 *
 * For "speed": low/medium/high also turn the switch attribute on when it was
 * off; "off" is published as is; any other speed value is ignored. Every
 * other attribute name is published unchanged.
 *
 * @param name  attribute name
 * @param value new attribute value
 */
def updateDriverState(name, value) {
    logDebug("updateDriverState(${name}, ${value}) called")
    if (name != "speed") {
        sendEvent(name: name, value: value, descriptionText: getDescriptionText("${name} set to ${value}"))
        return
    }

    boolean running = (value == "low" || value == "medium" || value == "high")
    if (running && device.currentValue("switch") == "off") {
        sendEvent(name: "switch", value: "on", descriptionText: getDescriptionText("switch turned on"))
    }
    if (running || value == "off") {
        // The text is built from "value"; there is no "speed" variable in this method
        sendEvent(name: "speed", value: value, descriptionText: getDescriptionText("speed set to ${value}"))
    }
}

/**
 * Schedules the cooler to switch off.
 *
 * "X" removes any pending autoTimerOff/off schedule and then schedules off()
 * in 10800 seconds. Any other value is read as a number of minutes and
 * schedules autoTimerOff(), but only while the current speed is not "off".
 *
 * NOTE(review): "X" presumably comes from a timer selector defined outside
 * this driver - confirm its intended meaning.
 *
 * @param min "X" or a value convertible to an integer number of minutes
 */
def setOffInMin(min) {
    logDebug "setOffInMin(${min}) was called"

    boolean isX = (min == "X")
    if (isX) {
        for (handler in ["autoTimerOff", "off"]) {
            unschedule(handler)
        }
        runIn(10800, off)
    } else if (device.currentValue("speed") != "off") {
        def seconds = min.toInteger() * 60
        logDebug "Off in ${seconds} seconds"
        runIn(seconds, autoTimerOff)
    }
}

/**
 * Timer callback scheduled by setOffInMin(): switches the cooler off unless
 * its speed is already "off".
 */
def autoTimerOff() {
    // The log line names this method; it used to mention a non-existent autoFanOff()
    logDebug "autoTimerOff() was called"
    if (device.currentValue("speed") != "off") {
        off()
    }
}

/**
 * Schedules off() after the number of minutes held in the "offMinutes"
 * attribute, when the "autoOff" setting is enabled.
 *
 * Neither "autoOff" nor "offMinutes" is declared in this driver's metadata,
 * so the attribute can be missing; in that case nothing is scheduled, where
 * the call used to fail on a null value.
 */
def autoTurnOff() {
    if (!settings?.autoOff) {
        return
    }
    def minutes = device.currentValue("offMinutes")
    if (minutes == null) {
        log.warn "autoTurnOff: attribute 'offMinutes' is not set, no auto-off scheduled"
        return
    }
    def secs = minutes.toInteger() * 60
    runIn(secs, off)
}

// ***************************** Device Methods for Fan Controller ****************************

// set the fan to the driver speed from child device
/**
 * Drives the relays to the requested speed. Nothing is sent when the relays
 * already match the request (as tracked in state by currentFanSpeed()), and
 * unknown speed names are ignored.
 *
 * @param speed "low", "medium", "high" or "off"
 */
def fanSpeedHandler(speed) {
    logDebug("fanSpeedHandler(${speed}) was called")

    if (speed == currentFanSpeed()) {
        return   // relays are already in the requested position
    }
    if (speed == "low") {
        fanOnLow()
    } else if (speed == "medium") {
        fanOnMed()
    } else if (speed == "high") {
        fanOnHigh()
    } else if (speed == "off") {
        fanOff()
    }
}

// Turn on speed and turn other speeds off
/** Switches the low-speed relay (endpoint 1) on, then asks for medium and high to be switched off. */
def fanOnLow() {
    logDebug("fanOnLow() called")
    final int lowEndpoint = 1
    endpointOn(lowEndpoint)
    // Flags are (low, medium, high): true means "switch that speed off"
    turnOffOtherSpeeds(false, true, true)
}

/** Switches the medium-speed relay (endpoint 2) on, then asks for low and high to be switched off. */
def fanOnMed() {
    final int mediumEndpoint = 2
    endpointOn(mediumEndpoint)
    // Flags are (low, medium, high): true means "switch that speed off"
    turnOffOtherSpeeds(true, false, true)
}

/** Switches the high-speed relay (endpoint 3) on, then asks for low and medium to be switched off. */
def fanOnHigh() {
    final int highEndpoint = 3
    endpointOn(highEndpoint)
    // Flags are (low, medium, high): true means "switch that speed off"
    turnOffOtherSpeeds(true, true, false)
}

/**
 * Switches every output off, in endpoint order: 1 (low), 2 (med), 3 (high)
 * and 4 (cool).
 */
def fanOff() {
    for (int ep = 1; ep <= 4; ep++) {
        endpointOff(ep)
    }
}

/** Switches the cool output (endpoint 4) on. */
def coolOn() {
    logDebug("coolOn() called")
    final int coolEndpoint = 4
    endpointOn(coolEndpoint)
}

/** Switches the cool output (endpoint 4) off. */
def coolOff() {
    final int coolEndpoint = 4
    endpointOff(coolEndpoint)
}

/**
 * Switches off the speed relays flagged by the caller, skipping relays the
 * driver already tracks as off.
 *
 * Each check reads the state key of the endpoint it switches: switch1 = low
 * (endpoint 1), switch2 = medium (endpoint 2), switch3 = high (endpoint 3),
 * the same mapping currentFanSpeed() uses. The checks used to be shifted by
 * one (switch2/3/4), which could leave the previous speed relay energized.
 *
 * @param low    true to switch the low relay off
 * @param medium true to switch the medium relay off
 * @param high   true to switch the high relay off
 */
def turnOffOtherSpeeds(low, medium, high) {
    logDebug("turnOffOtherSpeeds(${low}, ${medium}, ${high}) called")

    if (low && state?.switch1 == "on") {endpointOff(1)}         // low off
    if (medium && state?.switch2 == "on") {endpointOff(2)}      // med off
    if (high && state?.switch3 == "on") {endpointOff(3)}        // high off
}

// get current speed
/**
 * Derives the running speed from the relay states tracked in state.
 *
 * @return "low", "medium" or "high" for the first of switch1, switch2,
 *         switch3 that is "on" (checked in that order), otherwise "off"
 */
String currentFanSpeed() {
    def speedByKey = [switch1: "low", switch2: "medium", switch3: "high"]
    def active = speedByKey.find { key, label -> state?."${key}" == "on" }
    return active ? active.value : "off"
}

// ********** From Board Buttons (or RF remote) ***********

/**
 * Reacts to an on/off report from a speed endpoint, i.e. a change that was
 * made on the board (buttons or RF remote), not by this driver.
 *
 * "on" for a speed that is not the driver's current speed: the other speed
 * relays are switched off, the speed attribute is updated and remembered.
 * "off" for the speed that is the driver's current speed: the driver is
 * marked as off. Everything else is ignored.
 *
 * @param ep    reporting endpoint (1 = low, 2 = medium, 3 = high)
 * @param value "on" or "off"
 */
def fanSpeedChangeHandler(ep, value) {
    logDebug("fanSpeedChangeHandler(${ep}, ${value}) called")
    def speedAtEntry = device.currentValue("speed")

    // "others" holds the (low, medium, high) flags handed to turnOffOtherSpeeds
    def buttons = [
        [endpoint: 1, speed: "low",    others: [false, true, true]],
        [endpoint: 2, speed: "medium", others: [true, false, true]],
        [endpoint: 3, speed: "high",   others: [true, true, false]]
    ]
    def button = buttons.find { it.endpoint == ep }
    if (!button) {
        return
    }

    if (value == "on" && speedAtEntry != button.speed) {
        turnOffOtherSpeeds(button.others[0], button.others[1], button.others[2])
        updateDriverState("speed", button.speed)
        state.speed = button.speed
    }
    if (value == "off" && speedAtEntry == button.speed) {
        state.speed = "off"
        updateDriverOff()
    }
}

/** Marks the driver as off without sending Zigbee commands: switch first, then speed. */
def updateDriverOff() {
    for (attributeName in ["switch", "speed"]) {
        updateDriverState(attributeName, "off")
    }
}

// For rf remote or board buttons, lets a speed button act as off when pressed while current speed matches button
/**
 * Tells whether both speed relays other than the given one are tracked as
 * "off" in state (switch1 = low, switch2 = medium, switch3 = high).
 *
 * A relay with no recorded state does not count as off.
 *
 * @param speed "low", "medium" or "high"
 * @return true when the two other relays are recorded as "off"; false
 *         otherwise, including for any other speed name
 */
def checkOtherSpeedsOff(speed) {
    logDebug("checkOtherSpeedsOff(${speed}) called")

    // Locals are declared explicitly, and the high relay is read from "state"
    // (this used to reference a misspelled "stae")
    def lowOff = state?.switch1 == "off"
    def medOff = state?.switch2 == "off"
    def highOff = state?.switch3 == "off"
    logDebug("lowOff=${lowOff} medOff=${medOff} highOff=${highOff}")

    def othersOff = false
    if (speed == "low") {
        othersOff = medOff && highOff
    } else if (speed == "medium") {
        othersOff = lowOff && highOff
    } else if (speed == "high") {
        othersOff = lowOff && medOff
    }

    logDebug("returning ${othersOff}")
    return othersOff
}

// ****************** Board Control Methods ******************************

/**
 * Configuration handler: arms the debug-log timeout, then, for endpoints 1-4,
 * sets up On/Off (cluster 0x0006, attribute 0x0000) reporting and reads the
 * current state.
 *
 * The endpoint is passed through the additionalParams map (destEndpoint), not
 * positionally, because the second positional argument of
 * zigbee.configureReporting / zigbee.readAttribute is the attribute id.
 */
def configure() {
    if (logEnable) runIn(1800, logsOff)
    log.info "Configuring ${device.displayName}"

    List<String> cmds = []
    // Configure endpoints 1-4 directly (most multi-endpoint switches use these)
    (1..4).each { ep ->
        Map target = [destEndpoint: ep]
        // Boolean (0x10) attribute: min interval 0, max 300s. reportableChange is null
        // because a discrete type has no change threshold.
        // NOTE(review): configureReporting is expected to emit the bind for this
        // endpoint as well - confirm against the Zigbee object documentation.
        cmds.addAll(zigbee.configureReporting(0x0006, 0x0000, 0x10, 0, 300, null, target))
        // Read current state so we know the initial value
        cmds.addAll(zigbee.readAttribute(0x0006, 0x0000, target))
    }
    sendZigbeeCommands(cmds)
    log.info "Configuration commands sent for endpoints 1-4"
}

// Parse incoming messages
/**
 * Handles incoming Zigbee messages. On/Off (cluster 0x0006, attribute 0x0000)
 * reports or read responses from endpoints 1-4 are recorded in
 * state.switch<endpoint> and passed on: endpoints 1-3 to
 * fanSpeedChangeHandler, endpoint 4 to fanCoolChangeHandler. Anything else is
 * only logged when debug logging is on.
 *
 * @param description raw message description handed over by the hub
 */
def parse(description) {
    if (logEnable) log.debug "Parsing: ${description}"
    def map = zigbee.parseDescriptionAsMap(description)
    if (!map) return

    boolean onOffAttribute = map.clusterInt == 0x0006 && map.attrInt == 0x0000
    if (!(onOffAttribute && map.value)) {
        if (logEnable) log.debug "Unhandled message: ${map}"
        return
    }

    // The endpoint arrives as a hex string
    def ep = Integer.parseInt(map.endpoint, 16)
    if (ep < 1 || ep > 4) {
        return  // ignore invalid endpoints
    }

    def rawValue = map.value
    logDebug("Value is ${rawValue}")
    def stateVal = (rawValue == "01") ? "on" : "off"
    logDebug("stateVal = ${stateVal}")

    if (txtEnable) log.info "${device.displayName} endpoint ${ep} is ${stateVal}"
    state."switch${ep}" = stateVal
    if (ep == 4) {
        fanCoolChangeHandler(stateVal)
    } else {
        fanSpeedChangeHandler(ep, stateVal)
    }
}

/**
 * Mirrors an on/off report from the cool endpoint into the "cool" attribute.
 * Values other than "on" and "off" are ignored.
 *
 * @param value reported state of the cool endpoint
 */
def fanCoolChangeHandler(value) {
    logDebug("fanCoolChangeHandler called with ${value}")

    boolean recognised = (value == "on" || value == "off")
    if (recognised) {
        def coolState = (value == "on") ? "on" : "off"
        sendEvent(name: "cool", value: coolState, descriptionText: getDescriptionText("cool set to ${value}"))
    }
}

// Turn on a specific endpoint (1-4)
/**
 * Sends an On command to one endpoint and records it as "on" in
 * state.switch<endpoint>. No event is raised here.
 *
 * @param endpointNumber endpoint to switch (1-4); coerced to Integer, anything
 *                       outside 1-4 is rejected with a warning
 */
def endpointOn(endpointNumber) {
    // Coerce first: the value may not arrive as an Integer (e.g. from a rule)
    def ep = endpointNumber as Integer
    if (!(ep >= 1 && ep <= 4)) {
        log.warn "endpointOn: invalid endpoint number ${endpointNumber} (must be 1-4)"
        return
    }
    final int ON = 0x01
    sendEndpointCommand(ep, ON)
    state."switch${ep}" = "on"
    if (logEnable) log.debug "Turned ON endpoint ${ep}"
}

// Turn off a specific endpoint (1-4)
/**
 * Sends an Off command to one endpoint and records it as "off" in
 * state.switch<endpoint>. No event is raised here.
 *
 * @param endpointNumber endpoint to switch (1-4); coerced to Integer, anything
 *                       outside 1-4 is rejected with a warning
 */
def endpointOff(endpointNumber) {
    def ep = endpointNumber as Integer
    if (!(ep >= 1 && ep <= 4)) {
        log.warn "endpointOff: invalid endpoint number ${endpointNumber} (must be 1-4)"
        return
    }
    final int OFF = 0x00
    sendEndpointCommand(ep, OFF)
    state."switch${ep}" = "off"
    if (logEnable) log.debug "Turned OFF endpoint ${ep}"
}

// Helper to send Zigbee On/Off command to a specific endpoint
/**
 * Builds and sends an "he cmd" On/Off cluster (0x0006) command to one endpoint.
 *
 * @param endpoint target endpoint number
 * @param onOff    cluster command id; callers pass 0x01 for on and 0x00 for off
 */
private void sendEndpointCommand(int endpoint, int onOff) {
    logDebug("sendEndpointCommand(${endpoint}, ${onOff}) called")
    // Both numbers are written as zero-padded decimal after "0x"
    def endpointField = endpoint.toString().padLeft(2, '0')
    def commandField = onOff.toString().padLeft(2, '0')
    def zigbeeCmd = "he cmd 0x${device.deviceNetworkId} 0x${endpointField} 0x0006 0x${commandField} {}"
    sendHubCommand(new HubAction(zigbeeCmd, Protocol.ZIGBEE))
}

// Helper to send multiple Zigbee commands
/**
 * Sends each entry of the list to the hub as its own Zigbee HubAction, in
 * list order.
 *
 * @param cmds Zigbee command strings to send
 */
private void sendZigbeeCommands(List<String> cmds) {
    for (String zigbeeCmd in cmds) {
        sendHubCommand(new HubAction(zigbeeCmd, Protocol.ZIGBEE))
    }
}

/**
 * Prefixes a message with the device's display name and, when the txtEnable
 * preference is on, writes it to the info log.
 *
 * @param msg message text without the device name
 * @return the prefixed text, for use as an event's descriptionText
 */
private getDescriptionText(msg) {
    def fullText = "${device.displayName} ${msg}"
    boolean infoLogging = settings?.txtEnable
    if (infoLogging) {
        log.info "${fullText}"
    }
    return fullText
}

/**
 * Writes a debug log line, but only while the logEnable preference is on.
 *
 * @param msg text to log
 */
private logDebug(msg) {
    if (!settings?.logEnable) {
        return
    }
    log.debug "${msg}"
}

/**
 * Scheduled callback that flips the "logEnable" preference to false and logs
 * a warning saying so.
 */
def logsOff() {
    log.warn "debug logging disabled..."
    def disabledSetting = [value: "false", type: "bool"]
    device.updateSetting("logEnable", disabledSetting)
}