Something here is messed up Revert "message bus fixes and diagnostics"

This reverts commit 36d1aafe1e.
This commit is contained in:
Sam Saffron 2013-02-08 21:39:38 +11:00
parent 616a1f4906
commit f3c6144e3b
6 changed files with 93 additions and 204 deletions

View File

@ -20,7 +20,6 @@ module Discourse
# -- all .rb files in that directory are automatically loaded.
require 'discourse'
require 'message_bus_diags'
# Custom directories with classes and modules you want to be autoloadable.
config.autoload_paths += %W(#{config.root}/app/serializers)

View File

@ -1,32 +0,0 @@
class MessageBusDiags
@host_info = {}
def self.my_id
@my_id ||= "#{`hostname`}-#{Process.pid}"
end
def self.seen_host(name)
@host_info[name] = DateTime.now
end
def self.establish_peer_names
MessageBus.publish "/server-name", {channel: "/server-name-reply/#{my_id}"}
end
def self.seen_hosts
@host_info
end
unless @subscribed
MessageBus.subscribe "/server-name-reply/#{my_id}" do |msg|
MessageBusDiags.seen_host(msg.data)
end
MessageBus.subscribe "/server-name" do |msg|
MessageBus.publish msg.data["channel"], MessageBusDiags.my_id
end
@subscribed = true
end
end

View File

@ -10,31 +10,6 @@ require 'redis'
class MessageBus::ReliablePubSub
class NoMoreRetries < StandardError; end
class BackLogOutOfOrder < StandardError
attr_accessor :highest_id
def initialize(highest_id)
@highest_id = highest_id
end
end
def max_publish_retries=(val)
@max_publish_retries = val
end
def max_publish_retries
@max_publish_retries ||= 10
end
def max_publish_wait=(ms)
@max_publish_wait = ms
end
def max_publish_wait
@max_publish_wait ||= 500
end
# max_backlog_size is per multiplexed channel
def initialize(redis_config = {}, max_backlog_size = 1000)
@redis_config = redis_config
@ -67,12 +42,12 @@ class MessageBus::ReliablePubSub
@pub_redis ||= new_redis_connection
end
def backlog_key(channel)
"__mb_backlog_#{channel}"
def offset_key(channel)
"__mb_offset_#{channel}"
end
def backlog_id_key(channel)
"__mb_backlog_id_#{channel}"
def backlog_key(channel)
"__mb_backlog_#{channel}"
end
def global_id_key
@ -82,6 +57,10 @@ class MessageBus::ReliablePubSub
def global_backlog_key
"__mb_global_backlog"
end
def global_offset_key
"__mb_global_offset"
end
# use with extreme care, will nuke all of the data
def reset!
@ -92,49 +71,74 @@ class MessageBus::ReliablePubSub
def publish(channel, data)
redis = pub_redis
backlog_id_key = backlog_id_key(channel)
offset_key = offset_key(channel)
backlog_key = backlog_key(channel)
global_id = nil
backlog_id = nil
redis.watch(offset_key, backlog_key, global_id_key, global_backlog_key, global_offset_key) do
offset = redis.get(offset_key).to_i
backlog = redis.llen(backlog_key).to_i
redis.multi do |m|
global_id = m.incr(global_id_key)
backlog_id = m.incr(backlog_id_key)
global_offset = redis.get(global_offset_key).to_i
global_backlog = redis.llen(global_backlog_key).to_i
global_id = redis.get(global_id_key).to_i
global_id += 1
too_big = backlog + 1 > @max_backlog_size
global_too_big = global_backlog + 1 > @max_global_backlog_size
message_id = backlog + offset + 1
redis.multi do
if too_big
redis.ltrim backlog_key, (backlog+1) - @max_backlog_size, -1
offset += (backlog+1) - @max_backlog_size
redis.set(offset_key, offset)
end
if global_too_big
redis.ltrim global_backlog_key, (global_backlog+1) - @max_global_backlog_size, -1
global_offset += (global_backlog+1) - @max_global_backlog_size
redis.set(global_offset_key, global_offset)
end
msg = MessageBus::Message.new global_id, message_id, channel, data
payload = msg.encode
redis.set global_id_key, global_id
redis.rpush backlog_key, payload
redis.rpush global_backlog_key, message_id.to_s << "|" << channel
redis.publish redis_channel_name, payload
end
return message_id
end
global_id = global_id.value
backlog_id = backlog_id.value
msg = MessageBus::Message.new global_id, backlog_id, channel, data
payload = msg.encode
redis.zadd backlog_key, backlog_id, payload
redis.zadd global_backlog_key, global_id, backlog_id.to_s << "|" << channel
redis.publish redis_channel_name, payload
if backlog_id > @max_backlog_size
redis.zremrangebyscore backlog_key, 1, backlog_id - @max_backlog_size
end
if global_id > @max_global_backlog_size
redis.zremrangebyscore global_backlog_key, 1, backlog_id - @max_backlog_size
end
backlog_id
end
def last_id(channel)
redis = pub_redis
backlog_id_key = backlog_id_key(channel)
redis.get(backlog_id_key).to_i
offset_key = offset_key(channel)
backlog_key = backlog_key(channel)
offset,len = nil
redis.watch offset_key, backlog_key do
offset = redis.get(offset_key).to_i
len = redis.llen backlog_key
end
offset + len
end
def backlog(channel, last_id = nil)
redis = pub_redis
offset_key = offset_key(channel)
backlog_key = backlog_key(channel)
items = redis.zrangebyscore backlog_key, last_id.to_i + 1, "+inf"
items = nil
redis.watch offset_key, backlog_key do
offset = redis.get(offset_key).to_i
start_at = last_id.to_i - offset
items = redis.lrange backlog_key, start_at, -1
end
items.map do |i|
MessageBus::Message.decode(i)
@ -143,9 +147,14 @@ class MessageBus::ReliablePubSub
def global_backlog(last_id = nil)
last_id = last_id.to_i
items = nil
redis = pub_redis
items = redis.zrangebyscore global_backlog_key, last_id.to_i + 1, "+inf"
redis.watch global_backlog_key, global_offset_key do
offset = redis.get(global_offset_key).to_i
start_at = last_id.to_i - offset
items = redis.lrange global_backlog_key, start_at, -1
end
items.map! do |i|
pipe = i.index "|"
@ -156,19 +165,27 @@ class MessageBus::ReliablePubSub
end
items.compact!
items
end
def get_message(channel, message_id)
redis = pub_redis
offset_key = offset_key(channel)
backlog_key = backlog_key(channel)
items = redis.zrangebyscore backlog_key, message_id, message_id
if items && items[0]
MessageBus::Message.decode(items[0])
else
nil
msg = nil
redis.watch(offset_key, backlog_key) do
offset = redis.get(offset_key).to_i
idx = (message_id-1) - offset
return nil if idx < 0
msg = redis.lindex(backlog_key, idx)
end
if msg
msg = MessageBus::Message.decode(msg)
end
msg
end
def subscribe(channel, last_id = nil)
@ -176,53 +193,22 @@ class MessageBus::ReliablePubSub
# can cut down on connections if we only have one global subscriber
raise ArgumentError unless block_given?
if last_id
# we need to translate this to a global id, at least give it a shot
# we are subscribing on global and global is always going to be bigger than local
# so worst case is a replay of a few messages
message = get_message(channel, last_id)
if message
last_id = message.global_id
end
end
global_subscribe(last_id) do |m|
yield m if m.channel == channel
end
end
def process_global_backlog(highest_id, raise_error, &blk)
global_backlog(highest_id).each do |old|
if highest_id + 1 == old.global_id
yield old
highest_id = old.global_id
else
raise BackLogOutOfOrder.new(highest_id) if raise_error
if old.global_id > highest_id
yield old
highest_id = old.global_id
end
end
end
highest_id
end
def global_subscribe(last_id=nil, &blk)
raise ArgumentError unless block_given?
highest_id = last_id
clear_backlog = lambda do
retries = 4
begin
highest_id = process_global_backlog(highest_id, retries > 0, &blk)
rescue BackLogOutOfOrder => e
highest_id = e.highest_id
retries -= 1
sleep(rand(50) / 1000.0)
retry
clear_backlog = lambda do
global_backlog(highest_id).each do |old|
highest_id = old.global_id
yield old
end
end
begin
redis = new_redis_connection
@ -238,18 +224,11 @@ class MessageBus::ReliablePubSub
end
on.message do |c,m|
m = MessageBus::Message.decode m
# we have 2 options
#
# 1. message came in the correct order GREAT, just deal with it
# 2. message came in the incorrect order COMPLICATED, wait a tiny bit and clear backlog
if highest_id.nil? || m.global_id == highest_id + 1
highest_id = m.global_id
yield m
else
if highest_id && m.global_id != highest_id + 1
clear_backlog.call(&blk)
end
yield m if highest_id.nil? || m.global_id > highest_id
highest_id = m.global_id
end
end
rescue => error
@ -259,4 +238,5 @@ class MessageBus::ReliablePubSub
end
end
end

View File

@ -70,6 +70,8 @@ describe MessageBus do
r = MessageBus.backlog("/chuck", id)
wait_for(1000) { r.length == 2 }
r.map{|i| i.data}.to_a.should == ['foo', 'bar']
end

View File

@ -1,60 +0,0 @@
require 'spec_helper'
require 'message_bus'
describe MessageBus::ReliablePubSub do
def new_bus
MessageBus::ReliablePubSub.new(:db => 10)
end
def work_it
Signal.trap("HUP") { exit }
bus = new_bus
$stdout.reopen("/dev/null", "w")
$stderr.reopen("/dev/null", "w")
# subscribe blocks, so we need a new bus to transmit
new_bus.subscribe("/echo", 0) do |msg|
bus.publish("/response", Process.pid.to_s)
end
end
def spawn_child
r = fork
if r.nil?
work_it
else
r
end
end
it 'gets every response from child processes' do
pid = nil
Redis.new(:db => 10).flushall
begin
pids = (1..10).map{spawn_child}
responses = []
bus = MessageBus::ReliablePubSub.new(:db => 10)
Thread.new do
bus.subscribe("/response", 0) do |msg|
responses << msg if pids.include? msg.data.to_i
end
end
10.times{bus.publish("/echo", Process.pid.to_s)}
wait_for 4000 do
responses.count == 100
end
# p responses.group_by(&:data).map{|k,v|[k, v.count]}
# p responses.group_by(&:global_id).map{|k,v|[k, v.count]}
responses.count.should == 100
ensure
if pids
pids.each do |pid|
Process.kill("HUP", pid)
Process.wait(pid)
end
end
end
end
end

View File

@ -101,8 +101,8 @@ describe MessageBus::ReliablePubSub do
end
t.kill
got.length.should == 3
got.map{|m| m.data}.should == ["1","2","3"]
end