@@ -7,7 +7,7 @@ FactoryMethods. They are not designed for inheritance but rather for
7
7
composition.
8
8
9
9
``` ruby
10
- Concurrent ::Promises ::FactoryMethods .instance_methods false
10
+ Concurrent ::Promises ::FactoryMethods .instance_methods
11
11
```
12
12
13
13
The module can be included or extended where needed.
@@ -438,6 +438,27 @@ future.fulfill 1 rescue $!
438
438
future.fulfill 2 , false
439
439
```
440
440
441
+ ## How are promises executed?
442
+
443
+ Promises use global pools to execute the tasks. Therefore each task may run on
444
+ different thread which implies that users have to be careful not to depend on
445
+ Thread local variables (or they have to set at the begging of the task and
446
+ cleaned up at the end of the task).
447
+
448
+ Since the tasks are running on may different threads of the thread pool, it's
449
+ better to follow following rules:
450
+
451
+ - Use only data passed in through arguments or values of parent futures, to
452
+ have better control over what are futures accessing.
453
+ - The data passed in and out of futures are easier to deal with if they are
454
+ immutable or at least treated as such.
455
+ - Any mutable and mutated object accessed by more than one threads or futures
456
+ must be thread safe, see {Concurrent::Array}, {Concurrent::Hash}, and
457
+ {Concurrent::Map}. (Value of a future may be consumed by many futures.)
458
+ - Futures can access outside objects, but they has to be thread-safe.
459
+
460
+ > * TODO: This part to be extended*
461
+
441
462
# Advanced
442
463
443
464
## Callbacks
@@ -470,6 +491,25 @@ Promises.future_on(:fast) { 2 }.
470
491
value.size
471
492
```
472
493
494
+ ## Run (simulated process)
495
+
496
+ Similar to flatting is running. When ` run ` is called on a future it will flat
497
+ indefinitely as long the future fulfils into a ` Future ` value. It can be used
498
+ to simulate a thread like processing without actually occupying the thread.
499
+
500
+ ``` ruby
501
+ count = lambda do |v |
502
+ v += 1
503
+ v < 5 ? Promises .future_on(:fast , v, & count) : v
504
+ end
505
+ 400 .times.
506
+ map { Promises .future_on(:fast , 0 , & count).run.value! }.
507
+ all? { |v | v == 5 }
508
+ ```
509
+
510
+ Therefore the above example finished fine on the the ` :fast ` thread pool even
511
+ though it has much less threads than there is the simulated process.
512
+
473
513
# Interoperability
474
514
475
515
## Actors
@@ -500,10 +540,47 @@ The `ask` method returns future.
500
540
``` ruby
501
541
actor.ask(2 ).then(& :succ ).value!
502
542
```
543
+ ## ProcessingActor
544
+
545
+ > * TODO: Documentation to be added in few days*
546
+
547
+ ## Channel
548
+
549
+ There is an implementation of channel as well. Lets start by creating a
550
+ channel with capacity 2 messages.
551
+
552
+ ``` ruby
553
+ ch1 = Concurrent ::Promises ::Channel .new 2
554
+ ```
503
555
504
- ## Channels
556
+ We push 3 messages, it can be observed that the last future representing the
557
+ push is not fulfilled since the capacity prevents it. When the work which fills
558
+ the channel depends on the futures created by push it can be used to create
559
+ back pressure – the filling work is delayed until the channel has space for
560
+ more messages.
505
561
506
- > * TODO: To be added*
562
+ ``` ruby
563
+ pushes = 3 .times.map { |i | ch1.push i }
564
+ ch1.pop.value!
565
+ pushes
566
+ ```
567
+
568
+ A selection over channels can be created with select_channel factory method. It
569
+ will be fulfilled with a first message available in any of the channels. It
570
+ returns a pair to be able to find out which channel had the message available.
571
+
572
+ ``` ruby
573
+ ch2 = Concurrent ::Promises ::Channel .new 2
574
+ result = Concurrent ::Promises .select_channel(ch1, ch2)
575
+ result.value!
576
+
577
+ Promises .future { 1 + 1 }.then_push_channel(ch1)
578
+ result = (
579
+ Concurrent ::Promises .fulfilled_future(' %02d' ) &
580
+ Concurrent ::Promises .select_channel(ch1, ch2)).
581
+ then { |format , (channel , value )| format format , value }
582
+ result.value!
583
+ ```
507
584
508
585
# Use-cases
509
586
@@ -573,7 +650,7 @@ results = 3.times.map { computer.ask [:run, -> { sleep 0.1; :result }] }
573
650
computer.ask(:status ).value!
574
651
results.map(& :value! )
575
652
```
576
- ## Too many threads / fibers
653
+ ## Solving the Thread count limit by thread simulation
577
654
578
655
Sometimes an application requires to process a lot of tasks concurrently. If
579
656
the number of concurrent tasks is high enough than it is not possible to create
@@ -606,7 +683,7 @@ Promises.future(0, &body).run.value! # => 5
606
683
607
684
This solution works well an any Ruby implementation.
608
685
609
- > TODO add more complete example
686
+ > * TODO: More examples to be added. *
610
687
611
688
## Cancellation
612
689
@@ -771,55 +848,116 @@ end #
771
848
futures.map(& :value! )
772
849
```
773
850
774
- ## Long stream of tasks
851
+ ## Long stream of tasks, applying back pressure
852
+
853
+ Lets assume that we queuing an API for a data and the queries can be faster
854
+ than we are able to process them. This example shows how to use channel as a
855
+ buffer and how to apply back pressure to slow down the queries.
856
+
857
+ ``` ruby
858
+ require ' json' #
859
+
860
+ channel = Promises ::Channel .new 6
861
+ source, token = Concurrent ::Cancellation .create
862
+
863
+ def query_random_text (token , channel )
864
+ Promises .future do
865
+ # for simplicity the query is omitted
866
+ # url = 'some api'
867
+ # Net::HTTP.get(URI(url))
868
+ sleep 0.1
869
+ { ' message' =>
870
+ ' Lorem ipsum rhoncus scelerisque vulputate diam inceptos'
871
+ }.to_json
872
+ end .then(token) do |value , token |
873
+ # The push to channel is fulfilled only after the message is successfully
874
+ # published to the channel, therefore it will not continue querying until
875
+ # current message is pushed.
876
+ channel.push(value) |
877
+ # It could wait on the push indefinitely if the token is not checked
878
+ # here with `or` (the pipe).
879
+ token.to_future
880
+ end .flat_future.then(token) do |_ , token |
881
+ # query again after the message is pushed to buffer
882
+ query_random_text(token, channel) unless token.canceled?
883
+ end
884
+ end
885
+
886
+ words = []
887
+ words_throttle = Concurrent ::Throttle .new 1
888
+
889
+ def count_words_in_random_text (token , channel , words , words_throttle )
890
+ channel.pop.then do |response |
891
+ string = JSON .load (response)[' message' ]
892
+ # processing is slower than querying
893
+ sleep 0.2
894
+ words_count = string.scan(/\w +/ ).size
895
+ end .then_throttled_by(words_throttle, words) do |words_count , words |
896
+ # safe since throttled to only 1 task at a time
897
+ words << words_count
898
+ end .then(token) do |_ , token |
899
+ # count words in next message
900
+ unless token.canceled?
901
+ count_words_in_random_text(token, channel, words, words_throttle)
902
+ end
903
+ end
904
+ end
905
+
906
+ query_processes = 3 .times.map do
907
+ Promises .future(token, channel, & method(:query_random_text )).run
908
+ end
909
+
910
+ word_counter_processes = 2 .times.map do
911
+ Promises .future(token, channel, words, words_throttle,
912
+ & method(:count_words_in_random_text )).run
913
+ end
914
+
915
+ sleep 0.5
916
+ ```
917
+
918
+ Let it run for a while then cancel it and ensure that the runs all fulfilled
919
+ (therefore ended) after the cancellation. Finally print the result.
775
920
776
- > TODO Channel
921
+ ``` ruby
922
+ source.cancel
923
+ query_processes.map(& :wait! )
924
+ word_counter_processes.map(& :wait! )
925
+ words
926
+ ```
777
927
778
- ## Parallel enumerable ?
928
+ Compared to using threads directly this is highly configurable and compostable
929
+ solution.
779
930
780
- > TODO
781
931
782
932
## Periodic task
783
933
784
- > TODO revisit, use cancellation, add to library
934
+ By combining ` schedule ` , ` run ` and ` Cancellation ` periodically executed task
935
+ can be easily created.
785
936
786
937
``` ruby
787
- def schedule_job (interval , & job )
788
- # schedule the first execution and chain restart of the job
789
- Promises .schedule(interval, & job).chain do |fulfilled , continue , reason |
790
- if fulfilled
791
- schedule_job(interval, & job) if continue
792
- else
793
- # handle error
794
- reason
795
- # retry sooner
796
- schedule_job(interval, & job)
797
- end
798
- end
938
+ repeating_scheduled_task = -> interval, token, task do
939
+ Promises .
940
+ # Schedule the task.
941
+ schedule(interval, token, & task).
942
+ # If successful schedule again.
943
+ # Alternatively use chain to schedule always.
944
+ then { repeating_scheduled_task.call(interval, token, task) }
799
945
end
800
946
801
- queue = Queue .new
802
- count = 0
803
- interval = 0.05 # small just not to delay execution of this example
804
-
805
- schedule_job interval do
806
- queue.push count
807
- count += 1
808
- # to continue scheduling return true, false will end the task
809
- if count < 4
810
- # to continue scheduling return true
811
- true
812
- else
813
- # close the queue with nil to simplify reading it
814
- queue.push nil
815
- # to end the task return false
816
- false
947
+ cancellation, token = Concurrent ::Cancellation .create
948
+
949
+ task = -> token do
950
+ 5 .times do
951
+ token.raise_if_canceled
952
+ # do stuff
953
+ print ' .'
954
+ sleep 0.01
817
955
end
818
956
end
819
957
820
- # read the queue
821
- arr, v = [], nil ; arr << v while (v = queue.pop) #
822
- # arr has the results from the executed scheduled tasks
823
- arr
958
+ result = Promises .future( 0.1 , token, task, & repeating_scheduled_task).run
959
+ sleep 0.2
960
+ cancellation.cancel
961
+ result.result
824
962
```
825
963
0 commit comments