/usr/lib/ruby/vendor_ruby/sequel/connection_pool/sharded_threaded.rb is in ruby-sequel 3.33.0-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 | Sequel.require 'connection_pool/threaded'
# The slowest and most advanced connection, dealing with both multi-threaded
# access and configurations with multiple shards/servers.
#
# In addition, this pool subclass also handles scheduling in-use connections
# to be removed from the pool when they are returned to it.
class Sequel::ShardedThreadedConnectionPool < Sequel::ThreadedConnectionPool
# The following additional options are respected:
# * :servers - A hash of servers to use. Keys should be symbols. If not
# present, will use a single :default server. The server name symbol will
# be passed to the connection_proc.
# * :servers_hash - The base hash to use for the servers. By default,
# Sequel uses Hash.new(:default). You can use a hash with a default proc
# that raises an error if you want to catch all cases where a nonexistent
# server is used.
def initialize(opts = {}, &block)
super
@available_connections = {}
@connections_to_remove = []
@servers = opts.fetch(:servers_hash, Hash.new(:default))
add_servers([:default])
add_servers(opts[:servers].keys) if opts[:servers]
end
# Adds new servers to the connection pool. Primarily used in conjunction with master/slave
# or shard configurations. Allows for dynamic expansion of the potential slaves/shards
# at runtime. servers argument should be an array of symbols.
def add_servers(servers)
sync do
servers.each do |server|
unless @servers.has_key?(server)
@servers[server] = server
@available_connections[server] = []
@allocated[server] = {}
end
end
end
end
# A hash of connections currently being used for the given server, key is the
# Thread, value is the connection. Nonexistent servers will return nil. Treat
# this as read only, do not modify the resulting object.
def allocated(server=:default)
@allocated[server]
end
# An array of connections opened but not currently used, for the given
# server. Nonexistent servers will return nil. Treat this as read only, do
# not modify the resulting object.
def available_connections(server=:default)
@available_connections[server]
end
# The total number of connections opened for the given server, should
# be equal to available_connections.length + allocated.length. Nonexistent
# servers will return the created count of the default server.
def size(server=:default)
server = @servers[server]
@allocated[server].length + @available_connections[server].length
end
# Removes all connections currently available on all servers, optionally
# yielding each connection to the given block. This method has the effect of
# disconnecting from the database, assuming that no connections are currently
# being used. If connections are being used, they are scheduled to be
# disconnected as soon as they are returned to the pool.
#
# Once a connection is requested using #hold, the connection pool
# creates new connections to the database. Options:
# * :server - Should be a symbol specifing the server to disconnect from,
# or an array of symbols to specify multiple servers.
def disconnect(opts={}, &block)
block ||= @disconnection_proc
sync do
(opts[:server] ? Array(opts[:server]) : @servers.keys).each do |s|
disconnect_server(s, &block)
end
end
end
# Chooses the first available connection to the given server, or if none are
# available, creates a new connection. Passes the connection to the supplied
# block:
#
# pool.hold {|conn| conn.execute('DROP TABLE posts')}
#
# Pool#hold is re-entrant, meaning it can be called recursively in
# the same thread without blocking.
#
# If no connection is immediately available and the pool is already using the maximum
# number of connections, Pool#hold will block until a connection
# is available or the timeout expires. If the timeout expires before a
# connection can be acquired, a Sequel::PoolTimeout is
# raised.
def hold(server=:default)
server = pick_server(server)
t = Thread.current
if conn = owned_connection(t, server)
return yield(conn)
end
begin
unless conn = acquire(t, server)
time = Time.now
timeout = time + @timeout
sleep_time = @sleep_time
sleep sleep_time
until conn = acquire(t, server)
raise(::Sequel::PoolTimeout) if Time.now > timeout
sleep sleep_time
end
end
yield conn
rescue Sequel::DatabaseDisconnectError
sync{@connections_to_remove << conn} if conn
raise
ensure
sync{release(t, conn, server)} if conn
end
end
# Remove servers from the connection pool. Primarily used in conjunction with master/slave
# or shard configurations. Similar to disconnecting from all given servers,
# except that after it is used, future requests for the server will use the
# :default server instead.
def remove_servers(servers)
sync do
raise(Sequel::Error, "cannot remove default server") if servers.include?(:default)
servers.each do |server|
if @servers.include?(server)
disconnect_server(server, &@disconnection_proc)
@available_connections.delete(server)
@allocated.delete(server)
@servers.delete(server)
end
end
end
end
# Return an array of symbols for servers in the connection pool.
def servers
sync{@servers.keys}
end
private
# Assigns a connection to the supplied thread for the given server, if one
# is available. The calling code should NOT already have the mutex when
# calling this.
def acquire(thread, server)
sync do
if conn = available(server)
allocated(server)[thread] = conn
end
end
end
# Returns an available connection to the given server. If no connection is
# available, tries to create a new connection. The calling code should already
# have the mutex before calling this.
def available(server)
available_connections(server).pop || make_new(server)
end
# Disconnect from the given server. Disconnects available connections
# immediately, and schedules currently allocated connections for disconnection
# as soon as they are returned to the pool. The calling code should already
# have the mutex before calling this.
def disconnect_server(server, &block)
if conns = available_connections(server)
conns.each{|conn| block.call(conn)} if block
conns.clear
end
@connections_to_remove.concat(allocated(server).values)
end
# Creates a new connection to the given server if the size of the pool for
# the server is less than the maximum size of the pool. The calling code
# should already have the mutex before calling this.
def make_new(server)
if (n = size(server)) >= @max_size
allocated(server).to_a.each{|t, c| release(t, c, server) unless t.alive?}
n = nil
end
default_make_new(server) if (n || size(server)) < @max_size
end
# Returns the connection owned by the supplied thread for the given server,
# if any. The calling code should NOT already have the mutex before calling this.
def owned_connection(thread, server)
sync{@allocated[server][thread]}
end
# If the server given is in the hash, return it, otherwise, return the default server.
def pick_server(server)
sync{@servers[server]}
end
# Releases the connection assigned to the supplied thread and server. If the
# server or connection given is scheduled for disconnection, remove the
# connection instead of releasing it back to the pool.
# The calling code should already have the mutex before calling this.
def release(thread, conn, server)
if @connections_to_remove.include?(conn)
remove(thread, conn, server)
else
available_connections(server) << allocated(server).delete(thread)
end
end
# Removes the currently allocated connection from the connection pool. The
# calling code should already have the mutex before calling this.
def remove(thread, conn, server)
@connections_to_remove.delete(conn)
allocated(server).delete(thread) if @servers.include?(server)
@disconnection_proc.call(conn) if @disconnection_proc
end
CONNECTION_POOL_MAP[[false, true]] = self
end
|