What is Hubitat's thread safety model?

It'd be great if you could convince the folks over at Kwikset to change their API, or for that matter even release a public API, instead of leaving me to reverse engineer what their app and website do. However, I don't think that's going to happen. So I decided to build something that works and distribute it free to the community, adding value to HE and, in theory, more sales for you guys, since there are more devices you integrate with. As a developer helping you expand your platform, I'm just asking for better tools.

Yes, it is viable. But "viable" and "good" aren't always the same. Making the code synchronous makes it consume more resources, since it turns something inherently non-blocking into a blocking operation. Every mainstream OS built in the last 30 years (including whatever flavor of Linux HE is based on) is designed to let network I/O be non-blocking. From where I stand, when I and others have to do things like this in our code, making decisions that make me cringe as a developer, I do wonder: is this why I experience so many hub slowdowns that I have to reboot my hub nightly? Maybe not, but it definitely could be. If I were building my own piece of software outside HE, I'd make as much of it asynchronous as possible.

While that's certainly your call for your platform, as a developer trying to expand its footprint (I've now built and published integrations for Kevo, HEOS, BOND, and Kohler DTV+, as well as several apps people seem to find useful), along with some of the other developers on this thread asking for better tools, I can say it's a little disappointing. Something I've learned in various markets, and I think home automation is one of them, is that people buy because of the ecosystem built up around the hub, not the hub itself. All of the integrations, apps, and device drivers others build are a major reason people choose a platform. There is value in giving developers the tools they're looking for, since they help expand your platform for you, and best of all, you never even have to give us a single dollar! The biggest thing that made me pause when switching from ST to HE was realizing that many things I had working on ST, because others had already built the device integrations, were things I was going to have to build myself for HE. As a developer, I was fine with that. Others who aren't developers, though, may simply have gone elsewhere.

As a business we always have to confront the issue of how to prioritize what we apply resources to. It's not a perfect world, we don't have unlimited resources, we can't solve every problem that comes along right now. Certainly you could find it disappointing that we can't put highest priority on your specific issue. The real world dictates that perfection of the platform will always be out of reach.

@bravenel, thanks for the explanation.
@mike.maxwell, I'm just trying to write correct thread safe code. And for that I need to understand the API and guarantees Hubitat provides.

But to be more specific, I'm looking at this script (I have it installed and am planning to update it soon): influxdb-logger.groovy.

Here are the relevant parts, with points of interest marked as ## number ##:

def installed() {
    state.loggingLevelIDE = 5

    // ## 1 ##
    // Needs to be synchronized in case another event happens at the same time
    synchronized(this) {
        state.queuedData = []
    }
}

// ...

def updated() {
    logger("updated()","trace")

    // ## 2 ##
    state.loggingLevelIDE = (settings.configLoggingLevelIDE) ? settings.configLoggingLevelIDE.toInteger() : 3

    // ## 3 ## 
    state.headers = [:] 
    state.headers.put("HOST", "${state.databaseHost}:${state.databasePort}")

    // ...

    manageSubscriptions()
}

private manageSubscriptions() {
    logger("manageSubscriptions()","trace")
    unsubscribe()
    if (prefLogModeEvents) subscribe(location, "mode", handleModeEvent)
    
    // ...
}

def handleModeEvent(evt) {
    logger("handleModeEvent(): Mode changed to: ${evt.value}","info")
    // ...
    def data // ... (construction elided in this excerpt)
    queueToInfluxDb(data)
}

def queueToInfluxDb(data) {
    // ...
    // ## 4 ##
    synchronized(this) {
        state.queuedData.add(data)
        // ...   
    }

    // ...

    // ## 5 ## 
    // state.headers is supposed to be read here, unsynchronized
}

private logger(msg, level = "debug") {
    switch(level) {
        // ... 
        case "trace":
            if (state.loggingLevelIDE >= 5) log.trace msg // ## 6 ##
            break
        // ...
    }
}

And here are the questions:

  1. (The same data is also accessed at ## 4 ##.) Is this existing comment really true? It seems like nothing can run before installed() has subscribed to anything. So probably not?
  2. updated() can change state.loggingLevelIDE, and any call to logger() reads it at ## 6 ##. Is there a need to synchronize this, or use an atomic value or something like that? Is there a happens-before relationship between the write to state.loggingLevelIDE and the read here? If there isn't, logger() may never see the updated state.loggingLevelIDE.
  3. state.headers is constructed in multiple steps, unsynchronized, and is supposed to be read from within event handlers around ## 5 ## (because of a bug in this code it isn't used, but I'm fixing that). This seems like a really big issue: not only is there probably no happens-before relationship between the write and the read, but a reader could also see the headers object in a partially built, corrupt state.
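For question 3, one defensive pattern, independent of how Hubitat persists state, is to build the headers map completely in a local variable and store it with a single assignment, so state never holds an empty or half-populated map between steps. The sketch below is plain Groovy: a hypothetical Map stands in for Hubitat's `state`, and the host and port values are made up. It avoids exposing intermediate steps but does not, by itself, create a happens-before relationship between threads.

```groovy
// Hypothetical stand-in for Hubitat's `state`, so the sketch runs anywhere.
Map state = [databaseHost: "localhost", databasePort: "8086"]

// Build the complete headers map in a local variable first.
Map<String, String> buildHeaders(Map st) {
    Map<String, String> headers = [:]
    headers.put("HOST", "${st.databaseHost}:${st.databasePort}".toString())
    // ... any further headers would be added here ...
    return headers
}

// Then store it with one assignment, so state never holds an empty or
// half-populated headers map between the individual put() calls.
state.headers = buildHeaders(state)

println state.headers.HOST   // localhost:8086
```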

Is my understanding of these issues correct?
Thanks for your patience!

installed() only runs once, when the device is initially installed: one call, and thus only one instance.
updated() only runs when Save Preferences is clicked in the UI, so it's unlikely updated() would be an issue.
As far as creating a queue for Influx, you're making it more complicated than it needs to be. When the event comes in, push it to your endpoint. There's really nothing to synchronize: if one thread is handling one event and a separate thread is handling another, who cares?

Thanks for the reply!

As I've mentioned, the code is not mine. However, adding bulk send was a recent change and apparently a needed one.

From your reply, I'll try to infer answers to my questions:

  1. This comment is incorrect, no need to lock in installed().
  2. "Unlikely updated would be an issue" seems to indicate that it could be an issue. And since Hubitat provides no documented or stated guarantees, it seems that synchronization is necessary even for state.loggingLevelIDE (for example, making it an AtomicInteger, if I understand things correctly).
  3. The construction of state.headers, and its update in state, need to be synchronized too.

I wish Hubitat did run all the callbacks from a single thread (at least by default), I think that would make things much simpler for everyone.

How do you figure this is an incorrect statement?

This is designed as an internal method called by the hub. I write a fair number of drivers, and I have never seen it called more than once when the platform installs the driver, whether via device inclusion or when the driver is added manually...
If it's being called more than once, then the issue is the driver calling it somewhere else in the code...

Again, updated() is designed to be called from the UI. So yes, if two browser sessions are open, each with different preference settings, and Save Preferences is clicked in both at the same time, then sure, the last writer will win. If that's not an unlikely situation, then I don't know what is...

Maybe; it all depends... There can be issues with state getting trampled on, but in most cases they can be overcome simply by understanding that it can be an issue, though it's not guaranteed to be one...

Sorry for being unclear. I meant that the comment in the code

// Needs to be synchronized in case another event happens at the same time

is incorrect, not your statement.

This would be fine. After all, we expect only the last value to end up stored :slight_smile:
The real issue is that, without any guarantees from Hubitat, an updated int value in state may never become visible to other threads, even after it has been saved. They might even see an incorrect value if the write happens to be torn (I'm not sure whether that can happen in this particular case). And all of that for a simple int, when only updated() writes to state and some other callback reads it later from another thread. Unless I'm missing something, of course.

Thanks for your code with semaphores; it helped me solve my problem.

The code in post 23 would be problematic even assuming the synchronized block actually synchronizes. You would need to use atomicState there. You should read the SmartThings docs about state and atomicState. When you use atomicState, you are basically writing to the db directly, but the regular “state” object is just a copy of your app’s db entries, made before your handler method executes, and any changes aren’t written back to the db until that execution has finished. So in your example code, even if one thread is waiting on another at the synchronized block, the waiting thread will already have obtained its copy of the db entries (state), which won’t reflect any changes the first thread makes.
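The copy-before, write-after behavior described above can be modeled in a few lines. This is a toy model of that description, not Hubitat's implementation: a plain Map named `db` stands in for the app's stored data, and `state1`/`state2` are hypothetical per-execution copies. It shows why even a perfectly working lock cannot prevent the lost update when `state` is used.

```groovy
// Toy model of the behavior described above, not Hubitat's actual code:
// `db` is the app's stored data, and each execution works on a private copy.
Map db = [queuedData: ""]

// Two callbacks fire at about the same time, so both executions take their
// copy of `state` before either handler body runs.
Map state1 = new LinkedHashMap(db)
Map state2 = new LinkedHashMap(db)

// Even if a lock serializes the handler bodies perfectly, execution 2 is
// appending to a copy that was taken before execution 1 wrote anything.
state1.queuedData += "data1"
db.putAll(state1)                 // execution 1 ends: its copy is written back
state2.queuedData += "data2"
db.putAll(state2)                 // execution 2 ends: overwrites "data1"

println db.queuedData             // data2
```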

Thanks for pointing this out. I did go and re-read the state docs, and now I see that thread safety is not as big of a problem as I originally thought.

Do you know if atomicState also re-reads data from the DB every time you access it? It seems like there could be an issue even with atomicState in the following scenario:

  1. queuedData is ""
  2. Thread1 reads DB. queuedData for thread1 is ""
  3. Thread2 reads DB. queuedData for thread2 is ""
  4. Thread1 appends data (reads it into string, updates string, stores string) to queuedData. Thread1's queuedData is "data1". It's written to DB. DB contains "data1".
  5. Thread2 appends data to queuedData. Thread2's queuedData is "data2". It's written to DB. DB contains "data2", instead of "data1data2".

Is my understanding correct here? Is there a way to avoid this race condition?
Is there a better way to handle data accumulation like in the snippet? Perhaps without touching the state at all?

Thanks!

def threadSafeAppend(String data) {
    synchronized(this) {
        atomicState.queuedData = (atomicState.queuedData ?: "") + data
    }
}

Calling this method from a bunch of threads in parallel, each thread trying to append a different letter...

["a","b","c","d","e","f","g"].parallelStream().forEach { 
    log.debug "${threadSafeAppend(it)}"
}

...logs the following...

[app:1057] 2020-03-04 07:44:38.137 pm [debug] ebcafgd
[app:1057] 2020-03-04 07:44:38.130 pm [debug] ebcafg
[app:1057] 2020-03-04 07:44:38.124 pm [debug] ebcaf
[app:1057] 2020-03-04 07:44:38.121 pm [debug] ebca
[app:1057] 2020-03-04 07:44:38.105 pm [debug] ebc
[app:1057] 2020-03-04 07:44:38.098 pm [debug] eb
[app:1057] 2020-03-04 07:44:38.090 pm [debug] e

...which means it worked: nothing got overwritten.

If you take out the "synchronized", it behaves as you describe...

[app:1057] 2020-03-04 07:52:01.244 pm [debug] ec
[app:1057] 2020-03-04 07:52:01.238 pm [debug] fa
[app:1057] 2020-03-04 07:52:01.233 pm [debug] gd
[app:1057] 2020-03-04 07:52:01.228 pm [debug] b
[app:1057] 2020-03-04 07:52:01.223 pm [debug] e
[app:1057] 2020-03-04 07:52:01.218 pm [debug] f
[app:1057] 2020-03-04 07:52:01.212 pm [debug] g

Thanks, but I guess it's not the same as when Hubitat itself calls threadSafeAppend as a callback?

Per my understanding of atomicState's documentation, atomic state will be loaded from DB before each execution, and written to DB every time it's updated.

In your example, it's a single execution, so per documentation it'll load from DB once, and then you call this one loaded atomicState object from multiple threads.

In the case where Hubitat calls the callbacks, it'll load from the DB for every execution, and the atomicState object may be different for every call (of course, I'm only guessing here and am probably wrong).

So here's my attempt (it's quite possible I'm doing something wrong here) to do the same test, but making Hubitat call the callback on multiple threads:

def getAppends() { 10 }
def getChars() { 'b' }

void scheduleAllAppends()
{
    for (def val in 0..getAppends()) {
        runInMillis(1, "appendData${val}")
    }
    
    runInMillis(2000, checkData)
}
                    
void appendImpl(def num)
{
    for (def c in 'a'..getChars()) {
        threadSafeAppend("${c}${num}");
    }
}

void appendData0() { appendImpl(0) }
void appendData1() { appendImpl(1) }
void appendData2() { appendImpl(2) }
void appendData3() { appendImpl(3) }
void appendData4() { appendImpl(4) }
void appendData5() { appendImpl(5) }
void appendData6() { appendImpl(6) }
void appendData7() { appendImpl(7) }
void appendData8() { appendImpl(8) }
void appendData9() { appendImpl(9) }
void appendData10(){ appendImpl(10) }

void checkData()
{
    log.info("Checking atomicState.queuedData...") 
    def data = atomicState.queuedData
    for (def val in 0..getAppends()) {
        for (def c in 'a'..getChars()) {
            def stringToFind = "${c}${val}"
            if (!data.contains(stringToFind)) {
                log.info("queuedData is incomplete - '${stringToFind}' not found in ${data}")
            }
        }
    }
    log.info("Done checking atomicState.queuedData...")
}

def threadSafeAppend(String data) {
    String oldData
    String updatedData
       
    synchronized(this) {
        oldData = atomicState.queuedData  
        atomicState.queuedData = oldData + data
        updatedData = atomicState.queuedData
    }
    
    log.info("Appended '${data}'. '${oldData}' => '${updatedData}'")
        
    updatedData
}

private def update()
{
    unschedule()
    atomicState.queuedData = ""
    scheduleAllAppends()
}

def installed()
{
    update()
}

def updated()
{
    update()
}

The output I get:

23:08:18.147 info Done checking atomicState.queuedData...
23:08:18.146 info queuedData is incomplete - 'a10' not found in a0b0b1a2b2a3b3a4b4a5b5a6b6a7b7a8b8a9b9b10
23:08:18.141 info queuedData is incomplete - 'a1' not found in a0b0b1a2b2a3b3a4b4a5b5a6b6a7b7a8b8a9b9b10
23:08:18.139 info Checking atomicState.queuedData...
23:08:16.232 info Appended 'b9'. 'a0b0b1a2b2a3b3a4b4a5b5a6b6a7b7a8b8a9' => 'a0b0b1a2b2a3b3a4b4a5b5a6b6a7b7a8b8a9b9' ## Or maybe this one overwrites a10
23:08:16.229 info Appended 'b10'. 'a0b0b1a2b2a3b3a4b4a5b5a6b6a7b7a8b8a9b9' => 'a0b0b1a2b2a3b3a4b4a5b5a6b6a7b7a8b8a9b9b10' ## Overwrites a10
23:08:16.221 info Appended 'a10'. 'a0b0b1a2b2a3b3a4b4a5b5a6b6a7b7a8b8' => 'a0b0b1a2b2a3b3a4b4a5b5a6b6a7b7a8b8a10'
23:08:16.217 info Appended 'a9'. 'a0b0b1a2b2a3b3a4b4a5b5a6b6a7b7a8b8' => 'a0b0b1a2b2a3b3a4b4a5b5a6b6a7b7a8b8a9'
23:08:16.214 info Appended 'b8'. 'a0b0b1a2b2a3b3a4b4a5b5a6b6a7b7a8' => 'a0b0b1a2b2a3b3a4b4a5b5a6b6a7b7a8b8'
23:08:16.208 info Appended 'a8'. 'a0b0b1a2b2a3b3a4b4a5b5a6b6a7b7' => 'a0b0b1a2b2a3b3a4b4a5b5a6b6a7b7a8'
23:08:16.152 info Appended 'b7'. 'a0b0b1a2b2a3b3a4b4a5b5a6b6a7' => 'a0b0b1a2b2a3b3a4b4a5b5a6b6a7b7'
23:08:16.145 info Appended 'a7'. 'a0b0b1a2b2a3b3a4b4a5b5a6b6' => 'a0b0b1a2b2a3b3a4b4a5b5a6b6a7'
23:08:16.137 info Appended 'b6'. 'a0b0b1a2b2a3b3a4b4a5b5a6' => 'a0b0b1a2b2a3b3a4b4a5b5a6b6'
23:08:16.129 info Appended 'a6'. 'a0b0b1a2b2a3b3a4b4a5b5' => 'a0b0b1a2b2a3b3a4b4a5b5a6'
23:08:16.106 info Appended 'b5'. 'a0b0b1a2b2a3b3a4b4a5' => 'a0b0b1a2b2a3b3a4b4a5b5'
23:08:16.100 info Appended 'a5'. 'a0b0b1a2b2a3b3a4b4' => 'a0b0b1a2b2a3b3a4b4a5'
23:08:16.088 info Appended 'b4'. 'a0b0b1a2b2a3b3a4' => 'a0b0b1a2b2a3b3a4b4'
23:08:16.083 info Appended 'a4'. 'a0b0b1a2b2a3b3' => 'a0b0b1a2b2a3b3a4'
23:08:16.078 info Appended 'b3'. 'a0b0b1a2b2a3' => 'a0b0b1a2b2a3b3'
23:08:16.073 info Appended 'a3'. 'a0b0b1a2b2' => 'a0b0b1a2b2a3'
23:08:16.045 info Appended 'b2'. 'a0b0b1a2' => 'a0b0b1a2b2'
23:08:16.038 info Appended 'a2'. 'a0b0b1' => 'a0b0b1a2'
23:08:15.990 info Appended 'b1'. 'a0b0' => 'a0b0b1'
23:08:15.988 info Appended 'b0'. 'a0' => 'a0b0' ## It sees only 'a0'
23:08:15.985 info Appended 'a1'. 'a0' => 'a0a1'
23:08:15.982 info Appended 'a0'. '' => 'a0'

So my question remains whether there's a good way to accumulate data in a thread-safe way, but I guess not.
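One way to accumulate data without touching state at all, assuming `@Field static` fields behave as later posts in this thread suggest (held in memory and shared by every execution of the same app code), is an in-memory concurrent queue that callbacks add to and a flush step drains. This is a plain-Groovy sketch: `appendData` and `drainBatch` are hypothetical names, threads simulate concurrent callbacks, and whether the Hubitat sandbox permits `ConcurrentLinkedQueue` is an assumption. Anything queued this way lives only in memory, so it would be lost on a hub reboot and would presumably be shared by all instances of the app.

```groovy
import groovy.transform.Field
import java.util.concurrent.ConcurrentLinkedQueue

// In-memory buffer outside state/atomicState. The queue itself is
// thread-safe, so no explicit lock is needed to add or poll.
@Field static final ConcurrentLinkedQueue<String> pendingData = new ConcurrentLinkedQueue<>()

// Hypothetical callback body: enqueue without touching state.
void appendData(String data) {
    pendingData.add(data)
}

// Hypothetical flush step: remove everything queued so far as one batch.
List<String> drainBatch() {
    List<String> batch = []
    String item
    while ((item = pendingData.poll()) != null) {
        batch << item
    }
    return batch
}

// Simulate 11 concurrent callbacks, each appending 'a<n>' and 'b<n>',
// mirroring the appendData0..appendData10 test above.
List<Thread> threads = (0..10).collect { n ->
    Thread.start {
        ['a', 'b'].each { c -> appendData(c + n) }
    }
}
threads*.join()

List<String> batch = drainBatch()
println batch.size()   // 22
```

In an app, the flush step would most likely be a scheduled handler that sends the drained batch to InfluxDB in one request.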

Oh yeah, that behavior makes sense now that I look at the state diagram in the docs. The db read happens before your app executes, and thus outside of the synchronized statement. I have another idea, though. Try creating a child smart app whose only purpose is to save and retrieve data from its atomicState, and then call its methods within a synchronized block.

So move this to a child app without the synchronized:

def threadSafeAppend(String data) {
    String oldData = atomicState.queuedData  
    atomicState.queuedData = oldData + data
    String updatedData = atomicState.queuedData

    log.info("Appended '${data}'. '${oldData}' => '${updatedData}'")
    
    updatedData
}

And then change your original threadSafeAppend method (in your parent) to:

def threadSafeAppend(String data) {
    synchronized(this) {
        def child = findChildAppByName("My Child App")
        child.threadSafeAppend(data)
    }
}

Thanks, it took me a while to test your suggestion.
It still doesn't seem to be working, and I'm still getting data corruption :frowning: .

I want to fix issues in some of the community scripts that I'm using on my HE.

So I first sought out some documentation on the Hubitat environment. I didn't find anything on the threading model or several other key things to understand before spending time on these scripts. Some key items in the Developer Documentation (such as State) take me to "There is currently no text in this page...". It doesn't exist. While searching for thread safety, I landed on this forum thread. Community members referencing ST's documentation for a key component (atomicState) seems really odd to me.

Let me first say that I really appreciate @bravenel responses in this thread and Hubitat interaction with the community. I'm sure there is a stack of issues he'd like to be making progress on, and taking the time to respond to help the community here is really appreciated.

I'd like to share a couple of thoughts on the barrier to entry, of which thread safety is one piece.

  1. Developers with any experience on this platform (or on SmartThings) are most likely familiar with the issues raised by multi-threading. Many end users are as well, because it is not difficult to run into problems with Rules in Rule Machine --> a not infrequent topic when an error is thrown by a rule caused by this. We assume that app and driver developers are familiar with this topic in general, or quickly become familiar with it when they encounter it.

This indicates that one needs key tribal knowledge of this platform in order to write quality drivers and apps and to spend development time efficiently. A wiki page explaining the basics, written by someone with access to the core sources, would help a lot of people.

There is value in giving developers the tools they're looking for since they help expand your platform for you, and best of all, you never even have to give us a single dollar!

In order for Hubitat to best attract developers to spend their time on this, there needs to be good documentation to start with. Android is a different world, but it's worth considering briefly as a case study. The main reason it caught on and quickly became such a success is that the developer SDK documentation is very well fleshed out (and they made tools that are easy for developers to use). If you look at the platform documentation (AOSP internals), however, it has only really begun to be fleshed out in the past few years. My point is, they made it as easy as they could for app developers to sell their platform for them, and good documentation targeting those developers made it possible to ramp up quickly.

As a business we always have to confront the issue of how to prioritize what we apply resources to.

I totally understand this and work in the world of prioritizing software issues every day. If a goal is to make this platform accessible to owner developers, however, Hubitat needs to prioritize documentation of the fundamental environment the scripts are operating in. Otherwise, those who may try to contribute will spend their time elsewhere. If developer-driven expansion is a lower priority than other business goals, putting off documentation is fine for the business purposes.

As a low-level software engineer with many years of experience, I want to contribute to this community (and fix the scripts I see issues in), and I think I have a good background for it. But my time is limited; I have other priorities in life. I know I could bumble around through trial and error figuring out things like how atomic atomicState really is, how methods can be called on different threads, and what synchronization solutions are available, but without official documentation that will take considerably more time than I expected. The barrier to entry for contributing is steeper than I expected, so I am unlikely to spend my free time learning the tribal knowledge required to contribute efficiently here.

Context: I’m a professional software developer, and have spent literal years of my life working on multi-threading and synchronization topics. I also have a dozen or so apps and drivers released and in use by the community.

I have not had to think about locking or semaphores or anything like that at all while developing my drivers and apps. And I would bet a dollar that 99% of all drivers and apps do not need it. I think I’ve only seen a single example of an app from another forum member where it looked necessary.

It’s an event-based model. Your code is called in response to an event queue. Now, could you create a race condition by your application-level logic, such as having two different apps that do things to trigger each other infinitely? Of course. But that’s a problem with application logic, and not low-level thread synchronization.

To get into one detail you mentioned: my understanding of state and atomicState is that state writes to the database when your event is done being handled. AtomicState writes to the database immediately. But you still don’t need to worry about thread synchronization.

Think of it like running some JavaScript in a browser. To the developer, it’s single-threaded and based on events and callbacks.

(Anyone please feel free to correct me if I’m wrong on a detail. But it stands that thread synchronization is not something I’ve had to worry about while developing my apps and drivers.)

I know it's a "bit" late for a reply, but I'll add it here for other desperate developers to read :slight_smile:

My understanding is that in JavaScript, your events run on the same thread (at least, from the developer's point of view). The event loop picks them up from the queue, and they run one after another, thus safely changing shared objects such as DOM.
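For contrast, the event-loop model described here can be sketched on the JVM with a single-threaded executor: handlers are queued and run one at a time, so an unsynchronized read-modify-write never loses an update. This is plain Groovy and only illustrates the model; it is not something Hubitat exposes, and whether Hubitat's sandbox allows creating executors is not assumed.

```groovy
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// One worker thread plays the role of an event loop: handlers queue up and
// run one at a time, in the order they were submitted.
ExecutorService eventLoop = Executors.newSingleThreadExecutor()
Map shared = [queuedData: ""]   // plain map, no locking

// Ten producer threads each fire ten "events" at the loop concurrently.
List<Thread> producers = (1..10).collect {
    Thread.start {
        10.times {
            eventLoop.execute { shared.queuedData += "x" }
        }
    }
}
producers*.join()

// Let every queued handler finish before reading the result.
eventLoop.shutdown()
eventLoop.awaitTermination(10, TimeUnit.SECONDS)

println shared.queuedData.length()   // 100: no handler's update was lost
```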

You'd expect that the same thing is happening in Hubitat (well, I expected that for sure before I was disillusioned).

In Hubitat's case, the events are actually handled on different threads, and there is no synchronization of state between them. Neither atomicState nor state is thread safe for this purpose: there's no guarantee that if one event is updating state, another will see a consistent update.

So you have to manually do this with tricky undocumented global synchronization objects like these (from this thread):

import groovy.transform.Field

@Field static final Object mutex = new Object()

def handler() {
  synchronized (mutex) {
       // Your code under real mutex
  }
}

Which is frustrating.
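A likely reason `synchronized(this)` did not help in the earlier tests, while a `@Field static` mutex does, is that `synchronized` only excludes threads locking the same object. If each callback runs on its own script instance (an assumption consistent with, but not confirmed by, the results above), `this` is a different monitor every time. The plain-Groovy sketch below makes that concrete with a hypothetical `ScriptInstance` class and latches: two different objects can be held at once, one shared object cannot.

```groovy
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

// Hypothetical stand-in for the object a Hubitat callback runs in. If each
// callback gets its own instance (an assumption), `this` differs per callback.
class ScriptInstance {}

// Thread A enters synchronized(lockA) and stays there; thread B then tries
// synchronized(lockB). Returns true if B got in while A still held its lock.
boolean enteredConcurrently(Object lockA, Object lockB) {
    CountDownLatch aInside = new CountDownLatch(1)
    CountDownLatch releaseA = new CountDownLatch(1)
    CountDownLatch bInside = new CountDownLatch(1)

    Thread a = Thread.start {
        synchronized (lockA) {
            aInside.countDown()
            releaseA.await()          // keep holding lockA until released
        }
    }
    aInside.await()                   // A now definitely holds lockA

    Thread b = Thread.start {
        synchronized (lockB) {
            bInside.countDown()
        }
    }
    boolean overlapped = bInside.await(1, TimeUnit.SECONDS)

    releaseA.countDown()              // let A exit, which also unblocks B
    a.join()
    b.join()
    return overlapped
}

// Like `@Field static Object mutex`: one object shared by every caller.
final Object mutex = new Object()

println enteredConcurrently(new ScriptInstance(), new ScriptInstance())  // true
println enteredConcurrently(mutex, mutex)                                // false
```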

I’m curious what events you have that are happening so close together in time? In my house, events occur seconds and minutes apart. No one is simultaneously sending multiple conflicting commands to my door locks, for example.

You may not fully understand the issues in concurrent processes that interact, and which need mutually exclusive access to a resource.

Just as one example, the Litter Robot driver needs to poll a cloud server to get status and receive events. On my hub, a rule needs to interact with that driver and with another rule that changes color (red, yellow, green, and flashing red) of Hue under-vanity lightstrips to indicate the level of cat poop in the litter drawer, and whether the poop level is critical and needs to be emptied.

Those processes are asynchronous and unrelated, and critical regions are needed to control the lightstrips. It’s the asynchronous nature of the cloud server responses relative to the polling of the cloud server and the flashing of the under-vanity light period that creates the concurrency issues.

Just one example. I’ve got several.

First - energy metering devices can send many events per second.

Second - you sound like you’re assuming a single event type for one device. What if I have a handler that is handling all temp changes for 20 different devices?

Yeah, I find synchronization to be a challenge some times. Not because it’s hard (though it is a more challenging programming topic) but because as @artyom.tokmakov said, none of this is documented and so we just get to try to figure it out and make our best guesses about what is going on under the hood.
