message bus extracted, wanted to make sure the tests run regularly, so the new repo has travis enabled.

new home is https://github.com/SamSaffron/message_bus

implemented group support over there with testing fixes
This commit is contained in:
Sam 2013-05-15 16:32:19 +10:00
parent 1201ba9920
commit bae2d252fa
37 changed files with 7 additions and 40589 deletions

View File

@ -9,7 +9,7 @@ gem 'barber', '0.3.0'
gem 'vestal_versions', git: 'https://github.com/zhangyuan/vestal_versions'
gem 'message_bus', path: 'vendor/gems/message_bus'
gem 'message_bus'
gem 'rails_multisite', path: 'vendor/gems/rails_multisite'
gem 'simple_handlebars_rails', path: 'vendor/gems/simple_handlebars_rails'

View File

@ -71,15 +71,6 @@ PATH
specs:
discourse_plugin (0.0.1)
PATH
remote: vendor/gems/message_bus
specs:
message_bus (0.0.1)
eventmachine
rack (>= 1.1.3)
redis
thin
PATH
remote: vendor/gems/rails_multisite
specs:
@ -251,6 +242,11 @@ GEM
i18n (>= 0.4.0)
mime-types (~> 1.16)
treetop (~> 1.4.8)
message_bus (0.0.2)
eventmachine
rack (>= 1.1.3)
redis
thin
metaclass (0.0.1)
method_source (0.8.1)
mime-types (1.23)
@ -485,7 +481,7 @@ DEPENDENCIES
librarian (>= 0.0.25)
listen
lru_redux
message_bus!
message_bus
minitest
mocha
multi_json

View File

@ -1,17 +0,0 @@
*.gem
*.rbc
.bundle
.config
.yardoc
Gemfile.lock
InstalledFiles
_yardoc
coverage
doc/
lib/bundler/man
pkg
rdoc
spec/reports
test/tmp
test/version_tmp
tmp

View File

@ -1,16 +0,0 @@
source 'https://rubygems.org'
# Specify your gem's dependencies in message_bus.gemspec
gemspec
group :test do
gem 'rspec'
gem 'ZenTest'
gem 'autotest'
gem 'redis'
gem 'rake'
gem 'guard-rspec'
gem 'rb-inotify'
gem 'rack'
gem "rack-test", require: "rack/test"
end

View File

@ -1,7 +0,0 @@
guard 'rspec', :focus_on_failed => true do
watch(%r{^spec/.+_spec\.rb$})
watch(%r{^lib/(.+)\.rb$}) { |m| "spec/lib/#{m[1]}_spec.rb" }
watch('spec/spec_helper.rb') { "spec" }
end

View File

@ -1,22 +0,0 @@
Copyright (c) 2012 Sam Saffron
MIT License
Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:
The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

View File

@ -1,27 +0,0 @@
# MessageBus
This is an extraction of the MessageBus used at discourse.
## Installation
Add this line to your application's Gemfile:
gem 'message_bus'
And then execute:
$ bundle
Or install it yourself as:
$ gem install message_bus
## Usage
## Contributing
1. Fork it
2. Create your feature branch (`git checkout -b my-new-feature`)
3. Commit your changes (`git commit -am 'Added some feature'`)
4. Push to the branch (`git push origin my-new-feature`)
5. Create new Pull Request

View File

@ -1,14 +0,0 @@
require 'rubygems'
require 'bundler'
require 'bundler/gem_tasks'
require 'bundler/setup'
Bundler.require(:default, :test)
task :default => [:spec]
require 'rspec/core'
require 'rspec/core/rake_task'
RSpec::Core::RakeTask.new(:spec) do |spec|
spec.pattern = FileList['spec/**/*_spec.rb']
end

View File

@ -1,7 +0,0 @@
<header>
<h2>Message Bus Diagnostics<h2>
</header>
<div>
{{outlet}}
</div>

View File

@ -1,79 +0,0 @@
window.App = Ember.Application.createWithMixins({
start: function(){
MessageBus.start();
}
});
window.App.start();
App.IndexRoute = Ember.Route.extend({
setupController: function(controller) {
model = App.IndexModel.create();
model.ensureSubscribed();
controller.set('content', model);
}
});
App.IndexView = Ember.View.extend({});
App.Process = Ember.View.extend({
uniqueId: function(){
return this.get('hostname') + this.get('pid');
}.property('hostname', 'pid'),
hup: function(){
$.post("/message-bus/_diagnostics/hup/" + this.get('hostname') + "/" + this.get('pid'));
}
});
App.IndexModel = Ember.Object.extend({
disabled: function(){
return this.get("discovering") ? "disabled" : null;
}.property("discovering"),
ensureSubscribed: function() {
var processes;
var _this = this;
if(this.get("subscribed")) { return; }
MessageBus.callbackInterval = 500;
MessageBus.subscribe("/_diagnostics/process-discovery", function(data){
processes = _this.get('processes');
processes.pushObject(App.Process.create(data));
processes = processes.sort(function(a,b){
return a.get('uniqueId') < b.get('uniqueId') ? -1 : 1;
});
// somewhat odd ...
_this.set('processes', null);
_this.set('processes', processes);
});
this.set("subscribed", true);
},
discover: function(){
var _this = this;
this.set('processes', Em.A());
this.ensureSubscribed();
this.set("discovering", true);
Ember.run.later(function(){
_this.set("discovering", false);
}, 1 * 1000);
$.post("/message-bus/_diagnostics/discover");
}
});
App.IndexController = Ember.ObjectController.extend({
discover: function(){
this.get("content").discover();
},
hup: function(process) {
process.hup();
}
});

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -1,25 +0,0 @@
<button {{action discover}} {{bindAttr disabled="disabled"}}>Discover Processes</button>
<table>
<thead>
<tr>
<td>pid</td>
<td>full_path</td>
<td>hostname</td>
<td>uptime</td>
<td></td>
</tr>
</thead>
<tbody>
{{#each processes}}
<tr>
<td>{{pid}}</td>
<td>{{full_path}}</td>
<td>{{hostname}}</td>
<td>{{uptime}} secs</td>
<td><button {{action hup this}}>HUP</button></td>
</tr>
{{/each}}
</tbody>
</table>

File diff suppressed because it is too large Load Diff

View File

@ -1,140 +0,0 @@
window.MessageBus = (function() {
var callbacks, clientId, failCount, interval, isHidden, queue, responseCallbacks, uniqueId;
uniqueId = function() {
return 'xxxxxxxxxxxx4xxxyxxxxxxxxxxxxxxx'.replace(/[xy]/g, function(c) {
var r, v;
r = Math.random() * 16 | 0;
v = c === 'x' ? r : r & 0x3 | 0x8;
return v.toString(16);
});
};
clientId = uniqueId();
responseCallbacks = {};
callbacks = [];
queue = [];
interval = null;
failCount = 0;
isHidden = function() {
if (document.hidden !== void 0) {
return document.hidden;
} else if (document.webkitHidden !== void 0) {
return document.webkitHidden;
} else if (document.msHidden !== void 0) {
return document.msHidden;
} else if (document.mozHidden !== void 0) {
return document.mozHidden;
} else {
return !document.hasFocus;
}
};
var processMessages = function(messages) {
failCount = 0;
$.each(messages,function(idx,message) {
gotData = true;
$.each(callbacks,function(idx,callback) {
if (callback.channel === message.channel) {
callback.last_id = message.message_id;
callback.func(message.data);
}
if (message.channel === "/__status") {
if (message.data[callback.channel] !== void 0) {
callback.last_id = message.data[callback.channel];
}
}
});
});
};
return {
enableLongPolling: true,
callbackInterval: 60000,
maxPollInterval: 3 * 60 * 1000,
callbacks: callbacks,
clientId: clientId,
stop: false,
start: function(opts) {
var poll,
_this = this;
if (opts === null) {
opts = {};
}
poll = function() {
var data, gotData;
if (callbacks.length === 0) {
setTimeout(poll, 500);
return;
}
data = {};
$.each(callbacks, function(idx,c) {
return data[c.channel] = c.last_id === void 0 ? -1 : c.last_id;
});
gotData = false;
return _this.longPoll = $.ajax("/message-bus/" + clientId + "/poll?" + (isHidden() || !_this.enableLongPolling ? "dlp=t" : ""), {
data: data,
cache: false,
dataType: 'json',
type: 'POST',
headers: {
'X-SILENCE-LOGGER': 'true'
},
success: function(messages) {
processMessages(messages);
},
error: failCount += 1,
complete: function() {
if (gotData) {
setTimeout(poll, 100);
} else {
interval = _this.callbackInterval;
if (failCount > 2) {
interval = interval * failCount;
} else if (isHidden()) {
interval = interval * 4;
}
if (interval > _this.maxPollInterval) {
interval = _this.maxPollInterval;
}
setTimeout(poll, interval);
}
_this.longPoll = null;
}
});
};
poll();
},
subscribe: function(channel, func, lastId) {
callbacks.push({
channel: channel,
func: func,
last_id: lastId
});
if (this.longPoll) {
return this.longPoll.abort();
}
},
unsubscribe: function(channel) {
var glob;
if (channel.endsWith("*")) {
channel = channel.substr(0, channel.length - 1);
glob = true;
}
callbacks = callbacks.filter(function(callback) {
if (glob) {
return callback.channel.substr(0, channel.length) !== channel;
} else {
return callback.channel !== channel;
}
});
if (this.longPoll) {
return this.longPoll.abort();
}
}
};
})();

View File

@ -1 +0,0 @@
Autotest.add_discovery { "rspec2" }

View File

@ -1,271 +0,0 @@
# require 'thin'
# require 'eventmachine'
# require 'rack'
# require 'redis'
require "message_bus/version"
require "message_bus/message"
require "message_bus/reliable_pub_sub"
require "message_bus/client"
require "message_bus/connection_manager"
require "message_bus/message_handler"
require "message_bus/diagnostics"
require "message_bus/rack/middleware"
require "message_bus/rack/diagnostics"
# we still need to take care of the logger
if defined?(::Rails)
require 'message_bus/rails/railtie'
end
module MessageBus; end
module MessageBus::Implementation
def cache_assets=(val)
@cache_assets = val
end
def cache_assets
if defined? @cache_assets
@cache_assets
else
true
end
end
def logger=(logger)
@logger = logger
end
def logger
return @logger if @logger
require 'logger'
@logger = Logger.new(STDOUT)
end
def sockets_enabled?
@sockets_enabled == false ? false : true
end
def sockets_enabled=(val)
@sockets_enabled = val
end
def long_polling_enabled?
@long_polling_enabled == false ? false : true
end
def long_polling_enabled=(val)
@long_polling_enabled = val
end
def long_polling_interval=(millisecs)
@long_polling_interval = millisecs
end
def long_polling_interval
@long_polling_interval || 30 * 1000
end
def off
@off = true
end
def on
@off = false
end
# Allow us to inject a redis db
def redis_config=(config)
@redis_config = config
end
def redis_config
@redis_config ||= {}
end
def site_id_lookup(&blk)
@site_id_lookup = blk if blk
@site_id_lookup
end
def user_id_lookup(&blk)
@user_id_lookup = blk if blk
@user_id_lookup
end
def is_admin_lookup(&blk)
@is_admin_lookup = blk if blk
@is_admin_lookup
end
def on_connect(&blk)
@on_connect = blk if blk
@on_connect
end
def on_disconnect(&blk)
@on_disconnect = blk if blk
@on_disconnect
end
def allow_broadcast=(val)
@allow_broadcast = val
end
def allow_broadcast?
@allow_broadcast ||=
if defined? ::Rails
::Rails.env.test? || ::Rails.env.development?
else
false
end
end
def reliable_pub_sub
@reliable_pub_sub ||= MessageBus::ReliablePubSub.new redis_config
end
def enable_diagnostics
MessageBus::Diagnostics.enable
end
def publish(channel, data, opts = nil)
return if @off
user_ids = nil
if opts
user_ids = opts[:user_ids] if opts
end
encoded_data = JSON.dump({
data: data,
user_ids: user_ids
})
reliable_pub_sub.publish(encode_channel_name(channel), encoded_data)
end
def blocking_subscribe(channel=nil, &blk)
if channel
reliable_pub_sub.subscribe(encode_channel_name(channel), &blk)
else
reliable_pub_sub.global_subscribe(&blk)
end
end
ENCODE_SITE_TOKEN = "$|$"
# encode channel name to include site
def encode_channel_name(channel)
if MessageBus.site_id_lookup
raise ArgumentError.new channel if channel.include? ENCODE_SITE_TOKEN
"#{channel}#{ENCODE_SITE_TOKEN}#{MessageBus.site_id_lookup.call}"
else
channel
end
end
def decode_channel_name(channel)
channel.split(ENCODE_SITE_TOKEN)
end
def subscribe(channel=nil, &blk)
subscribe_impl(channel, nil, &blk)
end
# subscribe only on current site
def local_subscribe(channel=nil, &blk)
site_id = MessageBus.site_id_lookup.call if MessageBus.site_id_lookup
subscribe_impl(channel, site_id, &blk)
end
def backlog(channel=nil, last_id)
old =
if channel
reliable_pub_sub.backlog(encode_channel_name(channel), last_id)
else
reliable_pub_sub.global_backlog(encode_channel_name(channel), last_id)
end
old.each{ |m|
decode_message!(m)
}
old
end
def last_id(channel)
reliable_pub_sub.last_id(encode_channel_name(channel))
end
protected
def decode_message!(msg)
channel, site_id = decode_channel_name(msg.channel)
msg.channel = channel
msg.site_id = site_id
parsed = JSON.parse(msg.data)
msg.data = parsed["data"]
msg.user_ids = parsed["user_ids"]
end
def subscribe_impl(channel, site_id, &blk)
@subscriptions ||= {}
@subscriptions[site_id] ||= {}
@subscriptions[site_id][channel] ||= []
@subscriptions[site_id][channel] << blk
ensure_subscriber_thread
end
def ensure_subscriber_thread
@mutex ||= Mutex.new
@mutex.synchronize do
return if @subscriber_thread
@subscriber_thread = Thread.new do
reliable_pub_sub.global_subscribe do |msg|
begin
decode_message!(msg)
globals = @subscriptions[nil]
locals = @subscriptions[msg.site_id] if msg.site_id
global_globals = globals[nil] if globals
local_globals = locals[nil] if locals
globals = globals[msg.channel] if globals
locals = locals[msg.channel] if locals
multi_each(globals,locals, global_globals, local_globals) do |c|
begin
c.call msg
rescue => e
MessageBus.logger.warn "failed to deliver message, skipping #{msg.inspect}\n ex: #{e} backtrace: #{e.backtrace}"
end
end
rescue => e
MessageBus.logger.warn "failed to process message #{msg.inspect}\n ex: #{e} backtrace: #{e.backtrace}"
end
end
end
end
end
def multi_each(*args,&block)
args.each do |a|
a.each(&block) if a
end
end
end
module MessageBus
extend MessageBus::Implementation
end
# allows for multiple buses per app
class MessageBus::Instance
include MessageBus::Implementation
end

View File

@ -1,70 +0,0 @@
class MessageBus::Client
attr_accessor :client_id, :user_id, :connect_time, :subscribed_sets, :site_id, :cleanup_timer, :async_response
def initialize(opts)
self.client_id = opts[:client_id]
self.user_id = opts[:user_id]
self.site_id = opts[:site_id]
self.connect_time = Time.now
@subscriptions = {}
end
def close
return unless @async_response
write_and_close "[]"
end
def closed
!@async_response
end
def subscribe(channel, last_seen_id)
last_seen_id ||= MessageBus.last_id(channel)
@subscriptions[channel] = last_seen_id
end
def subscriptions
@subscriptions
end
def <<(msg)
write_and_close messages_to_json([msg])
end
def subscriptions
@subscriptions
end
def backlog
r = []
@subscriptions.each do |k,v|
next if v.to_i < 0
messages = MessageBus.backlog(k,v)
messages.each do |msg|
allowed = !msg.user_ids || msg.user_ids.include?(self.user_id)
r << msg if allowed
end
end
# stats message for all newly subscribed
status_message = nil
@subscriptions.each do |k,v|
if v.to_i == -1
status_message ||= {}
status_message[k] = MessageBus.last_id(k)
end
end
r << MessageBus::Message.new(-1, -1, '/__status', status_message) if status_message
r
end
protected
def write_and_close(data)
@async_response << data
@async_response.done
@async_response = nil
end
def messages_to_json(msgs)
MessageBus::Rack::Middleware.backlog_to_json(msgs)
end
end

View File

@ -1,69 +0,0 @@
require 'json' unless defined? ::JSON
class MessageBus::ConnectionManager
def initialize
@clients = {}
@subscriptions = {}
end
def notify_clients(msg)
begin
site_subs = @subscriptions[msg.site_id]
subscription = site_subs[msg.channel] if site_subs
return unless subscription
subscription.each do |client_id|
client = @clients[client_id]
if client
allowed = !msg.user_ids || msg.user_ids.include?(client.user_id)
if allowed
client << msg
# turns out you can delete from a set while itereating
remove_client(client)
end
end
end
rescue => e
MessageBus.logger.error "notify clients crash #{e} : #{e.backtrace}"
end
end
def add_client(client)
@clients[client.client_id] = client
@subscriptions[client.site_id] ||= {}
client.subscriptions.each do |k,v|
subscribe_client(client, k)
end
end
def remove_client(c)
@clients.delete c.client_id
@subscriptions[c.site_id].each do |k, set|
set.delete c.client_id
end
c.cleanup_timer.cancel
end
def lookup_client(client_id)
@clients[client_id]
end
def subscribe_client(client,channel)
set = @subscriptions[client.site_id][channel]
unless set
set = Set.new
@subscriptions[client.site_id][channel] = set
end
set << client.client_id
end
def stats
{
client_count: @clients.length,
subscriptions: @subscriptions
}
end
end

View File

@ -1,53 +0,0 @@
class MessageBus::Diagnostics
def self.full_process_path
begin
system = `uname`.strip
if system == "Darwin"
`ps -o "comm=" -p #{Process.pid}`
elsif system == "FreeBSD"
`ps -o command -p #{Process.pid}`.split("\n",2)[1].strip()
else
info = `ps -eo "%p|$|%a" | grep '^\\s*#{Process.pid}'`
info.strip.split('|$|')[1]
end
rescue
# skip it ... not linux or something weird
end
end
def self.hostname
begin
`hostname`.strip
rescue
# skip it
end
end
def self.enable
full_path = full_process_path
start_time = Time.now.to_f
hostname = self.hostname
# it may make sense to add a channel per machine/host to streamline
# process to process comms
MessageBus.subscribe('/_diagnostics/hup') do |msg|
if Process.pid == msg.data["pid"] && hostname == msg.data["hostname"]
$shutdown = true
sleep 4
Process.kill("HUP", $$)
end
end
MessageBus.subscribe('/_diagnostics/discover') do |msg|
MessageBus.on_connect.call msg.site_id if MessageBus.on_connect
MessageBus.publish '/_diagnostics/process-discovery', {
pid: Process.pid,
process_name: $0,
full_path: full_path,
uptime: (Time.now.to_f - start_time).to_i,
hostname: hostname
}, user_ids: [msg.data["user_id"]]
MessageBus.on_disconnect.call msg.site_id if MessageBus.on_disconnect
end
end
end

View File

@ -1,17 +0,0 @@
class MessageBus::Message < Struct.new(:global_id, :message_id, :channel , :data)
attr_accessor :site_id, :user_ids
def self.decode(encoded)
s1 = encoded.index("|")
s2 = encoded.index("|", s1+1)
s3 = encoded.index("|", s2+1)
MessageBus::Message.new encoded[0..s1].to_i, encoded[s1+1..s2].to_i, encoded[s2+1..s3-1].gsub("$$123$$", "|"), encoded[s3+1..-1]
end
# only tricky thing to encode is pipes in a channel name ... do a straight replace
def encode
global_id.to_s << "|" << message_id.to_s << "|" << channel.gsub("|","$$123$$") << "|" << data
end
end

View File

@ -1,26 +0,0 @@
class MessageBus::MessageHandler
def self.load_handlers(path)
Dir.glob("#{path}/*.rb").each do |f|
load "#{f}"
end
end
def self.handle(name,&blk)
raise ArgumentError.new("expecting block") unless block_given?
raise ArgumentError.new("name") unless name
@@handlers ||= {}
@@handlers[name] = blk
end
def self.call(site_id, name, data, current_user_id)
begin
MessageBus.on_connect.call(site_id) if MessageBus.on_connect
@@handlers[name].call(data,current_user_id)
ensure
MessageBus.on_disconnect.call(site_id) if MessageBus.on_disconnect
end
end
end

View File

@ -1,98 +0,0 @@
module MessageBus::Rack; end
class MessageBus::Rack::Diagnostics
def initialize(app, config = {})
@app = app
end
def js_asset(name)
return generate_script_tag(name) unless MessageBus.cache_assets
@@asset_cache ||= {}
@@asset_cache[name] ||= generate_script_tag(name)
@@asset_cache[name]
end
def generate_script_tag(name)
"<script src='/message-bus/_diagnostics/assets/#{name}?#{file_hash(name)}' type='text/javascript'></script>"
end
def file_hash(asset)
require 'digest/sha1'
Digest::SHA1.hexdigest(asset_contents(asset))
end
def asset_contents(asset)
File.open(asset_path(asset)).read
end
def asset_path(asset)
File.expand_path("../../../../assets/#{asset}", __FILE__)
end
def index
html = <<HTML
<!DOCTYPE html>
<html>
<head>
</head>
<body>
<div id="app"></div>
#{js_asset "jquery-1.8.2.js"}
#{js_asset "handlebars.js"}
#{js_asset "ember.js"}
#{js_asset "message-bus.js"}
#{js_asset "application.handlebars"}
#{js_asset "index.handlebars"}
#{js_asset "application.js"}
</body>
</html>
HTML
return [200, {"content-type" => "text/html;"}, [html]]
end
def translate_handlebars(name, content)
"Ember.TEMPLATES['#{name}'] = Ember.Handlebars.compile(#{indent(content).inspect});"
end
# from ember-rails
def indent(string)
string.gsub(/$(.)/m, "\\1 ").strip
end
def call(env)
return @app.call(env) unless env['PATH_INFO'].start_with? '/message-bus/_diagnostics'
route = env['PATH_INFO'].split('/message-bus/_diagnostics')[1]
if MessageBus.is_admin_lookup.nil? || !MessageBus.is_admin_lookup.call(env)
return [403, {}, ['not allowed']]
end
return index unless route
if route == '/discover'
user_id = MessageBus.user_id_lookup.call(env)
MessageBus.publish('/_diagnostics/discover', user_id: user_id)
return [200, {}, ['ok']]
end
if route =~ /^\/hup\//
hostname, pid = route.split('/hup/')[1].split('/')
MessageBus.publish('/_diagnostics/hup', {hostname: hostname, pid: pid.to_i})
return [200, {}, ['ok']]
end
asset = route.split('/assets/')[1]
if asset && !asset !~ /\//
content = asset_contents(asset)
split = asset.split('.')
if split[1] == 'handlebars'
content = translate_handlebars(split[0],content)
end
return [200, {'content-type' => 'text/javascript;'}, [content]]
end
return [404, {}, ['not found']]
end
end

View File

@ -1,170 +0,0 @@
# our little message bus, accepts long polling and web sockets
require 'thin'
require 'eventmachine'
module MessageBus::Rack; end
class MessageBus::Rack::Middleware
def self.start_listener
unless @started_listener
MessageBus.subscribe do |msg|
if EM.reactor_running?
EM.next_tick do
@@connection_manager.notify_clients(msg) if @@connection_manager
end
end
end
@started_listener = true
end
end
def initialize(app, config = {})
@app = app
@@connection_manager = MessageBus::ConnectionManager.new
self.class.start_listener
end
def self.backlog_to_json(backlog)
m = backlog.map do |msg|
{
:global_id => msg.global_id,
:message_id => msg.message_id,
:channel => msg.channel,
:data => msg.data
}
end.to_a
JSON.dump(m)
end
def call(env)
return @app.call(env) unless env['PATH_INFO'] =~ /^\/message-bus/
# special debug/test route
if ::MessageBus.allow_broadcast? && env['PATH_INFO'] == '/message-bus/broadcast'
parsed = Rack::Request.new(env)
::MessageBus.publish parsed["channel"], parsed["data"]
return [200,{"Content-Type" => "text/html"},["sent"]]
end
if env['PATH_INFO'].start_with? '/message-bus/_diagnostics'
diags = MessageBus::Rack::Diagnostics.new(@app)
return diags.call(env)
end
client_id = env['PATH_INFO'].split("/")[2]
return [404, {}, ["not found"]] unless client_id
user_id = MessageBus.user_id_lookup.call(env) if MessageBus.user_id_lookup
site_id = MessageBus.site_id_lookup.call(env) if MessageBus.site_id_lookup
client = MessageBus::Client.new(client_id: client_id, user_id: user_id, site_id: site_id)
connection = env['em.connection']
request = Rack::Request.new(env)
request.POST.each do |k,v|
client.subscribe(k, v)
end
backlog = client.backlog
headers = {}
headers["Cache-Control"] = "must-revalidate, private, max-age=0"
headers["Content-Type"] ="application/json; charset=utf-8"
if backlog.length > 0
[200, headers, [self.class.backlog_to_json(backlog)] ]
elsif MessageBus.long_polling_enabled? && env['QUERY_STRING'] !~ /dlp=t/ && EM.reactor_running?
response = Thin::AsyncResponse.new(env)
response.headers["Cache-Control"] = "must-revalidate, private, max-age=0"
response.headers["Content-Type"] ="application/json; charset=utf-8"
response.status = 200
client.async_response = response
@@connection_manager.add_client(client)
client.cleanup_timer = ::EM::Timer.new(MessageBus.long_polling_interval.to_f / 1000) {
client.close
@@connection_manager.remove_client(client)
}
throw :async
else
[200, headers, ["[]"]]
end
end
end
# there is also another in cramp this is from https://github.com/macournoyer/thin_async/blob/master/lib/thin/async.rb
module Thin
unless defined?(DeferrableBody)
# Based on version from James Tucker <raggi@rubyforge.org>
class DeferrableBody
include ::EM::Deferrable
def initialize
@queue = []
end
def call(body)
@queue << body
schedule_dequeue
end
def each(&blk)
@body_callback = blk
schedule_dequeue
end
private
def schedule_dequeue
return unless @body_callback
::EM.next_tick do
next unless body = @queue.shift
body.each do |chunk|
@body_callback.call(chunk)
end
schedule_dequeue unless @queue.empty?
end
end
end
end
# Response whos body is sent asynchronously.
class AsyncResponse
include Rack::Response::Helpers
attr_reader :headers, :callback, :closed
attr_accessor :status
def initialize(env, status=200, headers={})
@callback = env['async.callback']
@body = DeferrableBody.new
@status = status
@headers = headers
@headers_sent = false
end
def send_headers
return if @headers_sent
@callback.call [@status, @headers, @body]
@headers_sent = true
end
def write(body)
send_headers
@body.call(body.respond_to?(:each) ? body : [body])
end
alias :<< :write
# Tell Thin the response is complete and the connection can be closed.
def done
@closed = true
send_headers
::EM.next_tick { @body.succeed }
end
end
end

View File

@ -1,9 +0,0 @@
module MessageBus; module Rails; end; end
class MessageBus::Rails::Railtie < ::Rails::Railtie
initializer "message_bus.configure_init" do |app|
MessageBus::MessageHandler.load_handlers("#{Rails.root}/app/message_handlers")
app.middleware.use MessageBus::Rack::Middleware
MessageBus.logger = Rails.logger
end
end

View File

@ -1,262 +0,0 @@
require 'redis'
# the heart of the message bus, it acts as 2 things
#
# 1. A channel multiplexer
# 2. Backlog storage per-multiplexed channel.
#
# ids are all sequencially increasing numbers starting at 0
#
class MessageBus::ReliablePubSub
class NoMoreRetries < StandardError; end
class BackLogOutOfOrder < StandardError
attr_accessor :highest_id
def initialize(highest_id)
@highest_id = highest_id
end
end
def max_publish_retries=(val)
@max_publish_retries = val
end
def max_publish_retries
@max_publish_retries ||= 10
end
def max_publish_wait=(ms)
@max_publish_wait = ms
end
def max_publish_wait
@max_publish_wait ||= 500
end
# max_backlog_size is per multiplexed channel
def initialize(redis_config = {}, max_backlog_size = 1000)
@redis_config = redis_config
@max_backlog_size = 1000
# we can store a ton here ...
@max_global_backlog_size = 100000
end
# amount of global backlog we can spin through
def max_global_backlog_size=(val)
@max_global_backlog_size = val
end
# per channel backlog size
def max_backlog_size=(val)
@max_backlog_size = val
end
def new_redis_connection
::Redis.new(@redis_config)
end
def redis_channel_name
db = @redis_config[:db] || 0
"discourse_#{db}"
end
# redis connection used for publishing messages
def pub_redis
@pub_redis ||= new_redis_connection
end
def backlog_key(channel)
"__mb_backlog_n_#{channel}"
end
def backlog_id_key(channel)
"__mb_backlog_id_n_#{channel}"
end
def global_id_key
"__mb_global_id_n"
end
def global_backlog_key
"__mb_global_backlog_n"
end
# use with extreme care, will nuke all of the data
def reset!
pub_redis.keys("__mb_*").each do |k|
pub_redis.del k
end
end
def publish(channel, data)
redis = pub_redis
backlog_id_key = backlog_id_key(channel)
backlog_key = backlog_key(channel)
global_id = nil
backlog_id = nil
redis.multi do |m|
global_id = m.incr(global_id_key)
backlog_id = m.incr(backlog_id_key)
end
global_id = global_id.value
backlog_id = backlog_id.value
msg = MessageBus::Message.new global_id, backlog_id, channel, data
payload = msg.encode
redis.zadd backlog_key, backlog_id, payload
redis.zadd global_backlog_key, global_id, backlog_id.to_s << "|" << channel
redis.publish redis_channel_name, payload
if backlog_id > @max_backlog_size
redis.zremrangebyscore backlog_key, 1, backlog_id - @max_backlog_size
end
if global_id > @max_global_backlog_size
redis.zremrangebyscore global_backlog_key, 1, backlog_id - @max_backlog_size
end
backlog_id
end
def last_id(channel)
redis = pub_redis
backlog_id_key = backlog_id_key(channel)
redis.get(backlog_id_key).to_i
end
def backlog(channel, last_id = nil)
redis = pub_redis
backlog_key = backlog_key(channel)
items = redis.zrangebyscore backlog_key, last_id.to_i + 1, "+inf"
items.map do |i|
MessageBus::Message.decode(i)
end
end
def global_backlog(last_id = nil)
last_id = last_id.to_i
redis = pub_redis
items = redis.zrangebyscore global_backlog_key, last_id.to_i + 1, "+inf"
items.map! do |i|
pipe = i.index "|"
message_id = i[0..pipe].to_i
channel = i[pipe+1..-1]
m = get_message(channel, message_id)
m
end
items.compact!
items
end
def get_message(channel, message_id)
redis = pub_redis
backlog_key = backlog_key(channel)
items = redis.zrangebyscore backlog_key, message_id, message_id
if items && items[0]
MessageBus::Message.decode(items[0])
else
nil
end
end
def subscribe(channel, last_id = nil)
# trivial implementation for now,
# can cut down on connections if we only have one global subscriber
raise ArgumentError unless block_given?
if last_id
# we need to translate this to a global id, at least give it a shot
# we are subscribing on global and global is always going to be bigger than local
# so worst case is a replay of a few messages
message = get_message(channel, last_id)
if message
last_id = message.global_id
end
end
global_subscribe(last_id) do |m|
yield m if m.channel == channel
end
end
def process_global_backlog(highest_id, raise_error, &blk)
global_backlog(highest_id).each do |old|
if highest_id + 1 == old.global_id
yield old
highest_id = old.global_id
else
raise BackLogOutOfOrder.new(highest_id) if raise_error
if old.global_id > highest_id
yield old
highest_id = old.global_id
end
end
end
highest_id
end
def global_subscribe(last_id=nil, &blk)
raise ArgumentError unless block_given?
highest_id = last_id
clear_backlog = lambda do
retries = 4
begin
highest_id = process_global_backlog(highest_id, retries > 0, &blk)
rescue BackLogOutOfOrder => e
highest_id = e.highest_id
retries -= 1
sleep(rand(50) / 1000.0)
retry
end
end
begin
redis = new_redis_connection
if highest_id
clear_backlog.call(&blk)
end
redis.subscribe(redis_channel_name) do |on|
on.subscribe do
if highest_id
clear_backlog.call(&blk)
end
end
on.message do |c,m|
m = MessageBus::Message.decode m
# we have 2 options
#
# 1. message came in the correct order GREAT, just deal with it
# 2. message came in the incorrect order COMPLICATED, wait a tiny bit and clear backlog
if highest_id.nil? || m.global_id == highest_id + 1
highest_id = m.global_id
yield m
else
clear_backlog.call(&blk)
end
end
end
rescue => error
MessageBus.logger.warn "#{error} subscribe failed, reconnecting in 1 second. Call stack #{error.backtrace}"
sleep 1
retry
end
end
end

View File

@ -1,3 +0,0 @@
module MessageBus
VERSION = "0.0.1"
end

View File

@ -1,23 +0,0 @@
# -*- encoding: utf-8 -*-
require File.expand_path('../lib/message_bus/version', __FILE__)
Gem::Specification.new do |gem|
gem.authors = ["Sam Saffron"]
gem.email = ["sam.saffron@gmail.com"]
gem.description = %q{A message bus built on websockets}
gem.summary = %q{}
gem.homepage = ""
# when this is extracted comment it back in, prd has no .git
# gem.files = `git ls-files`.split($\)
gem.files = Dir['README*','LICENSE','lib/**/*.rb']
gem.executables = gem.files.grep(%r{^bin/}).map{ |f| File.basename(f) }
gem.test_files = gem.files.grep(%r{^(test|spec|features)/})
gem.name = "message_bus"
gem.require_paths = ["lib"]
gem.version = MessageBus::VERSION
gem.add_runtime_dependency 'rack', '>= 1.1.3'
gem.add_runtime_dependency 'thin'
gem.add_runtime_dependency 'eventmachine'
gem.add_runtime_dependency 'redis'
end

View File

@ -1,27 +0,0 @@
require 'spec_helper'
require 'message_bus'
describe MessageBus::Client do
describe "subscriptions" do
before do
@client = MessageBus::Client.new :client_id => 'abc'
end
it "should provide a list of subscriptions" do
@client.subscribe('/hello', nil)
@client.subscriptions['/hello'].should_not be_nil
end
it "should provide backlog for subscribed channel" do
@client.subscribe('/hello', nil)
MessageBus.publish '/hello', 'world'
log = @client.backlog
log.length.should == 1
log[0].channel.should == '/hello'
log[0].data.should == 'world'
end
end
end

View File

@ -1,83 +0,0 @@
require 'spec_helper'
require 'message_bus'
class FakeAsync
attr_accessor :cleanup_timer
def <<(val)
@sent ||= ""
@sent << val
end
def sent; @sent; end
def done; @done = true; end
def done?; @done; end
end
class FakeTimer
attr_accessor :cancelled
def cancel; @cancelled = true; end
end
describe MessageBus::ConnectionManager do
before do
@manager = MessageBus::ConnectionManager.new
@client = MessageBus::Client.new(client_id: "xyz", user_id: 1, site_id: 10)
@resp = FakeAsync.new
@client.async_response = @resp
@client.subscribe('test', -1)
@manager.add_client(@client)
@client.cleanup_timer = FakeTimer.new
end
it "should cancel the timer after its responds" do
m = MessageBus::Message.new(1,1,"test","data")
m.site_id = 10
@manager.notify_clients(m)
@client.cleanup_timer.cancelled.should == true
end
it "should be able to lookup an identical client" do
@manager.lookup_client(@client.client_id).should == @client
end
it "should be subscribed to a channel" do
@manager.stats[:subscriptions][10]["test"].length == 1
end
it "should not notify clients on incorrect site" do
m = MessageBus::Message.new(1,1,"test","data")
m.site_id = 9
@manager.notify_clients(m)
@resp.sent.should == nil
end
it "should notify clients on the correct site" do
m = MessageBus::Message.new(1,1,"test","data")
m.site_id = 10
@manager.notify_clients(m)
@resp.sent.should_not == nil
end
it "should strip site id and user id from the payload delivered" do
m = MessageBus::Message.new(1,1,"test","data")
m.user_ids = [1]
m.site_id = 10
@manager.notify_clients(m)
parsed = JSON.parse(@resp.sent)
parsed[0]["site_id"].should == nil
parsed[0]["user_id"].should == nil
end
it "should not deliver unselected" do
m = MessageBus::Message.new(1,1,"test","data")
m.user_ids = [5]
m.site_id = 10
@manager.notify_clients(m)
@resp.sent.should == nil
end
end

View File

@ -1,5 +0,0 @@
class DemoMessageHandler < MessageBus::MessageHandler
handle "/dupe" do |m, uid|
"#{m}#{m}"
end
end

View File

@ -1,76 +0,0 @@
require 'spec_helper'
require 'message_bus'
require 'redis'
describe MessageBus do
before do
MessageBus.site_id_lookup do
"magic"
end
MessageBus.redis_config = {}
end
it "should automatically decode hashed messages" do
data = nil
MessageBus.subscribe("/chuck") do |msg|
data = msg.data
end
MessageBus.publish("/chuck", {:norris => true})
wait_for(1000){ data }
data["norris"].should == true
end
it "should get a message if it subscribes to it" do
@data,@site_id,@channel = nil
MessageBus.subscribe("/chuck") do |msg|
@data = msg.data
@site_id = msg.site_id
@channel = msg.channel
@user_ids = msg.user_ids
end
MessageBus.publish("/chuck", "norris", user_ids: [1,2,3])
wait_for(1000){@data}
@data.should == 'norris'
@site_id.should == 'magic'
@channel.should == '/chuck'
@user_ids.should == [1,2,3]
end
it "should get global messages if it subscribes to them" do
@data,@site_id,@channel = nil
MessageBus.subscribe do |msg|
@data = msg.data
@site_id = msg.site_id
@channel = msg.channel
end
MessageBus.publish("/chuck", "norris")
wait_for(1000){@data}
@data.should == 'norris'
@site_id.should == 'magic'
@channel.should == '/chuck'
end
it "should have the ability to grab the backlog messages in the correct order" do
id = MessageBus.publish("/chuck", "norris")
MessageBus.publish("/chuck", "foo")
MessageBus.publish("/chuck", "bar")
r = MessageBus.backlog("/chuck", id)
r.map{|i| i.data}.to_a.should == ['foo', 'bar']
end
end

View File

@ -1,39 +0,0 @@
require 'spec_helper'
require 'message_bus'
describe MessageBus::MessageHandler do
it "should properly register message handlers" do
MessageBus::MessageHandler.handle "/hello" do |m|
m
end
MessageBus::MessageHandler.call("site","/hello", "world", 1).should == "world"
end
it "should correctly load message handlers" do
MessageBus::MessageHandler.load_handlers("#{File.dirname(__FILE__)}/handlers")
MessageBus::MessageHandler.call("site","/dupe", "1", 1).should == "11"
end
it "should allow for a connect / disconnect callback" do
MessageBus::MessageHandler.handle "/channel" do |m|
m
end
connected = false
disconnected = false
MessageBus.on_connect do |site_id|
connected = true
end
MessageBus.on_disconnect do |site_id|
disconnected = true
end
MessageBus::MessageHandler.call("site_id", "/channel", "data", 1)
connected.should == true
disconnected.should == true
end
end

View File

@ -1,199 +0,0 @@
require 'spec_helper'
require 'message_bus'
require 'rack/test'
describe MessageBus::Rack::Middleware do
include Rack::Test::Methods
class FakeAsyncMiddleware
def self.in_async?
@@in_async if defined? @@in_async
end
def initialize(app,config={})
@app = app
end
def call(env)
result = nil
EM.run {
env['async.callback'] = lambda { |r|
# more judo with deferrable body, at this point we just have headers
r[2].callback do
# even more judo cause rack test does not call each like the spec says
body = ""
r[2].each do |m|
body << m
end
r[2] = [body]
result = r
end
}
catch(:async) {
result = @app.call(env)
}
EM::Timer.new(1) { EM.stop }
defer = lambda {
if !result
@@in_async = true
EM.next_tick do
defer.call
end
else
EM.next_tick { EM.stop }
end
}
defer.call
}
@@in_async = false
result || [500, {}, ['timeout']]
end
end
def app
@app ||= Rack::Builder.new {
use FakeAsyncMiddleware
use MessageBus::Rack::Middleware
run lambda {|env| [500, {'Content-Type' => 'text/html'}, 'should not be called' ]}
}.to_app
end
describe "long polling" do
before do
MessageBus.sockets_enabled = false
MessageBus.long_polling_enabled = true
end
it "should respond right away if dlp=t" do
post "/message-bus/ABC?dlp=t", '/foo1' => 0
FakeAsyncMiddleware.in_async?.should == false
last_response.should be_ok
end
it "should respond right away to long polls that are polling on -1 with the last_id" do
post "/message-bus/ABC", '/foo' => -1
last_response.should be_ok
parsed = JSON.parse(last_response.body)
parsed.length.should == 1
parsed[0]["channel"].should == "/__status"
parsed[0]["data"]["/foo"].should == MessageBus.last_id("/foo")
end
it "should respond to long polls when data is available" do
Thread.new do
wait_for(2000) { FakeAsyncMiddleware.in_async? }
MessageBus.publish "/foo", "bar"
end
post "/message-bus/ABC", '/foo' => nil
last_response.should be_ok
parsed = JSON.parse(last_response.body)
parsed.length.should == 1
parsed[0]["data"].should == "bar"
end
it "should timeout within its alloted slot" do
begin
MessageBus.long_polling_interval = 10
s = Time.now.to_f * 1000
post "/message-bus/ABC", '/foo' => nil
(Time.now.to_f * 1000 - s).should < 30
ensure
MessageBus.long_polling_interval = 5000
end
end
end
describe "diagnostics" do
it "should return a 403 if a user attempts to get at the _diagnostics path" do
get "/message-bus/_diagnostics"
last_response.status.should == 403
end
it "should get a 200 with html for an authorized user" do
MessageBus.stub(:is_admin_lookup).and_return(lambda{|env| true })
get "/message-bus/_diagnostics"
last_response.status.should == 200
end
it "should get the script it asks for" do
MessageBus.stub(:is_admin_lookup).and_return(lambda{|env| true })
get "/message-bus/_diagnostics/assets/message-bus.js"
last_response.status.should == 200
last_response.content_type.should == "text/javascript;"
end
end
describe "polling" do
before do
MessageBus.sockets_enabled = false
MessageBus.long_polling_enabled = false
end
it "should respond with a 200 to a subscribe" do
client_id = "ABCD"
# client always keeps a list of channels with last message id they got on each
post "/message-bus/#{client_id}", {
'/foo' => nil,
'/bar' => nil
}
last_response.should be_ok
end
it "should correctly understand that -1 means stuff from now onwards" do
MessageBus.publish('foo', 'bar')
post "/message-bus/ABCD", {
'/foo' => -1
}
last_response.should be_ok
parsed = JSON.parse(last_response.body)
parsed.length.should == 1
parsed[0]["channel"].should == "/__status"
parsed[0]["data"]["/foo"].should == MessageBus.last_id("/foo")
end
it "should respond with the data if messages exist in the backlog" do
id = MessageBus.last_id('/foo')
MessageBus.publish("/foo", "barbs")
MessageBus.publish("/foo", "borbs")
client_id = "ABCD"
post "/message-bus/#{client_id}", {
'/foo' => id,
'/bar' => nil
}
parsed = JSON.parse(last_response.body)
parsed.length.should == 2
parsed[0]["data"].should == "barbs"
parsed[1]["data"].should == "borbs"
end
it "should not get consumed messages" do
MessageBus.publish("/foo", "barbs")
id = MessageBus.last_id('/foo')
client_id = "ABCD"
post "/message-bus/#{client_id}", {
'/foo' => id
}
parsed = JSON.parse(last_response.body)
parsed.length.should == 0
end
end
end

View File

@ -1,60 +0,0 @@
require 'spec_helper'
require 'message_bus'
describe MessageBus::ReliablePubSub do
def new_bus
MessageBus::ReliablePubSub.new(:db => 10)
end
def work_it
Signal.trap("HUP") { exit }
bus = new_bus
$stdout.reopen("/dev/null", "w")
$stderr.reopen("/dev/null", "w")
# subscribe blocks, so we need a new bus to transmit
new_bus.subscribe("/echo", 0) do |msg|
bus.publish("/response", Process.pid.to_s)
end
end
def spawn_child
r = fork
if r.nil?
work_it
else
r
end
end
it 'gets every response from child processes' do
pid = nil
Redis.new(:db => 10).flushdb
begin
pids = (1..10).map{spawn_child}
responses = []
bus = MessageBus::ReliablePubSub.new(:db => 10)
Thread.new do
bus.subscribe("/response", 0) do |msg|
responses << msg if pids.include? msg.data.to_i
end
end
10.times{bus.publish("/echo", Process.pid.to_s)}
wait_for 4000 do
responses.count == 100
end
# p responses.group_by(&:data).map{|k,v|[k, v.count]}
# p responses.group_by(&:global_id).map{|k,v|[k, v.count]}
responses.count.should == 100
ensure
if pids
pids.each do |pid|
Process.kill("HUP", pid)
Process.wait(pid)
end
end
end
end
end

View File

@ -1,167 +0,0 @@
require 'spec_helper'
require 'message_bus'
describe MessageBus::ReliablePubSub do
def new_test_bus
MessageBus::ReliablePubSub.new(:db => 10)
end
before do
@bus = new_test_bus
@bus.reset!
end
it "should be able to access the backlog" do
@bus.publish "/foo", "bar"
@bus.publish "/foo", "baz"
@bus.backlog("/foo", 0).to_a.should == [
MessageBus::Message.new(1,1,'/foo','bar'),
MessageBus::Message.new(2,2,'/foo','baz')
]
end
it "should truncate channels correctly" do
@bus.max_backlog_size = 2
4.times do |t|
@bus.publish "/foo", t.to_s
end
@bus.backlog("/foo").to_a.should == [
MessageBus::Message.new(3,3,'/foo','2'),
MessageBus::Message.new(4,4,'/foo','3'),
]
end
it "should be able to grab a message by id" do
id1 = @bus.publish "/foo", "bar"
id2 = @bus.publish "/foo", "baz"
@bus.get_message("/foo", id2).should == MessageBus::Message.new(2, 2, "/foo", "baz")
@bus.get_message("/foo", id1).should == MessageBus::Message.new(1, 1, "/foo", "bar")
end
it "should be able to access the global backlog" do
@bus.publish "/foo", "bar"
@bus.publish "/hello", "world"
@bus.publish "/foo", "baz"
@bus.publish "/hello", "planet"
@bus.global_backlog.to_a.should == [
MessageBus::Message.new(1, 1, "/foo", "bar"),
MessageBus::Message.new(2, 1, "/hello", "world"),
MessageBus::Message.new(3, 2, "/foo", "baz"),
MessageBus::Message.new(4, 2, "/hello", "planet")
]
end
it "should correctly omit dropped messages from the global backlog" do
@bus.max_backlog_size = 1
@bus.publish "/foo", "a"
@bus.publish "/foo", "b"
@bus.publish "/bar", "a"
@bus.publish "/bar", "b"
@bus.global_backlog.to_a.should == [
MessageBus::Message.new(2, 2, "/foo", "b"),
MessageBus::Message.new(4, 2, "/bar", "b")
]
end
it "should have the correct number of messages for multi threaded access" do
threads = []
4.times do
threads << Thread.new do
bus = new_test_bus
25.times {
bus.publish "/foo", "."
}
end
end
threads.each{|t| t.join}
@bus.backlog("/foo").length == 100
end
it "should be able to subscribe globally with recovery" do
@bus.publish("/foo", "1")
@bus.publish("/bar", "2")
got = []
t = Thread.new do
new_test_bus.global_subscribe(0) do |msg|
got << msg
end
end
@bus.publish("/bar", "3")
wait_for(100) do
got.length == 3
end
t.kill
got.length.should == 3
got.map{|m| m.data}.should == ["1","2","3"]
end
it "should be able to encode and decode messages properly" do
m = MessageBus::Message.new 1,2,'||','||'
MessageBus::Message.decode(m.encode).should == m
end
it "should handle subscribe on single channel, with recovery" do
@bus.publish("/foo", "1")
@bus.publish("/bar", "2")
got = []
t = Thread.new do
new_test_bus.subscribe("/foo",0) do |msg|
got << msg
end
end
@bus.publish("/foo", "3")
wait_for(100) do
got.length == 2
end
t.kill
got.map{|m| m.data}.should == ["1","3"]
end
it "should not get backlog if subscribe is called without params" do
@bus.publish("/foo", "1")
got = []
t = Thread.new do
new_test_bus.subscribe("/foo") do |msg|
got << msg
end
end
# sleep 50ms to allow the bus to correctly subscribe,
# I thought about adding a subscribed callback, but outside of testing it matters less
sleep 0.05
@bus.publish("/foo", "2")
wait_for(100) do
got.length == 1
end
t.kill
got.map{|m| m.data}.should == ["2"]
end
it "should allow us to get last id on a channel" do
@bus.last_id("/foo").should == 0
@bus.publish("/foo", "1")
@bus.last_id("/foo").should == 1
end
end

View File

@ -1,16 +0,0 @@
RSpec.configure do |config|
config.color_enabled = true
end
def wait_for(timeout_milliseconds)
timeout = (timeout_milliseconds + 0.0) / 1000
finish = Time.now + timeout
t = Thread.new do
while Time.now < finish && !yield
sleep(0.001)
end
end
t.join
end