Skip to content

[#41] Implement concurrent message processing#42

Open
lispyclouds wants to merge 3 commits intomainfrom
conc
Open

[#41] Implement concurrent message processing#42
lispyclouds wants to merge 3 commits intomainfrom
conc

Conversation

@lispyclouds
Copy link
Copy Markdown
Member

  • Implements concurrent processing of mesaages
  • Uses a mutex when writing to stdout
  • Bump Golang to 1.25 and deps
  • Cleanup code

- Implements concurrent processing of mesaages
- Uses a mutex when writing to stdout
- Bump Golang to 1.25 and deps
- Cleanup code
@lispyclouds lispyclouds requested a review from borkdude March 18, 2026 07:58
@lispyclouds
Copy link
Copy Markdown
Member Author

@borkdude this should work. any ideas on what would be a good enough test to add maybe?

@lispyclouds lispyclouds marked this pull request as ready for review March 18, 2026 08:15
@borkdude
Copy link
Copy Markdown
Contributor

Thanks. A test that heavily exercises the concurrent access? I suspect sqlite might not handle this well so the more aggressive the test is the better :)

@lispyclouds
Copy link
Copy Markdown
Member Author

Sqlite might just surprise us, as it did with huge workloads at work. Will think and get something out later today.

@borkdude
Copy link
Copy Markdown
Contributor

You could just copy the babasha-sql-pod test that went in with the concurrency stuff. Some numbers on if this makes it faster or not would also be interesting.

@lispyclouds
Copy link
Copy Markdown
Member Author

lispyclouds commented Mar 18, 2026

Just tried it with this little test from the sqlpods repo:

(deftest concurrent-tests
  (let [conn (sqlite/get-connection temp-file)]
    (testing "concurrent requests"
      (let [n 20]
        (sqlite/execute! conn ["create table concurrent_test ( id int, val int );"])
        (let [futures (mapv (fn [i]
                              (future (sqlite/execute! conn [(format "insert into concurrent_test values (%d, %d);" i (* i 10))])))
                            (range n))]
          (run! deref futures)) ; stuck here
        (let [result (sqlite/query conn ["select count(*) as cnt from concurrent_test;"])]
          (is (= n (:cnt (first result)))))))))

seems to be stuck on derefing those futures. tried to put prns before and after the future call like so:

(mapv (fn [i]
        (future
          (prn :start i)
          (sqlite/execute! conn [(format "insert into concurrent_test values (%d, %d);" i (* i 10))])
          (prn :end i))
        (range n)))

all prints as expected. this is also stuck without my changes too btw. so interesting. trying to dig more. tired with a doseq on the futures with a prn, can see that it derefs maybe twice and then gets stuck.

Running on bb master build on linux amd64.

@lispyclouds
Copy link
Copy Markdown
Member Author

thought that the db might be locked but connected via cli and can see all the values and able to write to it too, so cant be the inbuilt sqlite write lock i think

@lispyclouds
Copy link
Copy Markdown
Member Author

don't seem to see anything obvious on the go side, all the messages come at its responding successfully. seems to be something on bb side?

@lispyclouds
Copy link
Copy Markdown
Member Author

Any ideas on this @borkdude? I'm trying to find more time to look into this properly, maybe you already know what's up?

@borkdude
Copy link
Copy Markdown
Contributor

Doesn't ring a bell

@lispyclouds
Copy link
Copy Markdown
Member Author

lispyclouds commented Mar 22, 2026

Some claude assisted debugging later:

It seems there is some locking going on bb's response routing when it comes to concurrent requests. Adding this staggered send seems to fix it:

(let [futures (mapv (fn [i]
                      (future
                        (Thread/sleep (* i 50)) ; stagger by 50ms each
                        (sqlite/execute! conn [...])))
                    (range n))]
  (run! deref futures))

not sure how to approach this if the analysis is correct @borkdude , some pointers could help me.

@lispyclouds
Copy link
Copy Markdown
Member Author

To add more context:

  • not all requests arrive on the go side
  • all that arrives are successfully dealt with, can see on the db too
  • the number of successful ones are always random < n. eg 14/20, 18/20, 15/20 etc

@borkdude
Copy link
Copy Markdown
Contributor

@lispyclouds If you're using Claude code it's probably best to let it use the pod library directly instead of bb so it can debug that too.

@lispyclouds
Copy link
Copy Markdown
Member Author

Not sure how it works on the sql-pods thing which is think is the same stdout loop? Lemme see those changes too.

@borkdude
Copy link
Copy Markdown
Contributor

Should be very similar. The only difference is that it's written in JVM Clojure.

@lispyclouds
Copy link
Copy Markdown
Member Author

Adding some excerpts from claude to keep notes:

  • babashka's pod reader loop — the thing that reads responses back from stdout and dispatches them to the waiting futures — may itself be single-threaded or have a bounded internal dispatch queue. When 20 futures are all blocked waiting for their response, and babashka's reader can only unblock them one at a time with no backpressure relief, you get a livelock or deadlock depending on how bb schedules those futures' threads.
  • The core issue is likely invoke-id correlation under concurrent load. Each pod message has an id, and babashka should route the response back to the future holding that id. If bb's response-dispatch loop is blocking or dropping correlations under concurrent load, some futures never get unblocked — which is exactly the "stuck at deref" symptom.

will try to let it debug the pod code.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants