Thread synchronization/safety when multiple commands run simultaneously?

Hmm I didn't know ConcurrentHashMap was supported, I'll give it a shot

java.util.concurrent.ConcurrentLinkedQueue seems to be meeting my needs, wasn't aware the concurrent classes would work. Thanks!!

Awesome!

Would you mind sharing your code snippet? I'm doing some debouncing work as well and would love some examples.

Sure

It's not fully working yet. My runIn dies sometimes and I can't come up with a better way to do what I want. If I use schedule() I end up with multiple running at once. I'd rather use a schedule, but then I'd need to come up with more of a true mutex so that the other schedules just return if the mutex is locked... Unsure if I can find a way to do that.

I tried creating a semaphore but something wasn't working right... it seemed that my finally {} was called on thread interruption, not just when my code finished. As a result I removed the finally {} and added the release to the end of my try and my catch. It seems to be working. It's not quite a debounce, but it's similar.

@mike.maxwell curious... how dangerous is what I posted there when it comes to my runAllActions since you mentioned there really isn't any good threading stuff in HE at this point? It seems to work and I did a bunch of testing for race conditions and it seems good

If it works it works...


So without knowing all of the details of the execution logic in Hubitat you might want to hack your own semaphore in so you can ensure it never stays locked for more than X millis.

Something like this might work:

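import groovy.transform.Field
import java.util.concurrent.atomic.AtomicLong

// static so that every execution shares the same object; the hub's
// sandbox has to allow the AtomicLong import for this to compile.
@Field static AtomicLong mutex = new AtomicLong();

def myCode() {
    // Not sure how reliable the execution env is so we sanity check our
    // lock, if it has been held for more than 5s assume it is abandoned.
    // 0 means "unlocked", so only a non-zero timestamp can be stale.
    long val = mutex.get();
    if (val != 0 && now() - val > 5000) {
        // Do some lock-busting error handling / logging

        // Remove the lock, don't care about the return value, if 'val'
        // isn't valid it just means another thread beat us to
        // lock-busting. compareAndSet is used because we don't want to
        // overwrite a valid lock from another thread.
        mutex.compareAndSet(val, 0);
    }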

    try {
        // Attempt to get the lock, 0 is our marker for "unlocked", use
        // the current timestamp for the lock marker
        if (!mutex.compareAndSet(0, now())) {
            return;
        }

        // Do Stuff In Lock

        // Release the lock
        mutex.set(0);
    } catch (e) {
        // Release the lock
        mutex.set(0);
    }
}

I was thinking of something like this and it makes sense. I'm more of a .NET guy so I'm still learning the locking constructs that Java/Groovy have... I'll have to read up on AtomicLong, not familiar with that class.

All that being said, it's been running for about 24 hours now without issue!!!

In this specific case the important feature of AtomicLong is compareAndSet. It lets you specify the expected and new values and returns a boolean that will be true if the set succeeded.

Functionally it is very close to your Semaphore code; you just get the timestamp guard ability as well.

Glad to hear it has been working!
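For anyone who wants to try the compareAndSet lock off the hub, here is a minimal sketch in plain Java. The class name TimestampLock and its method names are made up for illustration, and the current time is passed in as a parameter (an assumption, not something the Hubitat snippet does) so the stale-lock path can be exercised without waiting five seconds.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper, not part of any Hubitat API.
class TimestampLock {
    private static final long UNLOCKED = 0L;
    private final AtomicLong heldSince = new AtomicLong(UNLOCKED);
    private final long maxHoldMillis;

    TimestampLock(long maxHoldMillis) {
        this.maxHoldMillis = maxHoldMillis;
    }

    // Returns true if the caller now owns the lock.
    // nowMillis must be non-zero because 0 is the "unlocked" marker.
    boolean tryLock(long nowMillis) {
        long current = heldSince.get();
        if (current != UNLOCKED && nowMillis - current > maxHoldMillis) {
            // Stale lock: clear it only if nobody changed it in the meantime.
            heldSince.compareAndSet(current, UNLOCKED);
        }
        // Succeeds only when the value is still "unlocked".
        return heldSince.compareAndSet(UNLOCKED, nowMillis);
    }

    void unlock() {
        heldSince.set(UNLOCKED);
    }
}
```

With a 5000 ms limit, a lock taken at t=1000 rejects a second caller at t=2000 but is busted and re-taken by a caller at t=7000.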


Way late here, but I'm considering writing my own apps because I've had problems with RM's lack of thread safety and resulting race conditions.

Based on my limited understanding, storing a concurrent hash map in atomic state wouldn't provide thread safety, but I'd love to be wrong. As far as I can tell, atomic state stores serialized objects, so it's not actually a reference to a thread-safe concurrent map. Therefore, any operation on it would necessarily construct an object, modify it, then serialize that object and write it back out. I'm not sure how "atomic" is the right word for atomic state, as it doesn't seem to provide any thread safety at all. Instead, it seems to just be a synchronous write operation, or a buffered write followed by a "flush."
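The read, modify, write-back concern can be illustrated with a small model in plain Java. This is only a sketch of the behavior described above, not Hubitat's actual implementation: SerializedStore is a hypothetical stand-in that, like a store of serialized objects, only ever hands out and keeps copies.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a store that persists copies, never live references.
class SerializedStore {
    private Map<String, Integer> persisted = new HashMap<>();

    // "Deserialize": the caller gets a private copy.
    Map<String, Integer> read() {
        return new HashMap<>(persisted);
    }

    // "Serialize": the whole copy replaces what was stored.
    void write(Map<String, Integer> value) {
        persisted = new HashMap<>(value);
    }

    // Two executions interleave: both read before either writes back.
    static int lostUpdate() {
        SerializedStore store = new SerializedStore();
        Map<String, Integer> initial = new HashMap<>();
        initial.put("count", 0);
        store.write(initial);

        Map<String, Integer> a = store.read();
        Map<String, Integer> b = store.read();
        a.put("count", a.get("count") + 1);
        b.put("count", b.get("count") + 1);
        store.write(a);
        store.write(b); // overwrites a's increment

        return store.read().get("count");
    }
}
```

Both executions incremented the counter, yet lostUpdate() returns 1. Each individual write is immediate, but the sequence as a whole is not atomic, and using a concurrent map type for the copies would not change that.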

Is "atomic state" a misnomer?

Were you able to get your mutex example to run on Hubitat? Maybe I'm importing the wrong package, but I'm getting: "Importing [java.util.concurrent.atomic.AtomicLong] is not allowed."

Any other luck achieving thread safety? I have a hard time believing anyone would introduce a multi-threaded model with all its inherent complexities without any way to manage that complexity. Otherwise it's literally impossible to write a "correct" program.

It was named that by SmartThings. Yes, I agree, it's kind of a misnomer. What it means is "I don't wait until the execution of the entire 'event' finishes to write to the db, I write when you ask me to." But completely agree, if you're coming from a programming background this is not going to give you true atomicity because of the problem with having to duplicate objects, modify them, then put them back.

There are lots of options. For starters, the standard Java/Groovy keyword synchronized is supported. I have multiple apps where I make use of this. Second, you can make use of some (but not all) of the concurrent Java classes. This was the first thread-safe app I wrote for HE: hubitat-kevo/apps/Kevo_Plus_Integration.groovy at master · dcmeglio/hubitat-kevo · GitHub. It uses a ConcurrentLinkedQueue and a Semaphore to manage a shared queue.

Happy to try to help if you have specific questions as you dig in!
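As a rough idea of what a ConcurrentLinkedQueue plus Semaphore combination can look like, here is a minimal sketch in plain Java. It is not taken from the linked Kevo app; the class CommandQueue and its methods are hypothetical names, and the "work" is just recording each command.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;

// Hypothetical sketch: many callers enqueue, at most one drains at a time.
class CommandQueue {
    private final ConcurrentLinkedQueue<String> pending = new ConcurrentLinkedQueue<>();
    private final Semaphore drainPermit = new Semaphore(1);
    // Only touched while holding the permit.
    final List<String> processed = new ArrayList<>();

    void submit(String command) {
        pending.add(command); // safe from any thread
        drain();
    }

    // Returns false if another caller is already draining.
    boolean drain() {
        if (!drainPermit.tryAcquire()) {
            return false; // non-blocking: just leave the item queued
        }
        try {
            String next;
            while ((next = pending.poll()) != null) {
                processed.add(next);
            }
        } finally {
            drainPermit.release();
        }
        return true;
    }
}
```

One caveat with this shape: an item added just after the drainer's last poll but before it releases the permit stays queued until the next submit or drain call, so a real app may want a follow-up check.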

Thanks, that clears up a lot!

I see you're using the semaphore as just a mutex instead of synchronizing on an object. Is that just so you can use the non-blocking tryAcquire? It seems like you wouldn't need lock-busting logic for a synchronized object unless there's a bug in the JVM. Have you seen otherwise? Something like:

@Field static Object mutex = new Object()

def f() {
  synchronized (mutex) { ... }
}

If I were to instead synchronize on this, what would this refer to? I seem to remember reading that the platform creates a separate instance per event, so I'm guessing it wouldn't protect the critical section across events, right?

Is there a way to scope the mutex to f?

def f() {
  static Object mutex = new Object()  // Illegal
  synchronized (mutex) { ... }
}

This isn't legal, but would be slightly desirable for readability and for preventing mutex abuse (e.g., if I simply want to prevent two instances of a handler function from running concurrently).

Finally, one question about atomic state: can I assume that if a read of state.something occurs after state.something has been set by another thread, the read will see the new value rather than the previous value? Rephrased in code:

@Field static Object mutex = new Object()

def handler() {
  synchronized (mutex) {
    if (state.everythingIsDone) { return }
    // Using atomic state, is this guaranteed
    // to execute at most once?
    state.everythingIsDone = true
  }
}

Thanks for your help!

state is read at startup and written at exit of an event execution, so other threads will not see a change until after the writing thread exits.

atomicState is written to and read from the db, so you see changes 'as they happen'.

Mixing atomicState and state for the same variable is generally bad. It can appear to work, but if you write it with both atomicState and state, the state may re-write the variable when an event execution ends.

There are other ways to deal with data across executions - see:


I was specifically asking how "as they happen" atomic state is. My example code snippet shows the race condition I'm worried about, already assuming that I'm using atomic state. Am I guaranteed that as soon as the set-state command finishes executing that the new state has already been written, or does setting atomic state merely enqueue the DB write for "almost immediate" write, as implied by the SmartThings documentation?

Whatever the answer, I've decided to ditch state altogether for what should really be modeled as a thread-safe, in-memory state machine, and only using state/atomic state for best-effort persistence so I can reconstruct in-memory state on Hubitat reboot.
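For the in-memory approach, an "at most once" guard does not need persisted state at all. A minimal sketch in plain Java, assuming a shared static flag plays the role of everythingIsDone; the class OnceGuard and the run counter are hypothetical and exist only to make the behavior observable.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of an in-memory, at-most-once handler.
class OnceGuard {
    private static final AtomicBoolean everythingIsDone = new AtomicBoolean(false);
    static final AtomicInteger runs = new AtomicInteger();

    static void handler() {
        // Exactly one caller flips false -> true; everyone else returns.
        if (!everythingIsDone.compareAndSet(false, true)) {
            return;
        }
        runs.incrementAndGet(); // the guarded work
    }

    // Calls handler() from several threads and reports how often the work ran.
    static int runConcurrently(int threads) throws InterruptedException {
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(OnceGuard::handler);
            workers[i].start();
        }
        for (Thread t : workers) {
            t.join();
        }
        return runs.get();
    }
}
```

The flag is lost on reboot, which matches the plan above of treating state or atomic state as best-effort persistence and rebuilding the in-memory value from it at startup.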

At the time I wrote this I didn’t know synchronized worked in HE. If memory serves me, it did not in ST, which was a consequence of it being cloud based (two executions may be executing on different servers). So I think I just assumed the same in HE, since I could find no examples using it and no reference to it. I did like many and assumed the ST docs were valid. Search https://docs.smartthings.com/en/latest/code-review-guidelines.html for synchronized. The reasons to not use it in ST do not apply to HE.

Correct. I generally use a static @Field since that will persist across all those instances.

[quote="peterwoggle, post:19, topic:21304"]
// Using atomic state, is this guaranteed
// to execute at most once?
[/quote]

That I don’t know for sure. I believe so, but one of the platform developers would need to chime in, I think. I believe a definitive answer relies on internal knowledge of the architecture that isn’t public.

For some reason I'm not seeing reference to this topic.. where the same sort of issue was discussed, and responded to by Hubitat.

If it's already been examined, great.. but I just didn't see it.

Using atomic state, is this guaranteed

I'd say according to my testing (mentioned here) this is not guaranteed, and is indeed not much different than using regular state.

I haven't tested the @Field trick, but regular synchronized is not working. I'd say there is no thread safe way to update state in Hubitat App or Driver (UPDATE: I was wrong - static field does solve the problem).
Given that there's no way to avoid multiple threads in any real app, I'd also go further and say that simply there's no way to update state in Hubitat without race conditions (UPDATE: I was wrong). I'll try to test @Field trick soon, and I really hope it works.

Synchronized does work. However, if you’re not locking on a static field then you’re not locking on a shared object, and thus you’re not actually synchronized.
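The difference between the two kinds of lock object can be seen in a minimal plain-Java sketch. The class LockScope is hypothetical; each new LockScope() stands in for the separate instance that the platform is said to create per event.

```java
// Hypothetical sketch: a static lock is shared by all instances,
// a per-instance lock is not.
class LockScope {
    private static final Object SHARED = new Object();
    private final Object perInstance = new Object();
    private static int counter = 0;

    static Object sharedLock() {
        return SHARED;
    }

    Object instanceLock() {
        return perInstance;
    }

    void incrementShared() {
        synchronized (SHARED) { // same monitor for every instance
            counter++;
        }
    }

    // One fresh instance per thread, all incrementing the same counter.
    static int run(int threads, int perThread) throws InterruptedException {
        counter = 0;
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            LockScope instance = new LockScope();
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    instance.incrementShared();
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            t.join();
        }
        synchronized (SHARED) {
            return counter;
        }
    }
}
```

Because every instance locks on SHARED, run(4, 10000) counts every increment. Synchronizing on perInstance instead would give each thread its own monitor, so the increments could interleave and some could be lost.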


It does work indeed it seems! At least, for now. That's great news :slight_smile:
