/usr/lib/ruby/vendor_ruby/celluloid/supervision/container/pool.rb is in ruby-celluloid-pool 0.20.5-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 | module Celluloid
module Supervision
class Container
# Manages a fixed-size pool of actors
# Delegates work (i.e. methods) and supervises actors
# Don't use this class directly. Instead use MyKlass.pool
class Pool
include Celluloid
trap_exit :__crash_handler__
finalizer :__shutdown__
attr_reader :size, :actors
# Builds the pool's bookkeeping and spawns the initial workers.
#
# options - Hash of settings:
#           :actors - class whose instances become the pooled workers
#           :size   - number of workers; when absent, the larger of
#                     Celluloid.cores (or 2 if that is nil) and 2
#           :args   - argument(s) passed to every worker on creation
def initialize(options={})
  @klass = options[:actors]
  worker_args = options[:args]
  @args = worker_args ? Array(worker_args) : []
  @size = options[:size] || [Celluloid.cores || 2, 2].max

  @mutex = Mutex.new
  @actors = Set.new
  @busy = []
  @idle = []

  # Spawning comes last: it can suspend and/or crash, and everything
  # above must already be in place when that happens.
  @idle = @size.times.map { __spawn_actor__ }
end
# Finalizer: asks every known actor to terminate and waits for each request.
#
# Does nothing when @actors was never set (e.g. the initializer crashed
# early). Actors that are already dead are skipped, and errors raised while
# waiting on a termination future are ignored.
def __shutdown__
  return unless defined?(@actors) && @actors

  pending = []
  @actors.each do |actor|
    begin
      pending << actor.future(:terminate)
    rescue DeadActorError
      # Already gone: there is nothing to wait for.
    end
  end

  pending.compact.each do |future|
    begin
      future.value
    rescue StandardError
      nil
    end
  end
end
# Runs +method+ on an actor taken from the pool, then hands the actor back.
#
# method - name of the method to invoke on the provisioned actor
# args   - arguments forwarded to that method
# block  - optional block forwarded to that method
#
# Returns the result of the actor's own _send_ call.
def _send_(method, *args, &block)
actor = __provision_actor__
begin
actor._send_ method, *args, &block
rescue DeadActorError # if we get a dead actor out of the pool
# Wait for :respawn_complete (signalled by __crash_handler__ once it has
# spawned a replacement), take another actor and run the call again.
wait :respawn_complete
actor = __provision_actor__
retry
rescue ::Exception => ex
# Every other failure is handed to abort instead of being re-raised here.
# NOTE(review): abort comes from outside this file; presumably it reports
# the error to the caller without crashing the pool -- confirm.
abort ex
ensure
# Only a live actor goes back on the idle list; a dead one stays out.
if actor.alive?
@idle << actor
@busy.delete actor
# Broadcast that actor is done processing and
# waiting idle
signal :actor_idle
end
end
end
# Returns the name reported by a pooled worker.
#
# The first argument of _send_ is the method to invoke on the worker, so
# :name has to be passed in that position. The previous code passed @mailbox
# there (an instance variable this class never assigns) and :name as an
# argument, so the worker was never asked for its name.
def name
  _send_ :name
end
# Forwards the is_a? check to a worker from the pool, so the answer is the
# worker's rather than the pool object's own.
#
# klass - class or module to test against
def is_a?(klass)
  _send_(:is_a?, klass)
end
# Forwards the kind_of? check to a worker from the pool.
#
# klass - class or module to test against
def kind_of?(klass)
  _send_(:kind_of?, klass)
end
# Returns the method list of a worker from the pool.
#
# include_ancestors - passed through unchanged to the worker's #methods
def methods(include_ancestors = true)
  _send_(:methods, include_ancestors)
end
# Returns the string form of a worker from the pool.
def to_s
  _send_(:to_s)
end
# Returns the inspect output of a worker from the pool.
def inspect
  _send_(:inspect)
end
# Resizes the pool to +new_size+ workers (negative values are treated as 0).
#
# Growing spawns the missing workers straight into the idle list. Shrinking
# provisions one worker per surplus slot, unlinks it, drops it from the
# bookkeeping and terminates it.
def size=(new_size)
  target = [0, new_size].max
  current = size

  if target > current
    (target - current).times { @idle << __spawn_actor__ }
  else
    (current - target).times do
      retired = __provision_actor__
      unlink retired
      @busy.delete retired
      @actors.delete retired
      retired.terminate
    end
  end

  @size = target
end
# Number of workers currently recorded as busy, read under the pool mutex.
def busy_size
  @mutex.synchronize do
    @busy.size
  end
end
# Number of workers currently recorded as idle, read under the pool mutex.
def idle_size
  @mutex.synchronize do
    @idle.size
  end
end
# True when +actor+ is on the idle list (checked under the pool mutex).
def __idle?(actor)
  @mutex.synchronize do
    @idle.include?(actor)
  end
end
# True when +actor+ is on the busy list (checked under the pool mutex).
def __busy?(actor)
  @mutex.synchronize do
    @busy.include?(actor)
  end
end
# Returns the busy list itself (not a copy), fetched under the pool mutex.
def __busy
  @mutex.synchronize do
    @busy
  end
end
# Returns the idle list itself (not a copy), fetched under the pool mutex.
def __idle
  @mutex.synchronize do
    @idle
  end
end
# Classifies +actor+ by the list it is on.
#
# Returns :busy or :idle (busy is checked first), or :missing when the actor
# is on neither list.
def __state(actor)
  if __busy?(actor)
    :busy
  elsif __idle?(actor)
    :idle
  else
    :missing
  end
end
# Instantiate an actor, add it to the actor Set, and return it
#
# The worker is created with @klass.new_link(*@args) and registered in
# @actors exactly once, while holding @mutex. A second, unsynchronized
# @actors.add used to follow the locked one; it was redundant and touched the
# set outside the lock, so it has been removed.
def __spawn_actor__
  actor = @klass.new_link(*@args)
  @mutex.synchronize { @actors.add(actor) }
  actor
end
# Provision a new actor ( take it out of idle, move it into busy, and avail it )
#
# Returns the actor that was moved from @idle to @busy.
def __provision_actor__
# NOTE(review): sets guard_warnings on the current task on every call; what
# that flag changes is defined outside this file -- confirm before relying on it.
Task.current.guard_warnings = true
# The whole wait-and-take sequence below runs while holding @mutex.
@mutex.synchronize do
while @idle.empty?
# Wait for responses from one of the busy actors
response = exclusive { receive { |msg| msg.is_a?(Internals::Response) } }
# Give the response to the current actor to process, then re-check @idle.
Thread.current[:celluloid_actor].handle_message(response)
end
# Take the actor at the front of @idle and record it as busy.
actor = @idle.shift
@busy << actor
actor
end
end
# Exit handler: forgets the exited worker and replaces it if it crashed.
#
# actor  - the worker that exited
# reason - the exit reason; when nil/false no replacement is spawned
#
# After a replacement has been added to the idle list, :respawn_complete is
# signalled so that callers waiting on it can continue.
def __crash_handler__(actor, reason)
  [@busy, @idle, @actors].each { |collection| collection.delete(actor) }

  if reason
    @idle << __spawn_actor__
    signal :respawn_complete
  end
end
# Reports whether the pool can handle +meth+.
#
# meth            - method name, as a Symbol or a String
# include_private - when true, any method that can be looked up counts;
#                   otherwise only public and protected methods do
#
# Returns true or false; a name that cannot be looked up yields false.
def respond_to?(meth, include_private = false)
  # NOTE: use method() here since this class
  # shouldn't be used directly, and method() is less
  # likely to be "reimplemented" inconsistently
  # with other Object.*method* methods.
  found = method(meth)
  return (found ? true : false) if include_private

  # The method listings below contain Symbols, so a String name would never
  # match; convert it first so "foo" and :foo give the same answer.
  name = meth.is_a?(String) ? meth.to_sym : meth

  if found.is_a?(UnboundMethod)
    owner = found.owner
    owner.public_instance_methods.include?(name) ||
      owner.protected_instance_methods.include?(name)
  else
    receiver = found.receiver
    receiver.public_methods.include?(name) ||
      receiver.protected_methods.include?(name)
  end
rescue NameError
  false
end
# Routes calls the pool does not define itself to a worker.
#
# A method the pool responds to (per respond_to?) is dispatched through
# _send_ with its arguments and block; anything else goes to the default
# method_missing handling.
def method_missing(method, *args, &block)
  return super unless respond_to?(method)

  _send_(method, *args, &block)
end
# Looks up +meth+ on the pool first and, failing that, on the worker class.
#
# Workers are only provisioned right before a call, so there is no worker
# instance to ask; falling back to @klass.instance_method still lets
# Celluloid::Call detect invalid parameters passed to async methods.
#
# Returns a Method for the pool's own methods or an UnboundMethod taken from
# the worker class; raises NameError when neither defines +meth+.
def method(meth)
  begin
    super
  rescue NameError
    @klass.instance_method(meth.to_sym)
  end
end
end
end
end
end
|