concurrent
concurrent.cr
Modern Adequate Any
New opportunities for concurrency tools in Crystal.
Large empty lots spacious directories available to build your dream home algorithm!
Space is filling up at (24k code bytes / 2 months) 0.004 bytes per second. Register your PR today!
©️ Real estate marketing association
Inspired by Erlang, Clojure, Scala, Haskell, F#, C#, Java, and classic concurrency patterns which inspired Ruby, which inspired this library.
Available classes:
- Concurrent::Enumerable
- Concurrent::Channel
- Concurrent::CountDownLatch
- Concurrent::CyclicBarrier
- Concurrent::Semaphore
TODO:
- [ ] Change Enumerable/Channel in to generic stream processing.
- [ ] Enumerable/Channel custom error handling.
More algorithms are coming. Contributions welcome.
Installation
-
Add the dependency to your
shard.yml
:dependencies: concurrent: github: didactic-drunk/concurrent.cr
-
Run
shards install
Usage
Parallel map (experimental)
require "concurrent/enumerable"
(1..50).parallel.select(&.even?).map { |n| n + 1 }.serial.sum
^ ^ ^ Results joined.
| | Spawns separate fiber pool
| Spawns fiber pool
Batches
(1..50).parallel.map { |n|
# Parallel processing in a fiber pool
Choose::A::ORM.new(id: n)
}.batch(10).run { |array_of_records|
# Run 10 Inserts inside a transaction for faster db writes
# Real applications should choose ~~~100-100000 depending on the database, schema, data & hardware
ORM.transaction { array_of_records.each &.save! }
}.wait
Stream processing from a Channel
(experimental).
require "concurrent/channel"
# Same interface and restrictions as concurrent/enumerable.
ch = Channel(Int32).new
spawn do
10.times { |i| ch.send 1 }
ch.close
end
# map is processed in a Fiber pool.
# All other fibers will shut down after all messages are processed.
# Any errors in processing are raised here.
ch.parallel.map { |n| n + 1 }.serial.sum
Open ended stream processing aka simplified fiber pools (experimental)
require "concurrent/channel"
# Same interface and restrictions as concurrent/enumerable.
ch = Channel(Int32).new
# Messages may be processed in parallel within each `tee` and `run`.
# Make sure to use immutable objects or concurrency safe data structures.
run = ch.parallel.tee { |n| Log.info { "n=#{n}" } }.batch(2).run { |n| p n }
10.times { |i| ch.send 1 }
ch.close
# Wait until all messages/errors are processed.
run.wait
Stream error handling
ary = (1..10).to_a.parallel.select { |i|
raise "select error" if i == 2
true
}.parallel.map { |i|
raise "map error" if i.even?
i.to_s
# All errors in prior blocks handled here
}.errors { |ex, obj|
puts "#{obj} #{ex}"
}.map { |s|
s.to_i
}.to_a
p ary => [1, 3, 5, 7]
CountDownLatch
require "concurrent/count_down_latch"
fiber_count = 10
latch = Concurrent::CountDownLatch.new
10.times do
spawn do
# Do work
latch.count_down
end
end
latch.wait_count = fiber_count
latch.wait
Semaphore
require "concurrent/semaphore"
sem = Concurrent::Semaphore.new n
# spawn a lot of fibers
2000.times do
spawn do
sem.acquire do
...
end
end
end
Development
TODO: Write development instructions here
Contributing
- Fork it (https://github.com/didactic-drunk/concurrent.cr/fork)
- Install a formatting check git hook (ln -sf ../../scripts/git/pre-commit .git/hooks)
- Create your feature branch (
git checkout -b my-new-feature
) - Commit your changes (
git commit -am 'Add some feature'
) - Push to the branch (
git push origin my-new-feature
) - Create a new Pull Request
Contributors
- Click or Run
git shortlog --summary --numbered --email