Nuke message_bus_observer move to service class and classes
Secure all messages triggered by post creation and all user actions so they don't leak (meaning, if you have a browser open and secure topics are created you will only get them if you are allowed to see them)
This commit is contained in:
parent
bae2d252fa
commit
e9ebadb414
|
@ -1,49 +0,0 @@
|
|||
require_dependency 'discourse_observer'
|
||||
|
||||
# This class is responsible for notifying the message bus of various
|
||||
# events.
|
||||
class MessageBusObserver < DiscourseObserver
|
||||
observe :notification, :user_action, :topic
|
||||
|
||||
def after_create_notification(notification)
|
||||
refresh_notification_count(notification)
|
||||
end
|
||||
|
||||
def after_destroy_notification(notification)
|
||||
refresh_notification_count(notification)
|
||||
end
|
||||
|
||||
def after_create_user_action(user_action)
|
||||
MessageBus.publish("/users/#{user_action.user.username.downcase}", user_action.id)
|
||||
end
|
||||
|
||||
def after_create_topic(topic)
|
||||
|
||||
# Don't publish invisible topics
|
||||
return unless topic.visible?
|
||||
|
||||
return if topic.private_message?
|
||||
|
||||
topic.posters = topic.posters_summary
|
||||
topic.posts_count = 1
|
||||
topic_json = TopicListItemSerializer.new(topic).as_json
|
||||
MessageBus.publish("/latest", topic_json)
|
||||
|
||||
# If it has a category, add it to the category views too
|
||||
if topic.category.present?
|
||||
MessageBus.publish("/category/#{topic.category.slug}", topic_json)
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
protected
|
||||
|
||||
def refresh_notification_count(notification)
|
||||
user_id = notification.user.id
|
||||
MessageBus.publish("/notification/#{user_id}",
|
||||
{unread_notifications: notification.user.unread_notifications,
|
||||
unread_private_messages: notification.user.unread_private_messages},
|
||||
user_ids: [user_id] # only publish the notification to this user
|
||||
)
|
||||
end
|
||||
end
|
|
@ -10,6 +10,9 @@ class Notification < ActiveRecord::Base
|
|||
scope :unread, lambda { where(read: false) }
|
||||
scope :recent, lambda { order('created_at desc').limit(10) }
|
||||
|
||||
after_save :refresh_notification_count
|
||||
after_destroy :refresh_notification_count
|
||||
|
||||
def self.types
|
||||
@types ||= Enum.new(
|
||||
:mentioned, :replied, :quoted, :edited, :liked, :private_message,
|
||||
|
@ -74,5 +77,18 @@ class Notification < ActiveRecord::Base
|
|||
|
||||
Post.where(topic_id: topic_id, post_number: post_number).first
|
||||
end
|
||||
|
||||
|
||||
protected
|
||||
|
||||
def refresh_notification_count
|
||||
user_id = user.id
|
||||
MessageBus.publish("/notification/#{user_id}",
|
||||
{unread_notifications: user.unread_notifications,
|
||||
unread_private_messages: user.unread_private_messages},
|
||||
user_ids: [user_id] # only publish the notification to this user
|
||||
)
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
|
|
|
@ -150,6 +150,19 @@ LEFT JOIN categories c on c.id = t.category_id
|
|||
User.update_all('likes_received = likes_received + 1', id: user_id)
|
||||
end
|
||||
|
||||
topic = Topic.includes(:category).where(id: hash[:target_topic_id]).first
|
||||
|
||||
# move into Topic perhaps
|
||||
group_ids = nil
|
||||
if topic && topic.category && topic.category.secure
|
||||
group_ids = topic.category.groups.select("groups.id").map{|g| g.id}
|
||||
end
|
||||
|
||||
MessageBus.publish("/users/#{action.user.username.downcase}",
|
||||
action.id,
|
||||
user_ids: [user_id],
|
||||
group_ids: group_ids )
|
||||
|
||||
rescue ActiveRecord::RecordNotUnique
|
||||
# can happen, don't care already logged
|
||||
raise ActiveRecord::Rollback
|
||||
|
|
|
@ -55,7 +55,6 @@ module Discourse
|
|||
config.active_record.observers = [
|
||||
:user_email_observer,
|
||||
:user_action_observer,
|
||||
:message_bus_observer,
|
||||
:post_alert_observer,
|
||||
:search_observer
|
||||
]
|
||||
|
|
|
@ -7,6 +7,11 @@ MessageBus.user_id_lookup do |env|
|
|||
user.id if user
|
||||
end
|
||||
|
||||
MessageBus.group_ids_lookup do |env|
|
||||
user = CurrentUser.lookup_from_env(env)
|
||||
user.groups.select('groups.id').map{|g| g.id} if user
|
||||
end
|
||||
|
||||
MessageBus.on_connect do |site_id|
|
||||
RailsMultisite::ConnectionManagement.establish_connection(db: site_id)
|
||||
end
|
||||
|
|
|
@ -12,7 +12,6 @@ class Autospec::Runner
|
|||
|
||||
watch(%r{^spec/.+_spec\.rb$})
|
||||
watch(%r{^lib/(.+)\.rb$}) { |m| "spec/components/#{m[1]}_spec.rb" }
|
||||
watch('spec/spec_helper.rb') { "spec" }
|
||||
|
||||
# Rails example
|
||||
watch(%r{^app/(.+)\.rb$}) { |m| "spec/#{m[1]}_spec.rb" }
|
||||
|
@ -24,6 +23,14 @@ class Autospec::Runner
|
|||
# Capybara request specs
|
||||
watch(%r{^app/views/(.+)/.*\.(erb|haml)$}) { |m| "spec/requests/#{m[1]}_spec.rb" }
|
||||
|
||||
RELOAD_MATCHERS = Set.new
|
||||
def self.watch_reload(pattern)
|
||||
RELOAD_MATCHERS << pattern
|
||||
end
|
||||
|
||||
watch_reload('spec/spec_helper.rb')
|
||||
watch_reload('config/(.*).rb')
|
||||
|
||||
|
||||
def self.run(opts={})
|
||||
self.new.run(opts)
|
||||
|
@ -156,6 +163,15 @@ class Autospec::Runner
|
|||
specs = []
|
||||
hit = false
|
||||
files.each do |file|
|
||||
RELOAD_MATCHERS.each do |k|
|
||||
if k.match(file)
|
||||
spork_service.abort
|
||||
stop_spork
|
||||
sleep 1
|
||||
start_spork
|
||||
return
|
||||
end
|
||||
end
|
||||
MATCHERS.each do |k,v|
|
||||
if m = k.match(file)
|
||||
hit = true
|
||||
|
|
|
@ -41,9 +41,132 @@ class PostCreator
|
|||
def create
|
||||
topic = nil
|
||||
post = nil
|
||||
new_topic = false
|
||||
|
||||
Post.transaction do
|
||||
if @opts[:topic_id].blank?
|
||||
topic = create_topic
|
||||
new_topic = true
|
||||
else
|
||||
topic = Topic.where(id: @opts[:topic_id]).first
|
||||
guardian.ensure_can_create!(Post, topic)
|
||||
end
|
||||
|
||||
post = topic.posts.new(raw: @opts[:raw],
|
||||
user: @user,
|
||||
reply_to_post_number: @opts[:reply_to_post_number])
|
||||
|
||||
post.post_type = @opts[:post_type] if @opts[:post_type].present?
|
||||
post.no_bump = @opts[:no_bump] if @opts[:no_bump].present?
|
||||
post.extract_quoted_post_numbers
|
||||
post.acting_user = @opts[:acting_user] if @opts[:acting_user].present?
|
||||
|
||||
post.image_sizes = @opts[:image_sizes] if @opts[:image_sizes].present?
|
||||
post.invalidate_oneboxes = @opts[:invalidate_oneboxes] if @opts[:invalidate_oneboxes].present?
|
||||
unless post.save
|
||||
@errors = post.errors
|
||||
raise ActiveRecord::Rollback.new
|
||||
end
|
||||
|
||||
# Extract links
|
||||
TopicLink.extract_from(post)
|
||||
|
||||
# Store unique post key
|
||||
if SiteSetting.unique_posts_mins > 0
|
||||
$redis.setex(post.unique_post_key, SiteSetting.unique_posts_mins.minutes.to_i, "1")
|
||||
end
|
||||
|
||||
# send a mail to notify users in case of a private message
|
||||
if topic.private_message?
|
||||
topic.allowed_users.where(["users.email_private_messages = true and users.id != ?", @user.id]).each do |u|
|
||||
Jobs.enqueue_in(SiteSetting.email_time_window_mins.minutes,
|
||||
:user_email,
|
||||
type: :private_message,
|
||||
user_id: u.id,
|
||||
post_id: post.id
|
||||
)
|
||||
end
|
||||
|
||||
clear_possible_flags(topic) if post.post_number > 1 && topic.user_id != post.user_id
|
||||
end
|
||||
|
||||
# Track the topic
|
||||
TopicUser.auto_track(@user.id, topic.id, TopicUser.notification_reasons[:created_post])
|
||||
|
||||
# We don't count replies to your own topics
|
||||
if @user.id != topic.user_id
|
||||
@user.update_topic_reply_count
|
||||
end
|
||||
|
||||
@user.last_posted_at = post.created_at
|
||||
@user.save!
|
||||
|
||||
if post.post_number > 1
|
||||
MessageBus.publish("/topic/#{post.topic_id}",{
|
||||
id: post.id,
|
||||
created_at: post.created_at,
|
||||
user: BasicUserSerializer.new(post.user).as_json(root: false),
|
||||
post_number: post.post_number
|
||||
},
|
||||
group_ids: secure_group_ids(topic)
|
||||
)
|
||||
end
|
||||
|
||||
# Advance the draft sequence
|
||||
post.advance_draft_sequence
|
||||
|
||||
# Save the quote relationships
|
||||
post.save_reply_relationships
|
||||
end
|
||||
|
||||
# We need to enqueue jobs after the transaction. Otherwise they might begin before the data has
|
||||
# been comitted.
|
||||
topic_id = @opts[:topic_id] || topic.try(:id)
|
||||
Jobs.enqueue(:feature_topic_users, topic_id: topic.id) if topic_id.present?
|
||||
if post
|
||||
post.trigger_post_process
|
||||
after_topic_create(topic) if new_topic
|
||||
end
|
||||
|
||||
post
|
||||
end
|
||||
|
||||
|
||||
# Shortcut
|
||||
def self.create(user, opts)
|
||||
PostCreator.new(user, opts).create
|
||||
end
|
||||
|
||||
protected
|
||||
|
||||
def secure_group_ids(topic)
|
||||
@secure_group_ids ||= if topic.category && topic.category.secure?
|
||||
topic.category.groups.select("groups.id").map{|g| g.id}
|
||||
end
|
||||
end
|
||||
|
||||
def after_topic_create(topic)
|
||||
|
||||
# Don't publish invisible topics
|
||||
return unless topic.visible?
|
||||
|
||||
return if topic.private_message?
|
||||
|
||||
topic.posters = topic.posters_summary
|
||||
topic.posts_count = 1
|
||||
topic_json = TopicListItemSerializer.new(topic).as_json
|
||||
|
||||
group_ids = secure_group_ids(topic)
|
||||
|
||||
MessageBus.publish("/latest", topic_json, group_ids: group_ids)
|
||||
|
||||
# If it has a category, add it to the category views too
|
||||
if topic.category
|
||||
MessageBus.publish("/category/#{topic.category.slug}", topic_json, group_ids: group_ids)
|
||||
end
|
||||
end
|
||||
|
||||
def create_topic
|
||||
topic_params = {title: @opts[:title], user_id: @user.id, last_post_user_id: @user.id}
|
||||
topic_params[:archetype] = @opts[:archetype] if @opts[:archetype].present?
|
||||
topic_params[:subtype] = @opts[:subtype] if @opts[:subtype].present?
|
||||
|
@ -80,86 +203,10 @@ class PostCreator
|
|||
@errors = topic.errors
|
||||
raise ActiveRecord::Rollback.new
|
||||
end
|
||||
else
|
||||
topic = Topic.where(id: @opts[:topic_id]).first
|
||||
guardian.ensure_can_create!(Post, topic)
|
||||
|
||||
topic
|
||||
end
|
||||
|
||||
post = topic.posts.new(raw: @opts[:raw],
|
||||
user: @user,
|
||||
reply_to_post_number: @opts[:reply_to_post_number])
|
||||
|
||||
post.post_type = @opts[:post_type] if @opts[:post_type].present?
|
||||
post.no_bump = @opts[:no_bump] if @opts[:no_bump].present?
|
||||
post.extract_quoted_post_numbers
|
||||
post.acting_user = @opts[:acting_user] if @opts[:acting_user].present?
|
||||
|
||||
post.image_sizes = @opts[:image_sizes] if @opts[:image_sizes].present?
|
||||
post.invalidate_oneboxes = @opts[:invalidate_oneboxes] if @opts[:invalidate_oneboxes].present?
|
||||
unless post.save
|
||||
@errors = post.errors
|
||||
raise ActiveRecord::Rollback.new
|
||||
end
|
||||
|
||||
# Extract links
|
||||
TopicLink.extract_from(post)
|
||||
|
||||
# Store unique post key
|
||||
if SiteSetting.unique_posts_mins > 0
|
||||
$redis.setex(post.unique_post_key, SiteSetting.unique_posts_mins.minutes.to_i, "1")
|
||||
end
|
||||
|
||||
# send a mail to notify users in case of a private message
|
||||
if topic.private_message?
|
||||
topic.allowed_users.where(["users.email_private_messages = true and users.id != ?", @user.id]).each do |u|
|
||||
Jobs.enqueue_in(SiteSetting.email_time_window_mins.minutes, :user_email, type: :private_message, user_id: u.id, post_id: post.id)
|
||||
end
|
||||
|
||||
clear_possible_flags(topic) if post.post_number > 1 && topic.user_id != post.user_id
|
||||
end
|
||||
|
||||
# Track the topic
|
||||
TopicUser.auto_track(@user.id, topic.id, TopicUser.notification_reasons[:created_post])
|
||||
|
||||
# We don't count replies to your own topics
|
||||
if @user.id != topic.user_id
|
||||
@user.update_topic_reply_count
|
||||
end
|
||||
|
||||
@user.last_posted_at = post.created_at
|
||||
@user.save!
|
||||
|
||||
# Publish the post in the message bus
|
||||
MessageBus.publish("/topic/#{post.topic_id}",
|
||||
id: post.id,
|
||||
created_at: post.created_at,
|
||||
user: BasicUserSerializer.new(post.user).as_json(root: false),
|
||||
post_number: post.post_number)
|
||||
|
||||
# Advance the draft sequence
|
||||
post.advance_draft_sequence
|
||||
|
||||
# Save the quote relationships
|
||||
post.save_reply_relationships
|
||||
end
|
||||
|
||||
# We need to enqueue jobs after the transaction. Otherwise they might begin before the data has
|
||||
# been comitted.
|
||||
topic_id = @opts[:topic_id] || topic.try(:id)
|
||||
Jobs.enqueue(:feature_topic_users, topic_id: topic.id) if topic_id.present?
|
||||
post.trigger_post_process if post.present?
|
||||
|
||||
post
|
||||
end
|
||||
|
||||
|
||||
# Shortcut
|
||||
def self.create(user, opts)
|
||||
PostCreator.new(user, opts).create
|
||||
end
|
||||
|
||||
protected
|
||||
|
||||
def clear_possible_flags(topic)
|
||||
# at this point we know the topic is a PM and has been replied to ... check if we need to clear any flags
|
||||
#
|
||||
|
|
|
@ -31,12 +31,53 @@ describe PostCreator do
|
|||
end
|
||||
|
||||
context 'success' do
|
||||
it 'creates a topic' do
|
||||
lambda { creator.create }.should change(Topic, :count).by(1)
|
||||
|
||||
it 'generates the correct messages for a secure topic' do
|
||||
|
||||
admin = Fabricate(:admin)
|
||||
|
||||
cat = Fabricate(:category)
|
||||
cat.deny(:all)
|
||||
cat.allow(Group[:admins])
|
||||
cat.save
|
||||
|
||||
created_post = nil
|
||||
reply = nil
|
||||
|
||||
messages = MessageBus.track_publish do
|
||||
created_post = PostCreator.new(admin, basic_topic_params.merge(category: cat.name)).create
|
||||
reply = PostCreator.new(admin, raw: 'this is my test reply 123 testing', topic_id: created_post.topic_id).create
|
||||
end
|
||||
|
||||
it 'returns a post' do
|
||||
creator.create.is_a?(Post).should be_true
|
||||
topic_id = created_post.topic_id
|
||||
|
||||
|
||||
messages.map{|m| m.channel}.sort.should == [ "/latest",
|
||||
"/users/#{admin.username}",
|
||||
"/users/#{admin.username}",
|
||||
"/topic/#{created_post.topic_id}",
|
||||
"/category/#{cat.slug}"
|
||||
].sort
|
||||
admin_ids = [Group[:admins].id]
|
||||
messages.any?{|m| m.group_ids != admin_ids}.should be_false
|
||||
|
||||
end
|
||||
|
||||
it 'generates the correct messages for a normal topic' do
|
||||
|
||||
p = nil
|
||||
messages = MessageBus.track_publish do
|
||||
p = creator.create
|
||||
topic_id = p.topic_id
|
||||
end
|
||||
|
||||
latest = messages.find{|m| m.channel == "/latest"}
|
||||
latest.should_not be_nil
|
||||
|
||||
user_action = messages.find{|m| m.channel == "/users/#{p.user.username}"}
|
||||
user_action.should_not be_nil
|
||||
|
||||
messages.length.should == 2
|
||||
end
|
||||
|
||||
it 'extracts links from the post' do
|
||||
|
@ -44,20 +85,8 @@ describe PostCreator do
|
|||
creator.create
|
||||
end
|
||||
|
||||
it 'enqueues the post on the message bus' do
|
||||
MessageBus.stubs(:publish).with("/users/#{user.username}", anything)
|
||||
MessageBus.expects(:publish).with("/topic/#{topic.id}", instance_of(Hash))
|
||||
PostCreator.new(user, raw: basic_topic_params[:raw], topic_id: topic.id)
|
||||
end
|
||||
|
||||
it 'features topic users' do
|
||||
Jobs.stubs(:enqueue).with(:process_post, anything)
|
||||
Jobs.expects(:enqueue).with(:feature_topic_users, has_key(:topic_id))
|
||||
creator.create
|
||||
end
|
||||
|
||||
it 'queues up post processing job when saved' do
|
||||
Jobs.stubs(:enqueue).with(:feature_topic_users, has_key(:topic_id))
|
||||
Jobs.expects(:enqueue).with(:feature_topic_users, has_key(:topic_id))
|
||||
Jobs.expects(:enqueue).with(:process_post, has_key(:post_id))
|
||||
creator.create
|
||||
end
|
||||
|
|
|
@ -1,18 +0,0 @@
|
|||
require 'spec_helper'
|
||||
|
||||
describe MessageBusObserver do
|
||||
|
||||
context 'after create topic' do
|
||||
|
||||
after do
|
||||
@topic = Fabricate(:topic)
|
||||
end
|
||||
|
||||
it 'publishes the topic to the list' do
|
||||
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
|
||||
end
|
|
@ -90,7 +90,7 @@ describe Notification do
|
|||
describe 'message bus' do
|
||||
|
||||
it 'updates the notification count on create' do
|
||||
MessageBusObserver.any_instance.expects(:refresh_notification_count).with(instance_of(Notification))
|
||||
Notification.any_instance.expects(:refresh_notification_count).returns(nil)
|
||||
Fabricate(:notification)
|
||||
end
|
||||
|
||||
|
@ -99,7 +99,7 @@ describe Notification do
|
|||
let!(:notification) { Fabricate(:notification) }
|
||||
|
||||
it 'updates the notification count on destroy' do
|
||||
MessageBusObserver.any_instance.expects(:refresh_notification_count).with(instance_of(Notification))
|
||||
Notification.any_instance.expects(:refresh_notification_count).returns(nil)
|
||||
notification.destroy
|
||||
end
|
||||
|
||||
|
|
|
@ -180,14 +180,6 @@ describe Topic do
|
|||
|
||||
end
|
||||
|
||||
context 'message bus' do
|
||||
it 'calls the message bus observer after create' do
|
||||
ActiveRecord::Base.observers.enable :all
|
||||
MessageBusObserver.any_instance.expects(:after_create_topic).with(instance_of(Topic))
|
||||
Fabricate(:topic)
|
||||
end
|
||||
end
|
||||
|
||||
context 'post_numbers' do
|
||||
let!(:topic) { Fabricate(:topic) }
|
||||
let!(:p1) { Fabricate(:post, topic: topic, user: topic.user) }
|
||||
|
|
|
@ -91,11 +91,6 @@ describe UserAction do
|
|||
end
|
||||
end
|
||||
|
||||
it 'calls the message bus observer' do
|
||||
MessageBusObserver.any_instance.expects(:after_create_user_action).with(instance_of(UserAction))
|
||||
Fabricate(:user_action)
|
||||
end
|
||||
|
||||
describe 'when user likes' do
|
||||
|
||||
let!(:post) { Fabricate(:post) }
|
||||
|
|
|
@ -119,6 +119,30 @@ def build(*args)
|
|||
Fabricate.build(*args)
|
||||
end
|
||||
|
||||
module MessageBus::DiagnosticsHelper
|
||||
def publish(channel, data, opts = nil)
|
||||
id = super(channel, data, opts)
|
||||
if @tracking
|
||||
m = MessageBus::Message.new(-1, id, channel, data)
|
||||
m.user_ids = opts[:user_ids] if opts
|
||||
m.group_ids = opts[:group_ids] if opts
|
||||
@tracking << m
|
||||
end
|
||||
id
|
||||
end
|
||||
|
||||
def track_publish
|
||||
@tracking = tracking = []
|
||||
yield
|
||||
@tracking = nil
|
||||
tracking
|
||||
end
|
||||
end
|
||||
|
||||
module MessageBus
|
||||
extend MessageBus::DiagnosticsHelper
|
||||
end
|
||||
|
||||
# --- Instructions ---
|
||||
# Sort the contents of this file into a Spork.prefork and a Spork.each_run
|
||||
# block.
|
||||
|
|
Loading…
Reference in New Issue