Revert "Something here is messed up Revert "message bus fixes and diagnostics""
This reverts commit f3c6144e3b
.
This commit is contained in:
parent
457229b133
commit
d3f911cc4c
|
@ -20,6 +20,7 @@ module Discourse
|
|||
# -- all .rb files in that directory are automatically loaded.
|
||||
|
||||
require 'discourse'
|
||||
require 'message_bus_diags'
|
||||
|
||||
# Custom directories with classes and modules you want to be autoloadable.
|
||||
config.autoload_paths += %W(#{config.root}/app/serializers)
|
||||
|
|
|
@ -0,0 +1,32 @@
|
|||
class MessageBusDiags
|
||||
|
||||
@host_info = {}
|
||||
|
||||
def self.my_id
|
||||
@my_id ||= "#{`hostname`}-#{Process.pid}"
|
||||
end
|
||||
|
||||
def self.seen_host(name)
|
||||
@host_info[name] = DateTime.now
|
||||
end
|
||||
|
||||
def self.establish_peer_names
|
||||
MessageBus.publish "/server-name", {channel: "/server-name-reply/#{my_id}"}
|
||||
end
|
||||
|
||||
def self.seen_hosts
|
||||
@host_info
|
||||
end
|
||||
|
||||
unless @subscribed
|
||||
|
||||
MessageBus.subscribe "/server-name-reply/#{my_id}" do |msg|
|
||||
MessageBusDiags.seen_host(msg.data)
|
||||
end
|
||||
|
||||
MessageBus.subscribe "/server-name" do |msg|
|
||||
MessageBus.publish msg.data["channel"], MessageBusDiags.my_id
|
||||
end
|
||||
@subscribed = true
|
||||
end
|
||||
end
|
|
@ -10,6 +10,31 @@ require 'redis'
|
|||
|
||||
class MessageBus::ReliablePubSub
|
||||
|
||||
class NoMoreRetries < StandardError; end
|
||||
class BackLogOutOfOrder < StandardError
|
||||
attr_accessor :highest_id
|
||||
|
||||
def initialize(highest_id)
|
||||
@highest_id = highest_id
|
||||
end
|
||||
end
|
||||
|
||||
def max_publish_retries=(val)
|
||||
@max_publish_retries = val
|
||||
end
|
||||
|
||||
def max_publish_retries
|
||||
@max_publish_retries ||= 10
|
||||
end
|
||||
|
||||
def max_publish_wait=(ms)
|
||||
@max_publish_wait = ms
|
||||
end
|
||||
|
||||
def max_publish_wait
|
||||
@max_publish_wait ||= 500
|
||||
end
|
||||
|
||||
# max_backlog_size is per multiplexed channel
|
||||
def initialize(redis_config = {}, max_backlog_size = 1000)
|
||||
@redis_config = redis_config
|
||||
|
@ -42,14 +67,14 @@ class MessageBus::ReliablePubSub
|
|||
@pub_redis ||= new_redis_connection
|
||||
end
|
||||
|
||||
def offset_key(channel)
|
||||
"__mb_offset_#{channel}"
|
||||
end
|
||||
|
||||
def backlog_key(channel)
|
||||
"__mb_backlog_#{channel}"
|
||||
end
|
||||
|
||||
def backlog_id_key(channel)
|
||||
"__mb_backlog_id_#{channel}"
|
||||
end
|
||||
|
||||
def global_id_key
|
||||
"__mb_global_id"
|
||||
end
|
||||
|
@ -58,10 +83,6 @@ class MessageBus::ReliablePubSub
|
|||
"__mb_global_backlog"
|
||||
end
|
||||
|
||||
def global_offset_key
|
||||
"__mb_global_offset"
|
||||
end
|
||||
|
||||
# use with extreme care, will nuke all of the data
|
||||
def reset!
|
||||
pub_redis.keys("__mb_*").each do |k|
|
||||
|
@ -71,74 +92,49 @@ class MessageBus::ReliablePubSub
|
|||
|
||||
def publish(channel, data)
|
||||
redis = pub_redis
|
||||
offset_key = offset_key(channel)
|
||||
backlog_id_key = backlog_id_key(channel)
|
||||
backlog_key = backlog_key(channel)
|
||||
|
||||
redis.watch(offset_key, backlog_key, global_id_key, global_backlog_key, global_offset_key) do
|
||||
offset = redis.get(offset_key).to_i
|
||||
backlog = redis.llen(backlog_key).to_i
|
||||
global_id = nil
|
||||
backlog_id = nil
|
||||
|
||||
global_offset = redis.get(global_offset_key).to_i
|
||||
global_backlog = redis.llen(global_backlog_key).to_i
|
||||
|
||||
global_id = redis.get(global_id_key).to_i
|
||||
global_id += 1
|
||||
|
||||
too_big = backlog + 1 > @max_backlog_size
|
||||
global_too_big = global_backlog + 1 > @max_global_backlog_size
|
||||
|
||||
message_id = backlog + offset + 1
|
||||
redis.multi do
|
||||
if too_big
|
||||
redis.ltrim backlog_key, (backlog+1) - @max_backlog_size, -1
|
||||
offset += (backlog+1) - @max_backlog_size
|
||||
redis.set(offset_key, offset)
|
||||
end
|
||||
|
||||
if global_too_big
|
||||
redis.ltrim global_backlog_key, (global_backlog+1) - @max_global_backlog_size, -1
|
||||
global_offset += (global_backlog+1) - @max_global_backlog_size
|
||||
redis.set(global_offset_key, global_offset)
|
||||
end
|
||||
|
||||
msg = MessageBus::Message.new global_id, message_id, channel, data
|
||||
payload = msg.encode
|
||||
|
||||
redis.set global_id_key, global_id
|
||||
redis.rpush backlog_key, payload
|
||||
redis.rpush global_backlog_key, message_id.to_s << "|" << channel
|
||||
redis.publish redis_channel_name, payload
|
||||
end
|
||||
|
||||
return message_id
|
||||
redis.multi do |m|
|
||||
global_id = m.incr(global_id_key)
|
||||
backlog_id = m.incr(backlog_id_key)
|
||||
end
|
||||
|
||||
global_id = global_id.value
|
||||
backlog_id = backlog_id.value
|
||||
|
||||
msg = MessageBus::Message.new global_id, backlog_id, channel, data
|
||||
payload = msg.encode
|
||||
|
||||
redis.zadd backlog_key, backlog_id, payload
|
||||
redis.zadd global_backlog_key, global_id, backlog_id.to_s << "|" << channel
|
||||
|
||||
redis.publish redis_channel_name, payload
|
||||
|
||||
if backlog_id > @max_backlog_size
|
||||
redis.zremrangebyscore backlog_key, 1, backlog_id - @max_backlog_size
|
||||
end
|
||||
|
||||
if global_id > @max_global_backlog_size
|
||||
redis.zremrangebyscore global_backlog_key, 1, backlog_id - @max_backlog_size
|
||||
end
|
||||
|
||||
backlog_id
|
||||
end
|
||||
|
||||
def last_id(channel)
|
||||
redis = pub_redis
|
||||
offset_key = offset_key(channel)
|
||||
backlog_key = backlog_key(channel)
|
||||
|
||||
offset,len = nil
|
||||
redis.watch offset_key, backlog_key do
|
||||
offset = redis.get(offset_key).to_i
|
||||
len = redis.llen backlog_key
|
||||
end
|
||||
offset + len
|
||||
backlog_id_key = backlog_id_key(channel)
|
||||
redis.get(backlog_id_key).to_i
|
||||
end
|
||||
|
||||
def backlog(channel, last_id = nil)
|
||||
redis = pub_redis
|
||||
offset_key = offset_key(channel)
|
||||
backlog_key = backlog_key(channel)
|
||||
|
||||
items = nil
|
||||
|
||||
redis.watch offset_key, backlog_key do
|
||||
offset = redis.get(offset_key).to_i
|
||||
start_at = last_id.to_i - offset
|
||||
items = redis.lrange backlog_key, start_at, -1
|
||||
end
|
||||
items = redis.zrangebyscore backlog_key, last_id.to_i + 1, "+inf"
|
||||
|
||||
items.map do |i|
|
||||
MessageBus::Message.decode(i)
|
||||
|
@ -147,14 +143,9 @@ class MessageBus::ReliablePubSub
|
|||
|
||||
def global_backlog(last_id = nil)
|
||||
last_id = last_id.to_i
|
||||
items = nil
|
||||
redis = pub_redis
|
||||
|
||||
redis.watch global_backlog_key, global_offset_key do
|
||||
offset = redis.get(global_offset_key).to_i
|
||||
start_at = last_id.to_i - offset
|
||||
items = redis.lrange global_backlog_key, start_at, -1
|
||||
end
|
||||
items = redis.zrangebyscore global_backlog_key, last_id.to_i + 1, "+inf"
|
||||
|
||||
items.map! do |i|
|
||||
pipe = i.index "|"
|
||||
|
@ -165,27 +156,19 @@ class MessageBus::ReliablePubSub
|
|||
end
|
||||
|
||||
items.compact!
|
||||
|
||||
items
|
||||
end
|
||||
|
||||
def get_message(channel, message_id)
|
||||
redis = pub_redis
|
||||
offset_key = offset_key(channel)
|
||||
backlog_key = backlog_key(channel)
|
||||
|
||||
msg = nil
|
||||
redis.watch(offset_key, backlog_key) do
|
||||
offset = redis.get(offset_key).to_i
|
||||
idx = (message_id-1) - offset
|
||||
return nil if idx < 0
|
||||
msg = redis.lindex(backlog_key, idx)
|
||||
items = redis.zrangebyscore backlog_key, message_id, message_id
|
||||
if items && items[0]
|
||||
MessageBus::Message.decode(items[0])
|
||||
else
|
||||
nil
|
||||
end
|
||||
|
||||
if msg
|
||||
msg = MessageBus::Message.decode(msg)
|
||||
end
|
||||
msg
|
||||
end
|
||||
|
||||
def subscribe(channel, last_id = nil)
|
||||
|
@ -193,22 +176,53 @@ class MessageBus::ReliablePubSub
|
|||
# can cut down on connections if we only have one global subscriber
|
||||
raise ArgumentError unless block_given?
|
||||
|
||||
if last_id
|
||||
# we need to translate this to a global id, at least give it a shot
|
||||
# we are subscribing on global and global is always going to be bigger than local
|
||||
# so worst case is a replay of a few messages
|
||||
message = get_message(channel, last_id)
|
||||
if message
|
||||
last_id = message.global_id
|
||||
end
|
||||
end
|
||||
global_subscribe(last_id) do |m|
|
||||
yield m if m.channel == channel
|
||||
end
|
||||
end
|
||||
|
||||
def process_global_backlog(highest_id, raise_error, &blk)
|
||||
global_backlog(highest_id).each do |old|
|
||||
if highest_id + 1 == old.global_id
|
||||
yield old
|
||||
highest_id = old.global_id
|
||||
else
|
||||
raise BackLogOutOfOrder.new(highest_id) if raise_error
|
||||
if old.global_id > highest_id
|
||||
yield old
|
||||
highest_id = old.global_id
|
||||
end
|
||||
end
|
||||
end
|
||||
highest_id
|
||||
end
|
||||
|
||||
def global_subscribe(last_id=nil, &blk)
|
||||
raise ArgumentError unless block_given?
|
||||
highest_id = last_id
|
||||
|
||||
clear_backlog = lambda do
|
||||
global_backlog(highest_id).each do |old|
|
||||
highest_id = old.global_id
|
||||
yield old
|
||||
retries = 4
|
||||
begin
|
||||
highest_id = process_global_backlog(highest_id, retries > 0, &blk)
|
||||
rescue BackLogOutOfOrder => e
|
||||
highest_id = e.highest_id
|
||||
retries -= 1
|
||||
sleep(rand(50) / 1000.0)
|
||||
retry
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
begin
|
||||
redis = new_redis_connection
|
||||
|
||||
|
@ -224,11 +238,18 @@ class MessageBus::ReliablePubSub
|
|||
end
|
||||
on.message do |c,m|
|
||||
m = MessageBus::Message.decode m
|
||||
if highest_id && m.global_id != highest_id + 1
|
||||
|
||||
# we have 2 options
|
||||
#
|
||||
# 1. message came in the correct order GREAT, just deal with it
|
||||
# 2. message came in the incorrect order COMPLICATED, wait a tiny bit and clear backlog
|
||||
|
||||
if highest_id.nil? || m.global_id == highest_id + 1
|
||||
highest_id = m.global_id
|
||||
yield m
|
||||
else
|
||||
clear_backlog.call(&blk)
|
||||
end
|
||||
yield m if highest_id.nil? || m.global_id > highest_id
|
||||
highest_id = m.global_id
|
||||
end
|
||||
end
|
||||
rescue => error
|
||||
|
@ -238,5 +259,4 @@ class MessageBus::ReliablePubSub
|
|||
end
|
||||
end
|
||||
|
||||
|
||||
end
|
||||
|
|
|
@ -70,8 +70,6 @@ describe MessageBus do
|
|||
|
||||
r = MessageBus.backlog("/chuck", id)
|
||||
|
||||
wait_for(1000) { r.length == 2 }
|
||||
|
||||
r.map{|i| i.data}.to_a.should == ['foo', 'bar']
|
||||
end
|
||||
|
||||
|
|
|
@ -0,0 +1,60 @@
|
|||
require 'spec_helper'
|
||||
require 'message_bus'
|
||||
|
||||
describe MessageBus::ReliablePubSub do
|
||||
|
||||
def new_bus
|
||||
MessageBus::ReliablePubSub.new(:db => 10)
|
||||
end
|
||||
|
||||
def work_it
|
||||
Signal.trap("HUP") { exit }
|
||||
|
||||
bus = new_bus
|
||||
$stdout.reopen("/dev/null", "w")
|
||||
$stderr.reopen("/dev/null", "w")
|
||||
# subscribe blocks, so we need a new bus to transmit
|
||||
new_bus.subscribe("/echo", 0) do |msg|
|
||||
bus.publish("/response", Process.pid.to_s)
|
||||
end
|
||||
end
|
||||
|
||||
def spawn_child
|
||||
r = fork
|
||||
if r.nil?
|
||||
work_it
|
||||
else
|
||||
r
|
||||
end
|
||||
end
|
||||
|
||||
it 'gets every response from child processes' do
|
||||
pid = nil
|
||||
Redis.new(:db => 10).flushall
|
||||
begin
|
||||
pids = (1..10).map{spawn_child}
|
||||
responses = []
|
||||
bus = MessageBus::ReliablePubSub.new(:db => 10)
|
||||
Thread.new do
|
||||
bus.subscribe("/response", 0) do |msg|
|
||||
responses << msg if pids.include? msg.data.to_i
|
||||
end
|
||||
end
|
||||
10.times{bus.publish("/echo", Process.pid.to_s)}
|
||||
wait_for 4000 do
|
||||
responses.count == 100
|
||||
end
|
||||
|
||||
# p responses.group_by(&:data).map{|k,v|[k, v.count]}
|
||||
# p responses.group_by(&:global_id).map{|k,v|[k, v.count]}
|
||||
responses.count.should == 100
|
||||
ensure
|
||||
if pids
|
||||
pids.each do |pid|
|
||||
Process.kill("HUP", pid)
|
||||
Process.wait(pid)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
|
@ -101,8 +101,8 @@ describe MessageBus::ReliablePubSub do
|
|||
end
|
||||
|
||||
t.kill
|
||||
got.length.should == 3
|
||||
|
||||
got.length.should == 3
|
||||
got.map{|m| m.data}.should == ["1","2","3"]
|
||||
end
|
||||
|
||||
|
|
Loading…
Reference in New Issue