Can you try this?
/*
* Moes / Heiman HS-720ES (TS0601 / _TZE284_rjxqso4a) — Zigbee Carbon Monoxide Alarm
*
* Hubitat Elevation Driver (Groovy)
*
* Tuya DPs (per public Z2M/ZHA converter info):
* DP 1 -> carbon_monoxide (bool) 0=clear, 1=detected
* DP 2 -> co (raw numeric) ppm
* DP 9 -> self_test_result (enum) checking/success/failure/others
* DP 15 -> battery (raw numeric) %
* DP 16 -> silence (bool/raw) silence alarm (write)
*
* Capabilities:
* - CarbonMonoxideDetector
* - Battery
* - Refresh
* - Configuration
*
* Custom Attributes:
* - co (NUMBER, ppm)
* - selfTestResult (STRING)
* - silence (STRING: on/off)
*
* Custom Commands:
* - setSilence(String state) // "on" or "off"
*
* Notes:
* - Tuya EF00/ED00 parsing implemented locally (no external library dependency).
* - A lightweight Tuya time sync is attempted in configure().
*
* Author: ChatGPT (for Krassimir / Hubitat)
* Date: 2026-02-18
*/
// Driver metadata: declares the capabilities, custom attributes/commands,
// the Zigbee fingerprint used to match the device, and the user preferences.
metadata {
    definition(
        name: "Moes/Heiman HS-720ES Zigbee CO Alarm",
        namespace: "community",
        author: "ChatGPT",
        importUrl: ""
    ) {
        // Standard capabilities exposed by this driver.
        capability "CarbonMonoxideDetector"
        capability "Battery"
        capability "Refresh"
        capability "Configuration"

        // Custom attributes populated from Tuya datapoints 2, 9 and 16.
        attribute "co", "NUMBER"
        attribute "selfTestResult", "STRING"
        attribute "silence", "STRING"

        // Custom command; handled by setSilence(String).
        command "setSilence", [[name: "state*", type: "ENUM", constraints: ["on", "off"], description: "Silence alarm"]]

        // Fingerprint for the TS0601 / _TZE284_rjxqso4a device.
        fingerprint(
            profileId: "0104",
            endpointId: "01",
            inClusters: "0004,0005,EF00,0000,ED00",
            outClusters: "0019,000A",
            model: "TS0601",
            manufacturer: "_TZE284_rjxqso4a",
            deviceJoinName: "HS-720ES CO Alarm"
        )
    }

    preferences {
        // logEnable gates log.debug output; txtEnable gates log.info output.
        input(name: "logEnable", type: "bool", title: "Enable debug logging", defaultValue: true)
        input(name: "txtEnable", type: "bool", title: "Enable info logging", defaultValue: true)
        // Added to the reported CO reading (DP 2) before the "co" event is sent.
        input(name: "coOffset", type: "number", title: "CO offset (ppm)", defaultValue: 0, range: "-500..500")
    }
}
/* ------------------------------- Lifecycle ------------------------------- */
/**
 * Lifecycle hook: writes an info log line naming the device when info
 * logging is enabled. Performs no other setup.
 */
def installed() {
    if (!txtEnable) return
    log.info "Installed: ${device.displayName}"
}
/**
 * Lifecycle hook: logs the update (when info logging is enabled) and, when
 * debug logging is enabled, schedules logsOff() to run in 1800 seconds.
 */
def updated() {
    if (txtEnable) {
        log.info "Updated: ${device.displayName}"
    }
    if (logEnable) {
        int debugWindowSeconds = 1800
        runIn(debugWindowSeconds, "logsOff")
    }
}
/**
 * Configuration hook: attempts a best-effort Tuya time sync (cluster 0xEF00,
 * command 0x24) and then calls refresh().
 *
 * The payload produced by createTuyaTimePayload() is a byte array, while
 * zigbee.command() takes its payload as hex text and returns a list of
 * command strings. The bytes are therefore hex-encoded first, and every
 * returned command string is wrapped in its own HubAction and sent as one
 * HubMultiAction. Any failure is logged at debug level and never aborts
 * configuration.
 */
def configure() {
    if (txtEnable) log.info "Configuring ${device.displayName}"
    // Optional Tuya time sync (helps some Tuya devices behave nicely)
    try {
        byte[] timeBytes = createTuyaTimePayload()?.payload as byte[]
        String timeHex = bytesToHex(timeBytes)
        if (timeHex) {
            List<String> rawCmds = zigbee.command(0xEF00, 0x24, timeHex)
            hubitat.device.HubMultiAction batch = new hubitat.device.HubMultiAction()
            rawCmds.each { String cmd ->
                batch.add(new hubitat.device.HubAction(cmd, hubitat.device.Protocol.ZIGBEE))
            }
            sendHubCommand(batch)
        }
    } catch (e) {
        if (logEnable) log.debug "Time sync not sent (non-fatal): ${e}"
    }
    refresh()
}
/**
 * Turns the "logEnable" preference off and emits a warning so the change is
 * visible in the logs. Scheduled from updated().
 */
void logsOff() {
    Map disabledSetting = [value: "false", type: "bool"]
    device.updateSetting("logEnable", disabledSetting)
    log.warn "Debug logging disabled"
}
/* --------------------------------- Parse -------------------------------- */
/**
 * Entry point for incoming device messages.
 *
 * Descriptions starting with "catchall:", "read attr -" or "zigbee" are
 * converted to a map; messages on cluster 0xEF00 or 0xED00 are forwarded to
 * parseTuyaCluster(). Everything else is ignored.
 *
 * @param description raw message description
 * @return always null; events are emitted by the DP handlers
 */
def parse(String description) {
    if (logEnable) log.debug "parse(): ${description}"

    List<String> acceptedPrefixes = ["catchall:", "read attr -", "zigbee"]
    boolean recognised = acceptedPrefixes.any { String prefix -> description?.startsWith(prefix) }
    if (!recognised) {
        return null
    }

    Map descMap = zigbee.parseDescriptionAsMap(description)
    if (logEnable) log.debug "descMap = ${descMap}"

    boolean tuyaCluster = descMap?.clusterInt in [0xEF00, 0xED00]
    if (tuyaCluster) {
        parseTuyaCluster(descMap)
    }
    return null
}
/* ------------------------------ Tuya Parsing ----------------------------- */
// Tuya EF00 payload: [SEQ (2B)] then one or more DP records,
// each record being [DP (1B)][TYPE (1B)][LEN (2B, big-endian)][DATA (LEN bytes)]
/**
 * Splits a Tuya cluster payload into datapoint records and hands each one to
 * handleTuyaDP().
 *
 * The payload starts with a 2-byte sequence number, which is skipped. Each
 * record has a 4-byte header (1-byte DP id, 1-byte type, 2-byte length)
 * followed by its data. Reading the DP id as 2 bytes would swallow the
 * sequence number and shift every following field, so no record would ever
 * decode. A record whose declared length runs past the end of the payload
 * stops the parse.
 *
 * @param descMap parsed message map; descMap.data is read as a list of
 *                two-character hex strings, one per payload byte
 */
private void parseTuyaCluster(Map descMap) {
    try {
        byte[] data = descMap?.data?.collect { (int) Integer.parseInt(it, 16) as byte } as byte[]
        if (!data) return

        final int seqBytes = 2      // sequence number preceding the first record
        final int headerBytes = 4   // DP id + type + 2-byte length

        int idx = seqBytes
        while (idx + headerBytes <= data.length) {
            int dp = data[idx] & 0xFF
            int dpType = data[idx + 1] & 0xFF
            int dpLen = ((data[idx + 2] & 0xFF) << 8) | (data[idx + 3] & 0xFF)
            idx += headerBytes

            if (idx + dpLen > data.length) {
                if (logEnable) log.debug "Truncated Tuya record: DP=${dp} declares ${dpLen} bytes, ${data.length - idx} available"
                break
            }

            byte[] dpData = subBytes(data, idx, dpLen)
            idx += dpLen
            handleTuyaDP(dp, dpType, dpLen, dpData)
        }
    } catch (e) {
        log.warn "parseTuyaCluster() exception: ${e}"
    }
}
/**
 * Translates one Tuya datapoint into a device event.
 *
 * Handled datapoints: 1 (carbonMonoxide), 2 (co, with the coOffset
 * preference applied and clamped at 0), 9 (selfTestResult), 15 (battery,
 * clamped to 0..100) and 16 (silence). Other datapoints are only logged.
 *
 * A datapoint whose payload is empty carries no value, so it is ignored
 * rather than mapped to a default. This matters most for DP 1: an empty
 * payload must not be reported as "clear" on a CO alarm.
 *
 * @param dp     datapoint id
 * @param dpType Tuya data type byte
 * @param dpLen  declared payload length
 * @param dpData payload bytes
 */
private void handleTuyaDP(int dp, int dpType, int dpLen, byte[] dpData) {
    if (logEnable) log.debug "DP=${dp} type=${hex(dpType)} len=${dpLen} data=${bytesToHex(dpData)}"
    switch (dp) {
        case 1: // carbon_monoxide bool (0=clear,1=detected)
            Integer b = tuyaGetBoolLike(dpType, dpData)
            if (b == null) {
                if (logEnable) log.debug "DP 1 ignored: empty payload"
                break
            }
            boolean detected = (b != 0)
            sendEvent(name: "carbonMonoxide", value: detected ? "detected" : "clear",
                descriptionText: detected ? "Carbon monoxide detected" : "Carbon monoxide clear")
            break
        case 2: // co ppm (raw)
            Long ppm = tuyaGetValue(dpType, dpData)
            if (ppm != null) {
                long adj = ppm + (settings.coOffset ?: 0)
                if (adj < 0) adj = 0 // a negative offset must not produce a negative reading
                sendEvent(name: "co", value: adj as long, unit: "ppm", descriptionText: "CO ${adj} ppm")
            }
            break
        case 9: // self_test_result enum
            Integer r = tuyaGetEnumLike(dpType, dpData)
            if (r == null) {
                if (logEnable) log.debug "DP 9 ignored: empty payload"
                break
            }
            String res = [0: "checking", 1: "success", 2: "failure", 3: "others"][r] ?: "unknown(${r})"
            sendEvent(name: "selfTestResult", value: res, descriptionText: "Self-test: ${res}")
            break
        case 15: // battery %
            Long bat = tuyaGetValue(dpType, dpData)
            if (bat != null) {
                int pct = Math.max(0, Math.min(100, bat as int))
                sendEvent(name: "battery", value: pct, unit: "%", descriptionText: "Battery ${pct}%")
            }
            break
        case 16: // silence (bool/raw) — report + writeable
            Integer s = tuyaGetBoolLike(dpType, dpData)
            if (s == null) {
                if (logEnable) log.debug "DP 16 ignored: empty payload"
                break
            }
            String state = (s != 0) ? "on" : "off"
            sendEvent(name: "silence", value: state, descriptionText: "Silence ${state}")
            break
        default:
            if (logEnable) log.debug "Unhandled DP ${dp} type=${hex(dpType)} len=${dpLen}"
            break
    }
}
/* ---------------------------- Tuya Helpers ------------------------------- */
/**
 * Reads a flag-style datapoint value.
 *
 * @param dpType Tuya data type byte (not consulted; the first byte is used
 *               regardless of the declared type)
 * @param data   payload bytes
 * @return the first payload byte as an unsigned value (0..255), or null when
 *         the payload is null or empty
 */
private Integer tuyaGetBoolLike(int dpType, byte[] data) {
    boolean hasPayload = (data != null && data.length > 0)
    return hasPayload ? (data[0] & 0xFF) : null
}
/**
 * Reads an enum-style datapoint value.
 *
 * @param dpType Tuya data type byte (not consulted)
 * @param data   payload bytes
 * @return the first payload byte as an unsigned value (0..255), or null when
 *         the payload is null or empty
 */
private Integer tuyaGetEnumLike(int dpType, byte[] data) {
    if (data == null || data.length == 0) {
        return null
    }
    int ordinal = data[0] & 0xFF
    return ordinal
}
/**
 * Reads a numeric datapoint value.
 *
 * Tuya types: raw=0x00, bool=0x01, number=0x02, string=0x03, enum=0x04,
 * bitmap=0x05. Bool and enum yield the first byte; every other type is
 * decoded by bytesToUInt() over the whole payload.
 *
 * @param dpType Tuya data type byte
 * @param data   payload bytes
 * @return the decoded value, or null when the payload is null or empty
 */
private Long tuyaGetValue(int dpType, byte[] data) {
    if (!data) return null

    boolean singleByteType = (dpType == 0x01 || dpType == 0x04)
    if (singleByteType) {
        return (long) (data[0] & 0xFF)
    }
    return bytesToUInt(data)
}
/**
 * Folds a byte array into a long, most significant byte first, treating
 * every byte as unsigned.
 *
 * @param data bytes to combine; may be null
 * @return the combined value, or 0 for a null or empty array
 */
private long bytesToUInt(byte[] data) {
    long acc = 0L
    if (data == null) {
        return acc
    }
    for (byte octet : data) {
        acc = (acc << 8) | (octet & 0xFF)
    }
    return acc
}
/**
 * Renders a byte array as upper-case hex, two characters per byte, with no
 * separators.
 *
 * @param data bytes to render; may be null
 * @return the hex text, or "" for a null or empty array
 */
private String bytesToHex(byte[] data) {
    if (data == null) {
        return ""
    }
    StringBuilder text = new StringBuilder(data.length * 2)
    for (byte octet : data) {
        text.append(String.format("%02X", octet))
    }
    return text.toString()
}
/**
 * Formats the low 8 bits of a value as "0x" followed by two upper-case hex
 * digits.
 *
 * @param val value to format; only the lowest byte is used
 * @return text such as "0x0A"
 */
private String hex(int val) {
    int lowByte = val & 0xFF
    return "0x" + String.format("%02X", lowByte)
}
/* ------------------------------ Commands -------------------------------- */
/**
 * Refresh command. Only logs the request (when info logging is enabled); no
 * Zigbee traffic is generated.
 */
def refresh() {
    // Most Tuya TS0601 devices don't support explicit attribute polling; they report periodically.
    if (!txtEnable) return
    log.info "Refresh requested"
}
/**
 * Custom command: requests that the alarm's silence flag (DP 16) be turned
 * on or off.
 *
 * The "silence" attribute is updated optimistically before the write is
 * sent. sendTuyaBool() returns a list of HubAction objects; they are
 * dispatched one at a time because sendHubCommand() accepts a single action
 * rather than a list.
 *
 * @param state "on" (case-insensitive) to silence; any other value,
 *              including null, is treated as "off"
 */
void setSilence(String state) {
    boolean on = (state?.toLowerCase() == "on")
    String label = on ? "on" : "off"
    if (txtEnable) log.info "Setting silence ${label}"

    // optimistic UI update:
    sendEvent(name: "silence", value: label, descriptionText: "Silence ${label} (requested)")

    List<hubitat.device.HubAction> cmds = sendTuyaBool(16, on)
    cmds?.each { hubitat.device.HubAction action ->
        sendHubCommand(action)
    }
}
/* --------------------------- Tuya Write Frames --------------------------- */
/**
 * Builds the hub actions that write a boolean Tuya datapoint.
 *
 * The frame from tuyaPacket() is hex-encoded because zigbee.command() takes
 * its payload as hex text, and the write is issued with Tuya command 0x00
 * (data request). Command 0x02 is the device-to-hub data report and is not a
 * write. zigbee.command() returns a list of command strings; each Zigbee
 * command string becomes its own HubAction, and "delay" entries are dropped
 * since there is only a single command to send.
 *
 * @param dp    datapoint id to write
 * @param value boolean value to write
 * @return the hub actions to send; the caller is responsible for sending them
 */
private List<hubitat.device.HubAction> sendTuyaBool(int dp, boolean value) {
    byte[] payload = [(byte) (value ? 0x01 : 0x00)] as byte[]
    // Prefer Tuya bool type (0x01). If your logs show it doesn't work, try changing type to 0x00 (raw).
    byte[] frame = tuyaPacket(dp, 0x01, payload)?.payload as byte[]
    String frameHex = bytesToHex(frame)

    List<String> rawCmds = zigbee.command(0xEF00, 0x00, frameHex)
    return rawCmds
        .findAll { String cmd -> !cmd.startsWith("delay") }
        .collect { String cmd -> new hubitat.device.HubAction(cmd, hubitat.device.Protocol.ZIGBEE) }
}
/**
 * Builds the payload for the Tuya time-sync command (0x24).
 *
 * Layout (10 bytes, all big-endian):
 *   [payload size = 0x0008][UTC epoch seconds (4B)][local epoch seconds (4B)]
 *
 * The second timestamp is local time (UTC plus the zone offset in effect
 * right now, so daylight saving is included), not the bare offset. The
 * payload is not wrapped as a datapoint record. The hub location's time zone
 * is used when available, otherwise the JVM default.
 *
 * @return a map with a single key "payload" holding the 10-byte array
 */
private Map createTuyaTimePayload() {
    long nowMs = now()
    TimeZone zone = location?.timeZone ?: TimeZone.getDefault()

    int utcSeconds = (int) nowMs.intdiv(1000L)
    int localSeconds = (int) (nowMs + zone.getOffset(nowMs)).intdiv(1000L)

    byte[] payload = new byte[10]
    payload[0] = (byte) 0x00
    payload[1] = (byte) 0x08
    for (int i = 0; i < 4; i++) {
        int shift = 24 - 8 * i
        payload[2 + i] = (byte) ((utcSeconds >> shift) & 0xFF)
        payload[6 + i] = (byte) ((localSeconds >> shift) & 0xFF)
    }
    return [payload: payload]
}
/**
 * Builds a Tuya datapoint frame for a write.
 *
 * Layout: [SEQ (2B)][DP (1B)][TYPE (1B)][LEN (2B, big-endian)][VALUE].
 * The frame must begin with a 2-byte sequence number and the DP id occupies
 * a single byte; encoding the DP id in two bytes with no sequence number
 * shifts every field the device reads. A random sequence number is used for
 * each frame. A null value is treated as an empty value.
 *
 * @param dp    datapoint id (low 8 bits are used)
 * @param type  Tuya data type (low 8 bits are used)
 * @param value datapoint value bytes; may be null
 * @return a map with a single key "payload" holding the frame bytes
 */
private Map tuyaPacket(int dp, int type, byte[] value) {
    byte[] body = value ?: new byte[0]
    int seq = new Random().nextInt(0x10000)

    byte[] payload = new byte[6 + body.length]
    payload[0] = (byte) ((seq >> 8) & 0xFF)
    payload[1] = (byte) (seq & 0xFF)
    payload[2] = (byte) (dp & 0xFF)
    payload[3] = (byte) (type & 0xFF)
    payload[4] = (byte) ((body.length >> 8) & 0xFF)
    payload[5] = (byte) (body.length & 0xFF)
    System.arraycopy(body, 0, payload, 6, body.length)
    return [payload: payload]
}
/**
 * Encodes an int as four bytes, most significant byte first.
 *
 * @param v value to encode
 * @return a new 4-byte array
 */
private byte[] intTo4B(int v) {
    byte[] encoded = new byte[4]
    for (int i = 0; i < 4; i++) {
        int shift = 24 - 8 * i
        encoded[i] = (byte) ((v >> shift) & 0xFF)
    }
    return encoded
}
/**
 * Concatenates two byte arrays into a new array.
 *
 * Copies with System.arraycopy instead of the `+` operator, so the result
 * does not depend on the Groovy runtime providing `plus` for primitive
 * arrays.
 *
 * @param a leading bytes; null is treated as empty
 * @param b trailing bytes; null is treated as empty
 * @return a new array holding a followed by b
 */
private byte[] concatBytes(byte[] a, byte[] b) {
    byte[] head = a ?: new byte[0]
    byte[] tail = b ?: new byte[0]
    byte[] joined = new byte[head.length + tail.length]
    System.arraycopy(head, 0, joined, 0, head.length)
    System.arraycopy(tail, 0, joined, head.length, tail.length)
    return joined
}
/**
 * Returns a copy of part of a byte array, clamped to the array's bounds.
 *
 * The copy starts at max(0, start) and ends before
 * min(src.length, start + max(0, len)).
 *
 * @param src   source bytes; null or empty yields an empty array
 * @param start index of the first byte to copy
 * @param len   number of bytes requested; negative is treated as 0
 * @return a new array with the selected bytes, empty when the range is empty
 */
private byte[] subBytes(byte[] src, int start, int len) {
    if (!src) return new byte[0]

    int from = (start < 0) ? 0 : start
    int requested = (len < 0) ? 0 : len
    int to = Math.min(src.length, start + requested)
    if (from >= to) return new byte[0]

    byte[] slice = new byte[to - from]
    System.arraycopy(src, from, slice, 0, slice.length)
    return slice
}
Enable debug logging, then change the CO level rapidly. What shows up in the live logs?
You can also try this driver, but your particular device is not in it yet. The self-test feature will not work with it.
This driver adds Hubitat support for Tuya smoke detectors that use the Tuya-specific, non-standard Zigbee cluster 0xEF00 messages.
Currently, it is being tested with manufacturer models "_TZE200_ntcy3xu1" and "_TZE200_uebojraa"
Please note that none of these devices, which are designed (not just manufactured) in China, are known to have any UL, CSA, TÜV, AS/NZS or similar certifications! Some models are listed as being EN14604 certified.
The recommended installation method is to …