/usr/lib/ruby/vendor_ruby/influxdb/writer/async.rb is in ruby-influxdb 0.2.3-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 | require 'thread'
require "net/http"
require "uri"
module InfluxDB
module Writer # :nodoc: all
class Async
attr_reader :config, :client
# Builds an asynchronous writer.
#
# client:: stored as-is; later passed to the background Worker
# config:: stored as-is; later passed to the background Worker
def initialize(client, config)
  @client, @config = client, config
end
# Hands one or more points to the background worker.
#
# data:: a single point or an Array of points; anything that is not an
#        Array is wrapped in a one-element Array
# _precision, _retention_policy:: accepted but never read by this method
#
# Returns an Array holding the result of worker.push for every point.
def write(data, _precision = nil, _retention_policy = nil)
  points =
    if data.is_a?(Array)
      data
    else
      [data]
    end

  points.map { |point| worker.push(point) }
end
# Guards the lazy creation of @worker.
WORKER_MUTEX = Mutex.new

# Returns the Worker for this writer, creating it on first use.
def worker
  # Fast path: already created, no need to take the lock.
  return @worker if @worker

  # Checked again under the lock (via ||=) because the previous mutex
  # holder might have already assigned @worker.
  WORKER_MUTEX.synchronize { @worker ||= Worker.new(client, config) }
end
class Worker
attr_reader :client, :queue, :threads
include InfluxDB::Logging
MAX_POST_POINTS = 1000
MAX_QUEUE_SIZE = 10_000
NUM_WORKER_THREADS = 3
SLEEP_INTERVAL = 5
# Sets up the queue, starts the background threads and registers an
# at_exit hook that keeps flushing until the queue is empty.
#
# client:: object used by the background threads to write points
# config:: options; anything that is not a Hash is treated as an empty
#          Hash. :max_queue (default MAX_QUEUE_SIZE) is passed to
#          InfluxDB::MaxQueue.new
def initialize(client, config)
  @client = client

  options = config.is_a?(Hash) ? config : {}
  @queue = InfluxDB::MaxQueue.new(options.fetch(:max_queue, MAX_QUEUE_SIZE))

  spawn_threads!

  at_exit do
    log :debug, "Thread exiting, flushing queue."
    until queue.empty?
      check_background_queue
    end
  end
end
# Puts +payload+ on the internal queue, from which the background
# threads later pop it. Returns whatever queue.push returns.
def push(payload)
  queue.push payload
end
# Returns the threads whose :influxdb thread-local equals this object's
# object_id (the tag each spawned background thread sets on itself).
def current_threads
  owner_tag = object_id
  Thread.list.select { |thread| thread[:influxdb] == owner_tag }
end
# Number of threads whose :influxdb thread-local equals this object's
# object_id.
def current_thread_count
  Thread.list.map { |thread| thread[:influxdb] }.count(object_id)
end
# rubocop:disable Metrics/CyclomaticComplexity
# rubocop:disable Metrics/MethodLength
# rubocop:disable Metrics/AbcSize
# Starts NUM_WORKER_THREADS background threads and collects them in
# @threads. Each thread tags itself with this object's object_id, then
# alternates between checking the queue and sleeping for
# rand(SLEEP_INTERVAL) seconds until client.stopped? is true.
def spawn_threads!
  @threads = []

  NUM_WORKER_THREADS.times do |index|
    log :debug, "Spawning background worker thread #{index}."

    background = Thread.new(index) do |num|
      # Tag read back by current_threads / current_thread_count.
      Thread.current[:influxdb] = object_id

      until client.stopped?
        check_background_queue(num)
        sleep rand(SLEEP_INTERVAL)
      end

      log :debug, "Exit background worker thread #{num}."
    end

    @threads.push(background)
  end
end
# Drains the queue in batches and writes each batch through the client.
#
# Each pass pops up to MAX_POST_POINTS payloads without blocking, joins
# them with newlines and calls client.write(joined, nil). A failed write
# is logged and the batch is dropped; it is not re-queued.
#
# thread_num:: only used in the debug log message
#
# Returns nil once a pass finds nothing to send, or when the loop is left
# because more than MAX_POST_POINTS points are still queued.
def check_background_queue(thread_num = 0)
  log :debug,
      "Checking background queue on thread #{thread_num} (#{current_thread_count} active)"

  loop do
    data = []

    while data.size < MAX_POST_POINTS && !queue.empty?
      begin
        data.push queue.pop(true)
      rescue ThreadError
        # Non-blocking pop on a queue that another thread emptied between
        # the empty? check and the pop. Only this case is retried; any
        # other error is allowed to surface instead of being swallowed.
        next
      end
    end

    return if data.empty?

    begin
      log :debug, "Found data in the queue! (#{data.length} points)"
      client.write(data.join("\n"), nil)
    rescue StandardError => e
      # Report through the logger like the rest of this class, not stdout.
      log :error, "Cannot write data: #{e.inspect}"
    end

    # NOTE(review): leaves the loop while the backlog is still larger than
    # one batch; kept exactly as in the original -- confirm the intent.
    break if queue.length > MAX_POST_POINTS
  end
end
end
end
end
end
|