Thread synchronization/safety when multiple commands run simultaneously?

I'm building an integration with Kevo, which doesn't like to receive a lot of commands at once. What I essentially did was make the lock() and unlock() methods add an object to an ArrayList each time you run the command. Then I have a loop (schedule/runIn) that runs every second and pulls stuff off that command ArrayList. What I've noticed is that it really seems like I have a race condition. When I do a removeAt(0) on the list, it goes from having 2 items down to having 0 with a single removeAt. When I added some logging, it really screamed race condition. So I tried adding @Synchronized... which apparently isn't allowed. My question is, are things like this thread safe? I noticed that if I increase the time between runs of my schedule from once a second to once every 5 seconds it works perfectly... further evidence of a race condition.

I guess my short question is, is there any way to make an ArrayList thread safe? Right now it seems like when a command is adding while a cron is removing, I have no guarantee it's going to work.

I tried searching here, but unfortunately on a forum the word "thread" comes up a lot :)

I'd also love more info on what threading primitives are available in device and app code. I'm working on a driver that is trying to debounce events and having some sort of atomic state or concurrency primitives in device handlers would be wonderful.

atomicState is better-ish, however, it requires you to make a copy of an object like a map, make your changes, then push it back into atomicState. That still means a race can exist. So I went with an @Field, which I still don't think is thread safe, but it seems like it reduces the window where a race can occur to a minimum. From what I can tell, HE does not offer any thread safety primitives. Maybe @mike.maxwell or @chuck.schwer can shed some light?

There aren't any at the app and driver level. Atomic state is there for apps but not drivers.

OK thanks. Am I correct that there is still a race condition possible then? As I said, I tried @Synchronized but that's not allowed.

Can you put full objects into atomicState? I think ConcurrentHashMap is available as an API. If you can stuff one of those into atomicState, you can use putIfAbsent to get mutex-like behavior.

Hmm I didn't know ConcurrentHashMap was supported, I'll give it a shot

java.util.concurrent.ConcurrentLinkedQueue seems to be meeting my needs, wasn't aware the concurrent classes would work. Thanks!!
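For anyone landing here later, this is roughly the shape of the queue approach, sketched in plain Java rather than Hubitat Groovy (the java.util.concurrent classes are the same either way). The names CommandQueueSketch, enqueue and drainOne are made up for illustration and are not taken from the Kevo integration.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch: command methods enqueue, a periodic job dequeues.
final class CommandQueueSketch {
    // Shared by all threads; offer() and poll() are safe to call concurrently.
    private static final ConcurrentLinkedQueue<String> COMMANDS =
            new ConcurrentLinkedQueue<>();

    // Would be called from lock()/unlock() style command methods.
    static void enqueue(String command) {
        COMMANDS.offer(command);
    }

    // Would be called from the scheduled job; returns null when the queue is empty.
    static String drainOne() {
        return COMMANDS.poll();
    }

    public static void main(String[] args) throws InterruptedException {
        // Two producers add commands at the same time.
        Thread a = new Thread(() -> { for (int i = 0; i < 1000; i++) enqueue("lock"); });
        Thread b = new Thread(() -> { for (int i = 0; i < 1000; i++) enqueue("unlock"); });
        a.start();
        b.start();
        a.join();
        b.join();

        // The consumer removes exactly one item per poll(); nothing is lost.
        int drained = 0;
        while (drainOne() != null) {
            drained++;
        }
        System.out.println("drained " + drained + " commands");
    }
}
```

Unlike removeAt(0) on a plain ArrayList, poll() removes at most one element per call even while other threads are adding.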

Awesome!

Would you mind sharing your code snippet? I'm doing some debouncing work as well and would love some examples.

Sure

It's not fully working yet. My runIn dies sometimes and I can't come up with a better way to do what I want. If I use schedule() I end up with multiple running at once. I'd rather use a schedule, but then I'd need to come up with more of a true mutex so that the other schedules just return if the mutex is locked... Unsure if I can find a way to do that.

I tried creating a semaphore but something wasn't working right... it seemed that my finally {} was called on thread interruption, not just when my code finished. As a result I removed the finally {} and added the release to the end of my try and my catch. It seems to be working. It's not quite a debounce, but it's similar.
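A minimal sketch of the semaphore-as-gate idea, in plain Java and not the actual integration code. SingleRunnerSketch and runExclusively are hypothetical names. Note that this sketch releases in a finally block, which is the conventional Java pattern; the post above moved the release to the end of the try and into the catch instead because of how finally behaved on the hub.

```java
import java.util.concurrent.Semaphore;

// Hypothetical sketch: a one-permit semaphore used as a non-blocking gate.
final class SingleRunnerSketch {
    private static final Semaphore GATE = new Semaphore(1);

    // Runs the work only if no other run is in progress.
    // Returns true if the work ran, false if it was skipped.
    static boolean runExclusively(Runnable work) {
        if (!GATE.tryAcquire()) {
            return false; // someone else holds the permit, so just return
        }
        try {
            work.run();
            return true;
        } finally {
            GATE.release(); // always hand the permit back
        }
    }

    public static void main(String[] args) {
        boolean outer = runExclusively(() -> {
            // While the outer run holds the permit, a second attempt is refused.
            boolean inner = runExclusively(() -> { });
            System.out.println("nested run executed: " + inner);
        });
        System.out.println("outer run executed: " + outer);
    }
}
```

tryAcquire() never blocks, so overlapping scheduled runs simply skip instead of piling up.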

@mike.maxwell curious... how dangerous is what I posted there when it comes to my runAllActions since you mentioned there really isn't any good threading stuff in HE at this point? It seems to work and I did a bunch of testing for race conditions and it seems good

If it works it works...


So without knowing all of the details of the execution logic in Hubitat you might want to hack your own semaphore in so you can ensure it never stays locked for more than X millis.

Something like this might work:

import groovy.transform.Field
import java.util.concurrent.atomic.AtomicLong

// static so that every execution shares the same AtomicLong
@Field static AtomicLong mutex = new AtomicLong();

def myCode() {
    // Not sure how reliable the execution env is so we sanity check our
    // lock, if it has been held for more than 5s assume it is abandoned.
    // Read the marker once so the check and the reset use the same value.
    long val = mutex.get();
    if (val != 0 && now() - val > 5000) {
        // Do some lock-busting error handling / logging

        // Remove the lock, don't care about the return value, if 'val'
        // isn't valid it just means another thread beat us to
        // lock-busting. compareAndSet is used because we don't want to
        // overwrite a valid lock from another thread.
        mutex.compareAndSet(val, 0);
    }

    try {
        // Attempt to get the lock, 0 is our marker for "unlocked", use
        // the current timestamp for the lock marker
        if (!mutex.compareAndSet(0, now())) {
            return;
        }

        // Do Stuff In Lock

        // Release the lock
        mutex.set(0);
    } catch (e) {
        // Release the lock
        mutex.set(0);
    }
}

I was thinking of something like this and it makes sense. I'm more of a .NET guy so I'm still learning the locking constructs that Java/Groovy have... I'll have to read up on AtomicLong, not familiar with that class.

All that being said, it's been running for about 24 hours now without issue!!!

In this specific case the important feature of AtomicLong is compareAndSet. It lets you specify the expected and new values and returns a boolean that will be true if the set succeeded.
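To make that concrete, here is a tiny plain-Java illustration of compareAndSet used as a try-lock. CompareAndSetSketch and tryLock are hypothetical names, and the timestamps are hard-coded stand-ins for now(); they must be non-zero because 0 is the "unlocked" marker.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: 0 means "unlocked", any other value is the holder's timestamp.
final class CompareAndSetSketch {
    // Succeeds only if the current value is still 0, in one atomic step.
    static boolean tryLock(AtomicLong mutex, long timestamp) {
        return mutex.compareAndSet(0L, timestamp);
    }

    public static void main(String[] args) {
        AtomicLong mutex = new AtomicLong(); // starts at 0, i.e. unlocked

        System.out.println(tryLock(mutex, 1000L)); // true: first caller takes the lock
        System.out.println(tryLock(mutex, 2000L)); // false: value is no longer 0
        System.out.println(mutex.get());           // 1000: the first timestamp is untouched

        mutex.set(0L);                             // release
        System.out.println(tryLock(mutex, 3000L)); // true: lock is free again
    }
}
```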

Functionally it is very close to your Semaphore code, you just get the timestamp guard ability as well.

Glad to hear it has been working!


Way late here, but I'm considering writing my own apps because I've had problems with RM's lack of thread safety and resulting race conditions.

Based on my limited understanding, storing a concurrent hash map in atomic state wouldn't provide thread safety, but I'd love to be wrong. As far as I can tell, atomic state stores serialized objects, so it's not actually a reference to a thread-safe concurrent map. Therefore, any operation on it would necessarily construct an object, modify it, then serialize that object and write it back out. I'm not sure how "atomic" is the right word for atomic state, as it doesn't seem to provide any thread safety at all. Instead, it seems to just be a synchronous write operation, or a buffered write followed by a "flush."
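The concern can be modeled in plain Java. This is only a sketch of the behavior described in the post (reads hand back a copy, writes replace the stored value), not Hubitat's actual implementation; CopyOnAccessStoreSketch and its methods are hypothetical. The interleaving is written out by hand so the result is deterministic.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a store that only ever hands out and accepts copies.
final class CopyOnAccessStoreSketch {
    private Map<String, Integer> stored = new HashMap<>();

    // Reading returns a fresh copy, as a deserialized value would be.
    synchronized Map<String, Integer> read() {
        return new HashMap<>(stored);
    }

    // Writing replaces the stored value wholesale.
    synchronized void write(Map<String, Integer> value) {
        stored = new HashMap<>(value);
    }

    // Two "executions" each try to increment the same counter.
    static int lostUpdateDemo() {
        CopyOnAccessStoreSketch store = new CopyOnAccessStoreSketch();
        Map<String, Integer> initial = new HashMap<>();
        initial.put("count", 0);
        store.write(initial);

        // Both executions read before either one writes.
        Map<String, Integer> a = store.read();
        Map<String, Integer> b = store.read();
        a.put("count", a.get("count") + 1);
        b.put("count", b.get("count") + 1);
        store.write(a);
        store.write(b); // silently overwrites a's update

        return store.read().get("count");
    }

    public static void main(String[] args) {
        // Two increments happened, but only one survives.
        System.out.println("count = " + lostUpdateDemo());
    }
}
```

Each individual read and write is safe here, yet the read-modify-write sequence as a whole is not, which is the gap the post is pointing at.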

Is "atomic state" a misnomer?

Were you able to get your mutex example to run on Hubitat? Maybe I'm importing the wrong package, but I'm getting: "Importing [java.util.concurrent.atomic.AtomicLong] is not allowed."

Any other luck achieving thread safety? I have a hard time believing anyone would introduce a multi-threaded model with all its inherent complexities without any way to manage that complexity. Otherwise it's literally impossible to write a "correct" program.

It was named that by SmartThings. Yes, I agree, it's kind of a misnomer. What it means is "I don't wait until the execution of the entire 'event' finishes to write to the db, I write when you ask me to." But completely agree, if you're coming from a programming background this is not going to give you true atomicity because of the problem with having to duplicate objects, modify them, then put them back.

There are lots of options. For starters, the standard Java/Groovy keyword synchronized() is supported. I have multiple apps where I make use of this. Second, you can make use of some (but not all) of the concurrent Java classes. This was the first thread-safe app I wrote for HE: hubitat-kevo/apps/Kevo_Plus_Integration.groovy at master · dcmeglio/hubitat-kevo · GitHub. It uses a ConcurrentLinkedQueue and a Semaphore to manage a shared queue.

Happy to try to help if you have specific questions as you dig in!

Thanks, that clears up a lot!

I see you're using the semaphore as just a mutex instead of synchronizing on an object. Is that just so you can use the non-blocking tryAcquire? It seems like you wouldn't need lock-busting logic for a synchronized object unless there's a bug in the JVM. Have you seen otherwise? Something like:

@Field static Object mutex = new Object()

def f() {
  synchronized (mutex) { ... }
}

If I were to instead synchronize on this, what would this refer to? I seem to remember reading that the platform creates a separate instance per event, so I'm guessing it wouldn't protect the critical section across events, right?
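In plain Java terms, the difference looks like the sketch below. It assumes, as the post recalls, that each event gets its own instance; StaticLockSketch and runHandlers are hypothetical names. Because the lock object is static, every instance contends for the same monitor, whereas synchronized (this) would lock a different object per instance.

```java
// Hypothetical sketch: one handler instance per "event", one shared static lock.
final class StaticLockSketch {
    private static final Object MUTEX = new Object();
    private static int counter = 0;

    // Instance method, but guarded by the static lock rather than by 'this'.
    void handler() {
        synchronized (MUTEX) {
            counter++;
        }
    }

    // Starts one thread per instance and returns the final counter value.
    static int runHandlers(int instances, int callsPerInstance) {
        synchronized (MUTEX) {
            counter = 0;
        }
        Thread[] workers = new Thread[instances];
        for (int i = 0; i < instances; i++) {
            StaticLockSketch instance = new StaticLockSketch(); // fresh instance per "event"
            workers[i] = new Thread(() -> {
                for (int j = 0; j < callsPerInstance; j++) {
                    instance.handler();
                }
            });
            workers[i].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        synchronized (MUTEX) {
            return counter;
        }
    }

    public static void main(String[] args) {
        // With the shared lock no increment is lost across instances.
        System.out.println(runHandlers(4, 10000));
    }
}
```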

Is there a way to scope the mutex to f?

def f() {
  static Object mutex = new Object()  // Illegal
  synchronized (mutex) { ... }
}

This isn't legal, but would be slightly desirable for readability and for preventing mutex abuse (e.g., if I simply want to prevent two instances of a handler function from running concurrently).

Finally, one question about atomic state: can I assume that if a read of state.something occurs after state.something has been set by another thread, the read will see the new value rather than the previous value? Rephrased in code:

@Field static Object mutex = new Object()

def handler() {
  synchronized (mutex) {
    if (state.everythingIsDone) { return }
    // Using atomic state, is this guaranteed
    // to execute at most once?
    state.everythingIsDone = true
  }
}

Thanks for your help!

state is read at startup and written at exit of an event execution. So other threads will not see a change until after the 'write/change thread' exits

atomicState is written to and read from the db, so you see changes 'as they happen'.

Mixing atomicState and state for the same variable is generally bad. It can appear to work, but if you write it through both atomicState and state, state may overwrite the variable when an event execution ends.

There are other ways to deal with data across executions - see:
