Moes Zigbee Carbon Monoxide Alarm (HS-720ES)

Hello all,

I bought a Zigbee Moes Carbon Monoxide Alarm (model: HS-720ES, seems to be under Heiman on some websites) and I can't find a device category that fits on my C7 Hubitat.

Has anyone tried it or something similar?

Fingerprint: profileId:"0104", endpointId:"01", inClusters:"0004,0005,EF00,0000,ED00", outClusters:"0019,000A", model:"TS0601", manufacturer:"_TZE284_rjxqso4a", controllerType: "ZGB"

Thanks!

1 Like

Pinging @kkossev

I agree, if anyone has a driver it would be @kkossev, but I'm not seeing where he has a driver for Tuya CO2 monitors, unless I'm missing something.

That's too bad but thanks for your answers guys

Can you try this ?

/*
 *  Moes / Heiman HS-720ES (TS0601 / _TZE284_rjxqso4a) – Zigbee Carbon Monoxide Alarm
 *
 *  Hubitat Elevation Driver (Groovy)
 *
 *  Tuya DPs (per public Z2M/ZHA converter info):
 *    DP  1 -> carbon_monoxide (bool)  0=clear, 1=detected
 *    DP  2 -> co (raw numeric)        ppm
 *    DP  9 -> self_test_result (enum) checking/success/failure/others
 *    DP 15 -> battery (raw numeric)   %
 *    DP 16 -> silence (bool/raw)      silence alarm (write)
 *
 *  Capabilities:
 *    - CarbonMonoxideDetector
 *    - Battery
 *    - Refresh
 *    - Configuration
 *
 *  Custom Attributes:
 *    - co (NUMBER, ppm)
 *    - selfTestResult (STRING)
 *    - silence (STRING: on/off)
 *
 *  Custom Commands:
 *    - setSilence(String state)  // "on" or "off"
 *
 *  Notes:
 *    - Tuya EF00/ED00 parsing implemented locally (no external library dependency).
 *    - A lightweight Tuya time sync is attempted in configure().
 *
 *  Author: ChatGPT (for Krassimir / Hubitat)
 *  Date: 2026-02-18
 */

// Driver metadata: declares the capabilities, custom attributes, custom command,
// Zigbee fingerprint and user preferences of this driver.
metadata {
    definition(name: "Moes/Heiman HS-720ES Zigbee CO Alarm", namespace: "community", author: "ChatGPT", importUrl: "") {
        // Standard capabilities
        capability("CarbonMonoxideDetector")
        capability("Battery")
        capability("Refresh")
        capability("Configuration")

        // Custom attributes populated by handleTuyaDP()
        attribute("co", "NUMBER")
        attribute("selfTestResult", "STRING")
        attribute("silence", "STRING")

        // Custom command implemented by setSilence(String)
        command("setSilence", [[name: "state*", type: "ENUM", constraints: ["on", "off"], description: "Silence alarm"]])

        // Zigbee fingerprint used to match the device to this driver
        fingerprint(profileId: "0104", endpointId: "01", inClusters: "0004,0005,EF00,0000,ED00", outClusters: "0019,000A", model: "TS0601", manufacturer: "_TZE284_rjxqso4a", deviceJoinName: "HS-720ES CO Alarm")
    }

    preferences {
        // Logging switches read throughout the driver as logEnable / txtEnable
        input(name: "logEnable", type: "bool", title: "Enable debug logging", defaultValue: true)
        input(name: "txtEnable", type: "bool", title: "Enable info logging", defaultValue: true)
        // Offset added to the reported CO value in handleTuyaDP() (DP 2)
        input(name: "coOffset", type: "number", title: "CO offset (ppm)", defaultValue: 0, range: "-500..500")
    }
}

/* ------------------------------- Lifecycle ------------------------------- */

/**
 * Lifecycle hook: writes an info log entry (when info logging is enabled) and does nothing else.
 */
def installed() {
    if (!txtEnable) return
    log.info "Installed: ${device.displayName}"
}

/**
 * Lifecycle hook: logs the update and, while debug logging is enabled,
 * schedules logsOff() to run in 1800 seconds.
 */
def updated() {
    if (txtEnable) {
        log.info "Updated: ${device.displayName}"
    }
    if (logEnable) {
        runIn(1800, "logsOff")
    }
}

/**
 * Configuration command: attempts a best-effort Tuya time sync (cluster 0xEF00, command 0x24)
 * and then calls refresh().
 *
 * The payload bytes come from createTuyaTimePayload() and are converted to a hex string,
 * because zigbee.command() takes its payload as hex-string arguments. The command strings
 * it returns are wrapped one by one into HubAction objects and sent as a single
 * HubMultiAction. A failure of the time sync is logged and never aborts configuration.
 *
 * @return whatever refresh() returns
 */
def configure() {
    if (txtEnable) log.info "Configuring ${device.displayName}"
    // Optional Tuya time sync (helps some Tuya devices behave nicely)
    try {
        byte[] timeBytes = createTuyaTimePayload()?.payload as byte[]
        String hexPayload = bytesToHex(timeBytes)
        if (hexPayload) {
            List<String> zigbeeCmds = zigbee.command(0xEF00, 0x24, hexPayload)
            hubitat.device.HubMultiAction actions = new hubitat.device.HubMultiAction()
            zigbeeCmds.each { String cmd ->
                actions.add(new hubitat.device.HubAction(cmd, hubitat.device.Protocol.ZIGBEE))
            }
            sendHubCommand(actions)
        }
    } catch (e) {
        // Best effort only, but surface the failure so a broken sync is not silently hidden
        log.warn "Time sync not sent (non-fatal): ${e}"
    }
    refresh()
}

/**
 * Turns the "logEnable" preference off and logs a warning; scheduled from updated().
 */
void logsOff() {
    Map disabledSetting = [value: "false", type: "bool"]
    device.updateSetting("logEnable", disabledSetting)
    log.warn("Debug logging disabled")
}

/* --------------------------------- Parse -------------------------------- */

/**
 * Entry point for incoming device messages.
 *
 * Messages whose description starts with "catchall:", "read attr -" or "zigbee" are decoded
 * with zigbee.parseDescriptionAsMap(); those on cluster 0xEF00 or 0xED00 are handed to
 * parseTuyaCluster(). Everything else is ignored.
 *
 * @param description raw message description
 * @return always null; events are emitted via sendEvent() further down the call chain
 */
def parse(String description) {
    if (logEnable) log.debug "parse(): ${description}"

    List<String> handledPrefixes = ["catchall:", "read attr -", "zigbee"]
    boolean isZigbeeMessage = handledPrefixes.any { String prefix -> description?.startsWith(prefix) }
    if (!isZigbeeMessage) {
        return null
    }

    Map descMap = zigbee.parseDescriptionAsMap(description)
    if (logEnable) log.debug "descMap = ${descMap}"

    boolean isTuyaCluster = [0xEF00, 0xED00].contains(descMap?.clusterInt)
    if (isTuyaCluster) {
        parseTuyaCluster(descMap)
    }
    return null
}

/* ------------------------------ Tuya Parsing ----------------------------- */
// Tuya cluster payload: [SEQ (2B)] followed by one or more DP records,
// each record being [DP id (1B)][TYPE (1B)][LEN (2B, big-endian)][DATA (LEN bytes)]
/**
 * Decodes the data points carried in a Tuya cluster message and passes each one to
 * handleTuyaDP().
 *
 * The first two payload bytes are the Tuya sequence number and are skipped. Payloads too
 * short to hold a sequence number plus one record header yield no data points. A record
 * whose declared length runs past the end of the payload stops the decoding.
 *
 * @param descMap parsed message map; its "data" entry is read as a list of hex byte strings
 */
private void parseTuyaCluster(Map descMap) {
    try {
        byte[] data = descMap?.data?.collect { (int) Integer.parseInt(it, 16) as byte } as byte[]
        if (!data) return

        final int seqLen = 2      // leading sequence number
        final int headerLen = 4   // DP id + type + 2-byte length

        int idx = seqLen
        // "<=" so that a trailing record with zero-length data is still decoded
        while (idx + headerLen <= data.length) {
            int dp     = data[idx] & 0xFF
            int dpType = data[idx + 1] & 0xFF
            int dpLen  = ((data[idx + 2] & 0xFF) << 8) | (data[idx + 3] & 0xFF)
            idx += headerLen

            if (idx + dpLen > data.length) {
                if (logEnable) log.debug "Truncated Tuya DP ${dp}: declared len=${dpLen}, available=${data.length - idx}"
                break
            }

            byte[] dpData = subBytes(data, idx, dpLen)
            idx += dpLen
            handleTuyaDP(dp, dpType, dpLen, dpData)
        }
    } catch (e) {
        log.warn "parseTuyaCluster() exception: ${e}"
    }
}

/**
 * Translates one decoded Tuya data point into a device event.
 *
 * Handled data points: 1 (carbonMonoxide), 2 (co, with the coOffset preference applied and
 * clamped at 0), 9 (selfTestResult), 15 (battery, clamped to 0..100) and 16 (silence).
 * A handled data point that arrives with an empty payload is logged and ignored, so that a
 * malformed frame can never be reported as "clear" or "off". Unhandled data points are only
 * written to the debug log.
 *
 * @param dp     data point id
 * @param dpType Tuya data type byte of the data point
 * @param dpLen  declared payload length
 * @param dpData payload bytes of the data point
 */
private void handleTuyaDP(int dp, int dpType, int dpLen, byte[] dpData) {
    if (logEnable) log.debug "DP=${dp} type=${hex(dpType)} len=${dpLen} data=${bytesToHex(dpData)}"

    // Never derive a state from a payload that carries no data
    if (!dpData && (dp in [1, 2, 9, 15, 16])) {
        log.warn "DP ${dp} received with empty payload; ignored"
        return
    }

    switch (dp) {
        case 1: // carbon_monoxide bool (0=clear,1=detected)
            // NOTE(review): polarity follows the driver header; confirm against live logs
            Integer b = tuyaGetBoolLike(dpType, dpData)
            boolean detected = (b != null && b != 0)
            sendEvent(name: "carbonMonoxide", value: detected ? "detected" : "clear",
                    descriptionText: detected ? "Carbon monoxide detected" : "Carbon monoxide clear")
            break

        case 2: // co ppm (raw)
            Long ppm = tuyaGetValue(dpType, dpData)
            if (ppm != null) {
                long adj = ppm + (settings.coOffset ?: 0)
                if (adj < 0) adj = 0   // an offset must not produce a negative concentration
                sendEvent(name: "co", value: adj, unit: "ppm", descriptionText: "CO ${adj} ppm")
            }
            break

        case 9: // self_test_result enum
            Integer r = tuyaGetEnumLike(dpType, dpData)
            String res = [0: "checking", 1: "success", 2: "failure", 3: "others"][r] ?: "unknown(${r})"
            sendEvent(name: "selfTestResult", value: res, descriptionText: "Self-test: ${res}")
            break

        case 15: // battery %
            Long bat = tuyaGetValue(dpType, dpData)
            if (bat != null) {
                int pct = Math.max(0, Math.min(100, bat as int))
                sendEvent(name: "battery", value: pct, unit: "%", descriptionText: "Battery ${pct}%")
            }
            break

        case 16: // silence (bool/raw) - reported by the device and writeable via setSilence()
            Integer s = tuyaGetBoolLike(dpType, dpData)
            String silenceState = (s != null && s != 0) ? "on" : "off"
            sendEvent(name: "silence", value: silenceState, descriptionText: "Silence ${silenceState}")
            break

        default:
            if (logEnable) log.debug "Unhandled DP ${dp} type=${hex(dpType)} len=${dpLen}"
            break
    }
}

/* ---------------------------- Tuya Helpers ------------------------------- */

/**
 * Reads a flag-like data point value.
 *
 * @param dpType Tuya data type byte (not used; the first payload byte is read regardless of type)
 * @param data   payload bytes
 * @return the first payload byte as an unsigned value (0..255), or null when the payload is null or empty
 */
private Integer tuyaGetBoolLike(int dpType, byte[] data) {
    boolean hasPayload = (data != null && data.length > 0)
    return hasPayload ? Integer.valueOf(data[0] & 0xFF) : null
}

/**
 * Reads an enum-like data point value.
 *
 * @param dpType Tuya data type byte (not used)
 * @param data   payload bytes
 * @return the first payload byte as an unsigned value (0..255), or null when the payload is null or empty
 */
private Integer tuyaGetEnumLike(int dpType, byte[] data) {
    if (data == null || data.length == 0) {
        return null
    }
    int first = data[0] & 0xFF
    return first
}

/**
 * Reads a numeric data point value.
 *
 * Tuya types: raw=0x00, bool=0x01, number=0x02 (4B), string=0x03, enum=0x04, bitmap=0x05.
 * Bool and enum values are taken from the first payload byte; every other type is decoded
 * from the whole payload via bytesToUInt().
 *
 * @param dpType Tuya data type byte
 * @param data   payload bytes
 * @return the decoded value, or null when the payload is null or empty
 */
private Long tuyaGetValue(int dpType, byte[] data) {
    if (!data) return null

    boolean singleByteType = (dpType == 0x01 || dpType == 0x04)
    if (singleByteType) {
        return (long) (data[0] & 0xFF)
    }
    return bytesToUInt(data)
}

/**
 * Folds the given bytes, most significant first, into one number.
 *
 * @param data bytes to combine; null is treated as empty
 * @return the combined value, 0 for null or empty input
 */
private long bytesToUInt(byte[] data) {
    long acc = 0
    if (data != null) {
        for (int i = 0; i < data.length; i++) {
            acc = (acc << 8) | (data[i] & 0xFF)
        }
    }
    return acc
}

/**
 * Renders bytes as an upper-case hex string, two digits per byte and no separators.
 *
 * @param data bytes to render
 * @return the hex string, or "" for null or empty input
 */
private String bytesToHex(byte[] data) {
    if (data == null) return ""
    StringBuilder hexText = new StringBuilder()
    for (int i = 0; i < data.length; i++) {
        hexText.append(String.format("%02X", data[i]))
    }
    return hexText.toString()
}

/**
 * Formats the low byte of a value as "0x" followed by two upper-case hex digits.
 *
 * @param val value whose lowest 8 bits are rendered
 * @return the formatted string, e.g. "0x0A"
 */
private String hex(int val) {
    int lowByte = val & 0xFF
    return "0x" + String.format("%02X", lowByte)
}

/* ------------------------------ Commands -------------------------------- */

/**
 * Refresh command: only writes an info log entry; nothing is sent to the device.
 */
def refresh() {
    // Most Tuya TS0601 devices don't support explicit attribute polling; they report periodically.
    if (!txtEnable) return
    log.info "Refresh requested"
}

/**
 * Custom command: asks the device to silence (or un-silence) the alarm by writing DP 16.
 *
 * The "silence" attribute is updated optimistically before the command is sent.
 *
 * @param state "on" (case-insensitive) to silence; any other value is treated as "off"
 */
void setSilence(String state) {
    boolean on = (state?.toLowerCase() == "on")
    if (txtEnable) log.info "Setting silence ${on ? 'on' : 'off'}"
    // optimistic UI update:
    sendEvent(name: "silence", value: on ? "on" : "off", descriptionText: "Silence ${on ? 'on' : 'off'} (requested)")

    // sendHubCommand() takes one HubAction (or HubMultiAction), not a List, so send them one by one
    sendTuyaBool(16, on).each { hubitat.device.HubAction action -> sendHubCommand(action) }
}

/* --------------------------- Tuya Write Frames --------------------------- */

/**
 * Builds the hub actions that write a boolean Tuya data point.
 *
 * The frame bytes come from tuyaPacket() and are converted to a hex string, because
 * zigbee.command() takes its payload as hex-string arguments. Each command string it
 * returns is wrapped into its own HubAction.
 *
 * @param dp    data point id to write
 * @param value value to write (true = 0x01, false = 0x00)
 * @return hub actions ready to be passed to sendHubCommand() one at a time
 */
private List<hubitat.device.HubAction> sendTuyaBool(int dp, boolean value) {
    byte[] payload = [(byte) (value ? 0x01 : 0x00)] as byte[]
    // Prefer Tuya bool type (0x01). If your logs show it doesn't work, try changing type to 0x00 (raw).
    byte[] frame = tuyaPacket(dp, 0x01, payload)?.payload as byte[]
    String hexFrame = bytesToHex(frame)

    // 0x00 is the Tuya gateway-to-device data request; 0x02 is the device-to-gateway report
    List<String> zigbeeCmds = zigbee.command(0xEF00, 0x00, hexFrame)
    return zigbeeCmds.collect { String cmd ->
        new hubitat.device.HubAction(cmd, hubitat.device.Protocol.ZIGBEE)
    }
}

/**
 * Builds the payload of a Tuya time-sync message (cluster 0xEF00, command 0x24).
 *
 * Layout: [0x0008 (2B)][UTC seconds (4B)][local-time seconds (4B)], all big-endian.
 * Local time is UTC plus the current offset of the hub location's time zone (including
 * daylight saving); the JVM default zone is used only when the location has no zone.
 *
 * @return map with a single entry "payload" holding the 10 payload bytes
 */
private Map createTuyaTimePayload() {
    long nowMs = now()
    TimeZone zone = location?.timeZone ?: TimeZone.getDefault()
    // getOffset(instant) accounts for DST, unlike getRawOffset()
    long utcSec = (nowMs / 1000L) as long
    long localSec = ((nowMs + zone.getOffset(nowMs)) / 1000L) as long

    byte[] utc = intTo4B(utcSec as int)
    byte[] local = intTo4B(localSec as int)

    byte[] payload = new byte[10]
    payload[0] = (byte) 0x00
    payload[1] = (byte) 0x08   // number of time bytes that follow
    for (int i = 0; i < 4; i++) {
        payload[2 + i] = utc[i]
        payload[6 + i] = local[i]
    }
    return [payload: payload]
}

/**
 * Builds the payload of a Tuya data-point write.
 *
 * Layout: [SEQ (2B)][DP id (1B)][TYPE (1B)][LEN (2B, big-endian)][DATA (LEN bytes)].
 * The sequence number is a random 16-bit value.
 *
 * @param dp    data point id (only the low byte is used)
 * @param type  Tuya data type byte (only the low byte is used)
 * @param value data bytes; null is treated as empty
 * @return map with a single entry "payload" holding the frame bytes
 */
private Map tuyaPacket(int dp, int type, byte[] value) {
    byte[] data = (value != null) ? value : new byte[0]
    int seq = new Random().nextInt(0x10000)

    byte[] payload = new byte[6 + data.length]
    payload[0] = (byte) ((seq >> 8) & 0xFF)
    payload[1] = (byte) (seq & 0xFF)
    payload[2] = (byte) (dp & 0xFF)
    payload[3] = (byte) (type & 0xFF)
    payload[4] = (byte) ((data.length >> 8) & 0xFF)
    payload[5] = (byte) (data.length & 0xFF)
    for (int i = 0; i < data.length; i++) {
        payload[6 + i] = data[i]
    }
    return [payload: payload]
}

/**
 * Encodes an int as 4 bytes, most significant byte first.
 *
 * @param v value to encode
 * @return 4-byte big-endian representation of v
 */
private byte[] intTo4B(int v) {
    byte[] out = new byte[4]
    for (int i = 0; i < 4; i++) {
        int shift = 8 * (3 - i)
        out[i] = (byte) ((v >> shift) & 0xFF)
    }
    return out
}

/**
 * Concatenates two byte arrays into a new array.
 *
 * The bytes are copied explicitly instead of relying on a "+" operator between primitive
 * arrays, so the result does not depend on which array operators the Groovy runtime provides.
 *
 * @param a first part; null is treated as empty
 * @param b second part; null is treated as empty
 * @return a new array holding the bytes of a followed by the bytes of b
 */
private byte[] concatBytes(byte[] a, byte[] b) {
    int aLen = (a != null) ? a.length : 0
    int bLen = (b != null) ? b.length : 0
    byte[] joined = new byte[aLen + bLen]
    for (int i = 0; i < aLen; i++) {
        joined[i] = a[i]
    }
    for (int i = 0; i < bLen; i++) {
        joined[aLen + i] = b[i]
    }
    return joined
}

/**
 * Copies a slice out of a byte array, clamping the requested range to the array bounds.
 *
 * @param src   source bytes
 * @param start index of the first byte to copy; negative values are clamped to 0
 * @param len   number of bytes requested; negative values are treated as 0
 * @return a new array holding the bytes that fall inside the source; empty when the source
 *         is null or empty or the clamped range contains no bytes
 */
private byte[] subBytes(byte[] src, int start, int len) {
    byte[] none = [] as byte[]
    if (!src) return none

    int from = Math.max(0, start)
    int to = Math.min(src.length, start + Math.max(0, len))
    int count = to - from
    if (count <= 0) return none

    byte[] slice = new byte[count]
    for (int k = 0; k < count; k++) {
        slice[k] = src[from + k]
    }
    return slice
}

Enable the Debug logging, then change the CO rapidly ... What is in the live logs?


You can also try this driver, but your particular device is not in it yet. The self-test feature will not work with it.