[BETA] YAMA (Yet Another MQTT App)

I should add that it took me a while to find the zen of the Maker API which was far simpler than what I had to go through for SmartThings including writing my own version of the MakerAPI.

1 Like

OK, I do have a suggestion - rather than a message for each attribute, collect them in a single JSON bundle. This has two advantages -- one it would cut down the traffic 10x. The other is that level and switch are entangled so I have to cache the level and wait for the switch value. A single bundle would make it simpler

Of course, for compatibility, it would need to be under a flag.

I've thought about that before. It is kind of a 'pay me now, pay me later' thing.

As it is now, as events come in, the app spits them out to MQTT. Very low latency as there is no parsing or processing involved.

To group them, I either have to cache everything in the app, then build a json object, then send... Or if I don't cache every value it would be worse as I would have to fetch all of the attributes 1st, then make the json object and then send.

Caching is fast, but very memory/structure intensive - and hard to do in Hubitat on a large set of values.

Fetching all of the attribute data on queue is simply too slow for external automation uses where latency is very important.

No doubt grouping would reduce network traffic/# of MQTT messages, but at the expense of latency and hub loading. So it would need some thought.

My initial design was to intentionally do no processing, and do any grouping/caching/aggregation elsewhere (node-red typically) as those external systems have much larger memory and CPU cycles available.

So I don't disagree with you, but at the same time I'm not sure that's the direction I really want to take the app / I think that change makes a whole different set of problems.

3 Likes

I understand -- I had assumed you could get the info all at once. So I can do my own caching.

Stepping back, there's a limit to what makes sense within the Hubitat (SmartThings) givens. As part of writing my column, I've been going back to first principles. Fortunately the tools have improved a lot since I started my current app 25 years ago.

I do notice that there is a lot of redundant information being presented. Does Hubitat repeated provide the same info? I presume that's a platform design issue.

I'm not sure how I could comment on that without some specific examples.

In any case this app simply passes along every command and attribute a device has, regardless whether the attribute and some of the commands are all that interesting or useful or not.

That's the double-edged sword of making my app super simple, and not providing a ton of configuration options. You get everything, with very little configuration. But if you don't want everything, you can't limit it down.

I agree with making it simple and not changing anything.

I'm assuming the reason I see so much traffic even when devices are not changing state is that Hubitat (or the device drivers) are generating messages continually.

I wouldn't think so, no. Watching my mqtt broker, I only see new messages on topics when the value updates.

In hubitat drivers it is an option on how to do the value updates. Typically the driver authors would not force an update if the value hasn't changed, and let the platform decide when to actually store a state change. When it is left up to the platform, it won't do a state change unless the value has changed from the previous value.

I'm not aware of any of the inbox drivers that are forcing State changes, but it certainly possible if you are using some user drivers that they might be doing so.

First the good news - I changed the IP address to a DNS name and it seems to be working just fine.

image

I suspect it's an issue with the Wiz driver. Perhaps @zranger1 can comment.

Hi!

The Wiz driver does poll the lights to get current status. There's no other way to do it.
By default, it's one datagram transaction of about 150 bytes every 10 seconds.

If you've got a lot of lights, that might be generating some traffic. If this really is causing trouble, you can change the polling rate from the device page, or via the API.

1 Like

OK, so with 60 lights (6/second) each producing 8 MQTT messages it adds up and matches what I see. I take it comparing against the previous status isn't simple.

It's likely possible, but it would require touching a lot of things, and I can't get to it for a while -- not 'till at least after Burning Man.

@zranger1 You really shouldn't have to do anything special. As long as on the event publishing it doesn't force a state change (isStateChange: true), then the platform will auto-dedup events.

Something like this (stolen from one of my drivers):

result << sendEvent([name: "pushed", value: 1, descriptionText: "$device.displayName had Up Pushed (button 1) [physical]", type: "physical", isStateChange: false]

if isStateChange: false or omitted (default=false), then the hub will dedup events automatically.

Better described here:

Of course the problem may actually be in my YAMA app... I looked, and I specified filterEvents:false for the subscriptions, which from the docs:

filterEvents - Used for device subscriptions. Set to false to receive all events, defaults to true and events that do not have a changed value will not be processed.

So simply changing/removing that may "fix" what you are seeing.... Unless I put it in there for a specific reason - that is the part I can't remember. I don't "usually" change defaults unless there is a reason, that is the only thing giving me pause.

Here is a test build that simply sets all the event filters to TRUE instead of FALSE, if you want to try it. Open YAMA app replace all the code with the code below, and save. Then open the YAMA App and click done (or even better, reboot the hub):

Test Code
/*
 *  Import URL: https://raw.githubusercontent.com/Botched1/Hubitat/master/Apps/YAMA/YAMA%20APP.groovy 
 *
 *  ****************  YAMA (Yet Another MQTT App) APP  ****************
 *
 *  Design Usage:
 *  Publish Hubitat devices, commands, and attributes to MQTT 
 *
 *  Original code, but inspiration and some ideas taken from the work of Kevin (xAPPO) and MQTT Link (mydevbox, jeubanks, et al)
 *  
 *-------------------------------------------------------------------------------------------------------------------
 *  Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
 *  in compliance with the License. You may obtain a copy of the License at:
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software distributed under the License is distributed
 *  on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
 *  for the specific language governing permissions and limitations under the License.
 *
 * ------------------------------------------------------------------------------------------------------------------------------
 *
 *  Changes:
 *
 *  V0.0.1 - 01/12/21 - Test release 1
 *  V0.0.2 - 01/13/21 - Test release 2. fixed init after reboot, and a few other code cleanup items
 *  V0.0.3 - 01/13/21 - Added sendAll command support. Made init only do commands and subscriptions, but not re-publish all attributes.
 *  V0.0.4 - 01/13/21 - Added logic to remove mqttDriver from deviceList if it was selected
 *  V0.0.5 - 01/20/21 - Added check for driver connection status on init 
 *  V0.0.6 - 01/23/21 - Added check for driver connection status before sending events to MQTT
 *  V0.0.7 - 01/30/21 - Split sendAll and Initialize methods apart. Delayed initialization code by 1s to prevent issues if the driver happens to send, or app sees, multiple "init" events.
 *  V0.0.8 - 02/06/21 - Changed topic structure, putting attributes in hubitat/hubName/deviceName/attributes/* and commands in hubitat/hubName/deviceName/commands/*
 *  V0.0.9 - 08/08/22 - Removed all ["filterEvents": false] from subscriptions as a test.
 *
 */

import groovy.json.JsonSlurper

definition(
	name: "YAMA (Yet Another MQTT App)",
	namespace: "Botched1",
	author: "Jason Bottjen",
	description: "",
	iconUrl: "",
	iconX2Url: "",
	iconX3Url: "")

preferences 
{
	page(name: "pageConfig")
}

def pageConfig() 
{
	dynamicPage(name: "", title: "", install: true, uninstall: true, refreshInterval:0) 
	{
		section("Configuration") 
		{
			input "mqttDriver", "capability.notification", title: "MQTT Driver", description: "MQTT Driver", required: true, multiple: false
			input "deviceList", "capability.*", title: "Devices", description: "Devices", required: true, multiple: true
			input(name: "logEnable", type: "bool", defaultValue: "true", title: "Enable Debug Logging", description: "Enable extra logging for debugging.")
		}
	}
}

def installed() 
{
	log.debug "----- in installed -----"	
	if (logEnable) log.debug "installed"
	initialize()
}

def updated()
{
	log.debug "----- in updated -----"
	if (logEnable) log.debug "updated"
	initialize()
}

def logsOff(){
    log.warn "debug logging disabled..."
    app.updateSetting("logEnable",[value:"false",type:"bool"])
}

def initialize() 
{
	if (logEnable) {
		log.warn "Debug logging is enabled. Will be automatically turned off in 30 minutes."
		runIn(1800,logsOff)
	}

	subscribe(mqttDriver, "parentComplete", deviceEvent, ["filterEvents": true])
	
	log.warn "State: " + atomicState.initialized
	if (atomicState.initialized != "initializing") {
		if (logEnable) log.debug "initialize starting"	
		atomicState.initialized = "initializing"
		runInMillis(1000,initializeMqtt);
	}
}

def initializeMqtt() 
{
	log.debug "YAMA app initialization: Beginning"
	
	// Unsubscribe from all events
	unsubscribe()
	
	// Remove the MQTT Driver from the device list, if it was selected for publishing
	if (deviceList.find {it.name == mqttDriver.name}) {
		deviceList.remove(deviceList.findIndexOf {it.name == mqttDriver.name})
	}

	// See if driver is connected to MQTT broker
	if (mqttDriver.currentValue("connectionState") == "disconnected") {
		log.debug "Error: Driver could not connect to MQTT broker! Could not Initialize app."
		return;
	}
	
	// Walk through selected devices to get commands and attributes
	for(item in deviceList){
		// Publish Commands
		commandList = item.getSupportedCommands()

		for(commandItem in commandList){
			mqttDriver.publish("${item}/commands/${commandItem}/set","")
			pauseExecution(100)
		}

		// Publish sendAll command
		mqttDriver.publish("sendAll","")
		pauseExecution(100)
		
		// MQTT Subscribe to all command sets
		mqttDriver.subscribe("+/+/+/set")
		pauseExecution(100)
		
		// Subscribe to sendAll
		mqttDriver.subscribe("sendAll")
	}

	// Subscribe to events form selected devices, and MQTT driver
	subscribe(deviceList, deviceEvent, ["filterEvents": true])
	subscribe(mqttDriver, "mqtt", deviceEvent, ["filterEvents": true])
	subscribe(mqttDriver, "init", deviceEvent, ["filterEvents": true])
	subscribe(mqttDriver, "sendAll", deviceEvent, ["filterEvents": true])
	subscribe(mqttDriver, "parentComplete", deviceEvent, ["filterEvents": true])
	
	log.debug "YAMA app initialization: Complete"
	
	// Set state so new events will get processed in that handler
	atomicState.initialized = "completed"
}

def sendAll() 
{
	if (atomicState.initialized == "completed") {
		if (logEnable) log.debug "sendAll starting"	
		atomicState.initialized = "sendAll"
		runInMillis(100,sendAllMqtt);
	} 
	else {
		log.debug "sendAll aborted since initialized flag != completed"	
	}
}

def sendAllMqtt() 
{
	log.debug "YAMA app initialization: Beginning"
	
	// Set state so events that happen during init are ignored in the event handler
	//atomicState.initialized = "re-sending"
	
	// Unsubscribe from all events
	unsubscribe()
	
	// Remove the MQTT Driver from the device list, if it was selected for publishing
	if (deviceList.find {it.name == mqttDriver.name}) {
		deviceList.remove(deviceList.findIndexOf {it.name == mqttDriver.name})
	}

	// See if driver is connected to MQTT broker
	if (mqttDriver.currentValue("connectionState") == "disconnected") {
		log.debug "Error: Driver could not connect to MQTT broker! Could not Initialize app."
		return;
	}
	
	// Walk through selected devices to get commands and attributes
	for(item in deviceList){
		// Publish Commands
		commandList = item.getSupportedCommands()

		for(commandItem in commandList){
			log.debug "Setting ${item}/commands/${commandItem}/set"
			mqttDriver.publish("${item}/commands/${commandItem}/set","")
			pauseExecution(100)
		}

		// Publish Attributes
		attributeList = item.getSupportedAttributes()
		
		for(attributeItem in attributeList){
			curVal = item.currentValue("${attributeItem}")
			
			if (!curVal) {
				curVal="null"
			} else {
				curVal = curVal.toString()
			}
			mqttDriver.publish("${item}/attributes/${attributeItem}/value",curVal)
			pauseExecution(100)
		}
	}

	// Publish sendAll command
	mqttDriver.publish("sendAll","")
	pauseExecution(100)
		
	// MQTT Subscribe to all command sets
	mqttDriver.subscribe("+/+/+/set")
	pauseExecution(100)
		
	// Subscribe to sendAll
	mqttDriver.subscribe("sendAll")
	pauseExecution(100)
	
	// Subscribe to events form selected devices, and MQTT driver
	subscribe(deviceList, deviceEvent, ["filterEvents": true])
	subscribe(mqttDriver, "mqtt", deviceEvent, ["filterEvents": true])
	subscribe(mqttDriver, "init", deviceEvent, ["filterEvents": true])
	subscribe(mqttDriver, "sendAll", deviceEvent, ["filterEvents": true])
	subscribe(mqttDriver, "parentComplete", deviceEvent, ["filterEvents": true])
	
	log.debug "YAMA app initialization: Complete"
	
	// Set state so new events will get processed in that handler
	atomicState.initialized = "completed"
}

def uninstalled() 
{
	unschedule()
	unsubscribe()
	log.debug "uninstalled"
}

def deviceEvent(evt)
{
	/*
	log.debug "name: " + evt.name
	log.debug "descriptionText: " + evt.descriptionText
	log.debug "source: " + evt.source
	log.debug "value: " + evt.value
	log.debug "unit: " + evt.unit
	log.debug "description: " + evt.description
	log.debug "type: " + evt.type	
	log.debug "locationId: " + evt.locationId
	log.debug "hubId: " + evt.hubId
	log.debug "displayName: " + evt.getDisplayName()
	log.debug "device: " + evt.getDevice()
	log.debug "deviceId: " + evt.getDeviceId()
    */

	log.debug "atomicState.initialized: " + atomicState.initialized
	log.debug "atomicState.initialized != completed: " + (atomicState.initialized != "completed")

    // MQTT driverparentComplete
	if (evt.name == "parentComplete") {
		log.debug "Setting atomicState.initialized = completed"
		atomicState.initialized = "completed"
		return;
	}
	
	if (atomicState.initialized != "completed") {
		log.debug "Aborting since atomicState.initialized != completed"
		return;
	}
	
    // If MQTT driver is init, then re-initialize app
	if (evt.name == "init") {
		//log.debug "Received init from the driver."
		initialize()
		return;
	}

    // MQTT driver re-send all
	if (evt.name == "sendAll") {
		sendAll()
		return;
	}
		
	// Process incoming HUB DEVICE events, and publish to MQTT topics
	if (evt.name != "mqtt") {
		// See if driver is connected to MQTT broker
		if (mqttDriver.currentValue("connectionState") == "disconnected") {
			if (logEnable) log.debug "Error: App could not send MQTT event. Driver is not connected to MQTT broker!"
			return;
		}
		else {
			mqttDriver.publish("${evt.getDevice()}/attributes/${evt.name}/value","${evt.value}")
		}
	}
	// Process incoming MQTT events from subscibed topics
	else
	{
		def jsonSlurper = new JsonSlurper()
		def object = jsonSlurper.parseText(evt.value)
		
		if (logEnable) log.debug "event device: " + object.device
		if (logEnable) log.debug "event type: " + object.type
		if (logEnable) log.debug "event value: " + object.value
		
		eventDevice = deviceList.find {it.getDisplayName() == object.device}
		//log.debug "eventDevice: " + eventDevice
		eventDeviceCommands = eventDevice.getSupportedCommands()
		//log.debug "eventDeviceCommands: " + eventDeviceCommands
		eventDeviceCommand = eventDeviceCommands.find {it.name == object.type}
		//log.debug "eventDeviceCommand: " + eventDeviceCommand
		eventDeviceParams = eventDeviceCommand.getParameters()
		//log.debug "eventDeviceParams: " + eventDeviceParams
		
		// Determine if the updated command accepts parameters or not, and process
		if (eventDeviceParams) {
			if (object.value) {
				newstr = (object.value).split(',')
				
				switch(newstr.size()) {
				case 1:
					//log.debug "size 1";
					param1 = (eventDeviceParams[0].type == "NUMBER" ? newstr[0].toLong() : newstr[0])
					eventDevice."$object.type"(param1)
					break;
				case 2:
					//log.debug "size 2";
					param1 = (eventDeviceParams[0].type == "NUMBER" ? newstr[0].toLong() : newstr[0])
					param2 = (eventDeviceParams[1].type == "NUMBER" ? newstr[1].toLong() : newstr[1])
					eventDevice."$object.type"(param1, param2)
					break;
				case 3:
					//log.debug "size 3";
					param1 = (eventDeviceParams[0].type == "NUMBER" ? newstr[0].toLong() : newstr[0])
					param2 = (eventDeviceParams[1].type == "NUMBER" ? newstr[1].toLong() : newstr[1])
					param3 = (eventDeviceParams[2].type == "NUMBER" ? newstr[2].toLong() : newstr[2])
					eventDevice."$object.type"(param1, param2, param3)
					break;
				case 4:
					//log.debug "size 4";
					param1 = (eventDeviceParams[0].type == "NUMBER" ? newstr[0].toLong() : newstr[0])
					param2 = (eventDeviceParams[1].type == "NUMBER" ? newstr[1].toLong() : newstr[1])
					param3 = (eventDeviceParams[2].type == "NUMBER" ? newstr[2].toLong() : newstr[2])
					param4 = (eventDeviceParams[3].type == "NUMBER" ? newstr[3].toLong() : newstr[3])
					eventDevice."$object.type"(param1, param2, param3, param4)
					break;
				case 5:
					//log.debug "size 5";
					param1 = (eventDeviceParams[0].type == "NUMBER" ? newstr[0].toLong() : newstr[0])
					param2 = (eventDeviceParams[1].type == "NUMBER" ? newstr[1].toLong() : newstr[1])
					param3 = (eventDeviceParams[2].type == "NUMBER" ? newstr[2].toLong() : newstr[2])
					param4 = (eventDeviceParams[3].type == "NUMBER" ? newstr[3].toLong() : newstr[3])
					param5 = (eventDeviceParams[4].type == "NUMBER" ? newstr[4].toLong() : newstr[4])
					eventDevice."$object.type"(param1, param2, param3, param4, param5)
					break;
				} 
			}
			else
			{
				log.debug "Empty value received unexpectedly - abort"
			}
		}
		else
		{
			// No parameters needed for this command, so process it as-is
			eventDevice."$object.type"()
		}
	}
}

'''

@JasonJoel, you're right -- I just hadn't though about this in a while.

I think I'm getting back some values that are noisy, like rssi, that are probably causing more state changes than necessary. I can force those to false so they don't get picked up unless something important changes, and that'll cut down the event traffic.

Will still be a couple of days -- we're on the last march towards getting our 50 foot long, 150,000 LED project ready to haul off to Blackrock city.

1 Like

Thanks for all the effort.

I'm now focusing on working around bugs in the current node networking implementation.

Strange -- I stopped getting messages other than the heartbeat. All the settings are the same and the log showed nothing out of the ordinary.

And now it started working again ... mysterious ways and all that.

My YAMA app just started doing this as well.
I'm only getting messages for heartbeats.

1 Like

I tried rebooting the hub, restarting my mqtt broker, neither worked.
What DID work was removing the YAMA app and recreating it.