@@ -1951,57 +1951,128 @@ def then_ask(actor)
1951
1951
include ActorIntegration
1952
1952
end
1953
1953
1954
- ### Experimental features follow
1954
+ class Channel < Concurrent ::Synchronization ::Object
1955
+ safe_initialization!
1955
1956
1956
- module FactoryMethods
1957
+ # Default size of the Channel, makes it accept unlimited number of messages.
1958
+ UNLIMITED = Object . new
1959
+ UNLIMITED . singleton_class . class_eval do
1960
+ include Comparable
1957
1961
1958
- # @!visibility private
1962
+ def <=>( other )
1963
+ 1
1964
+ end
1959
1965
1960
- module ChannelIntegration
1966
+ def to_s
1967
+ 'unlimited'
1968
+ end
1969
+ end
1970
+
1971
+ # A channel to pass messages between promises. The size is limited to support back pressure.
1972
+ # @param [Integer, UNLIMITED] size the maximum number of messages stored in the channel.
1973
+ def initialize ( size = UNLIMITED )
1974
+ super ( )
1975
+ @Size = size
1976
+ # TODO (pitr-ch 26-Dec-2016): replace with lock-free implementation
1977
+ @Mutex = Mutex . new
1978
+ @Probes = [ ]
1979
+ @Messages = [ ]
1980
+ @PendingPush = [ ]
1981
+ end
1961
1982
1962
- # @!visibility private
1963
1983
1964
- # only proof of concept
1965
- # @return [Future]
1966
- def select ( *channels )
1967
- # TODO (pitr-ch 26-Mar-2016): re-do, has to be non-blocking
1968
- future do
1969
- # noinspection RubyArgCount
1970
- Channel . select do |s |
1971
- channels . each do |ch |
1972
- s . take ( ch ) { |value | [ value , ch ] }
1984
+ # Returns future which will fulfill when the message is added to the channel. Its value is the message.
1985
+ # @param [Object] message
1986
+ # @return [Future]
1987
+ def push ( message )
1988
+ @Mutex . synchronize do
1989
+ while true
1990
+ if @Probes . empty?
1991
+ if @Size > @Messages . size
1992
+ @Messages . push message
1993
+ return Promises . fulfilled_future message
1994
+ else
1995
+ pushed = Promises . resolvable_future
1996
+ @PendingPush . push [ message , pushed ]
1997
+ return pushed . with_hidden_resolvable
1998
+ end
1999
+ else
2000
+ probe = @Probes . shift
2001
+ if probe . fulfill [ self , message ] , false
2002
+ return Promises . fulfilled_future ( message )
1973
2003
end
1974
2004
end
1975
2005
end
1976
2006
end
1977
2007
end
1978
2008
1979
- include ChannelIntegration
2009
+ # Returns a future witch will become fulfilled with a value from the channel when one is available.
2010
+ # @param [ResolvableFuture] probe the future which will be fulfilled with a channel value
2011
+ # @return [Future] the probe, its value will be the message when available.
2012
+ def pop ( probe = Concurrent ::Promises . resolvable_future )
2013
+ # TODO (pitr-ch 26-Dec-2016): improve performance
2014
+ pop_for_select ( probe ) . then ( &:last )
2015
+ end
2016
+
2017
+ # @!visibility private
2018
+ def pop_for_select ( probe = Concurrent ::Promises . resolvable_future )
2019
+ @Mutex . synchronize do
2020
+ if @Messages . empty?
2021
+ @Probes . push probe
2022
+ else
2023
+ message = @Messages . shift
2024
+ probe . fulfill [ self , message ]
2025
+
2026
+ unless @PendingPush . empty?
2027
+ message , pushed = @PendingPush . shift
2028
+ @Messages . push message
2029
+ pushed . fulfill message
2030
+ end
2031
+ end
2032
+ end
2033
+ probe
2034
+ end
2035
+
2036
+ # @return [String] Short string representation.
2037
+ def to_s
2038
+ format '<#%s:0x%x size:%s>' , self . class , object_id << 1 , @Size
2039
+ end
2040
+
2041
+ alias_method :inspect , :to_s
1980
2042
end
1981
2043
1982
2044
class Future < AbstractEventFuture
2045
+ module NewChannelIntegration
1983
2046
1984
- # @!visibility private
2047
+ # @param [Channel] channel to push to.
2048
+ # @return [Future] a future which is fulfilled after the message is pushed to the channel.
2049
+ # May take a moment if the channel is full.
2050
+ def then_push_channel ( channel )
2051
+ self . then { |value | channel . push value } . flat_future
2052
+ end
2053
+
2054
+ # TODO (pitr-ch 26-Dec-2016): does it make sense to have rescue an chain variants as well, check other integrations as well
2055
+ end
1985
2056
1986
- module ChannelIntegration
2057
+ include NewChannelIntegration
2058
+ end
1987
2059
1988
- # @!visibility private
2060
+ module FactoryMethods
1989
2061
1990
- # Zips with selected value form the suplied channels
1991
- # @return [Future]
1992
- def then_select ( *channels )
1993
- future = Concurrent ::Promises . select ( *channels )
1994
- ZipFuturesPromise . new_blocked_by2 ( self , future , @DefaultExecutor ) . future
1995
- end
2062
+ module NewChannelIntegration
1996
2063
1997
- # @note may block
1998
- # @note only proof of concept
1999
- def then_put ( channel )
2000
- on_fulfillment_using ( :io , channel ) { |value , channel | channel . put value }
2064
+ # Selects a channel which is ready to be read from.
2065
+ # @param [Channel] channels
2066
+ # @return [Future] a future which is fulfilled with pair [channel, message] when one of the channels is
2067
+ # available for reading
2068
+ def select_channel ( *channels )
2069
+ probe = Promises . resolvable_future
2070
+ channels . each { |ch | ch . pop_for_select probe }
2071
+ probe
2001
2072
end
2002
2073
end
2003
2074
2004
- include ChannelIntegration
2075
+ include NewChannelIntegration
2005
2076
end
2006
2077
2007
2078
end
0 commit comments