FIX: don't count users as replying long after they are done typing
Also, don't leak a large amount of redis usage for presence stuff
This commit is contained in:
parent
97ceebb570
commit
fd67508497
|
@ -166,7 +166,7 @@ GEM
|
||||||
mail (2.6.6)
|
mail (2.6.6)
|
||||||
mime-types (>= 1.16, < 4)
|
mime-types (>= 1.16, < 4)
|
||||||
memory_profiler (0.9.8)
|
memory_profiler (0.9.8)
|
||||||
message_bus (2.0.9)
|
message_bus (2.1.1)
|
||||||
rack (>= 1.1.3)
|
rack (>= 1.1.3)
|
||||||
metaclass (0.0.4)
|
metaclass (0.0.4)
|
||||||
method_source (0.8.2)
|
method_source (0.8.2)
|
||||||
|
|
|
@ -3,6 +3,9 @@ import { observes, on } from 'ember-addons/ember-computed-decorators';
|
||||||
import computed from 'ember-addons/ember-computed-decorators';
|
import computed from 'ember-addons/ember-computed-decorators';
|
||||||
import pageVisible from 'discourse/lib/page-visible';
|
import pageVisible from 'discourse/lib/page-visible';
|
||||||
|
|
||||||
|
export const keepAliveDuration = 10000;
|
||||||
|
const bufferTime = 3000;
|
||||||
|
|
||||||
export default Ember.Component.extend({
|
export default Ember.Component.extend({
|
||||||
composer: Ember.inject.controller(),
|
composer: Ember.inject.controller(),
|
||||||
|
|
||||||
|
@ -26,7 +29,17 @@ export default Ember.Component.extend({
|
||||||
|
|
||||||
@on('willDestroyElement')
|
@on('willDestroyElement')
|
||||||
composerClosing(){
|
composerClosing(){
|
||||||
this.updateStateObject(true);
|
this.updateStateObject({closing: true});
|
||||||
|
},
|
||||||
|
|
||||||
|
@observes('reply', 'title')
|
||||||
|
dataChanged() {
|
||||||
|
if (!this._dataChanged && (new Date() - this._lastPublish) > keepAliveDuration) {
|
||||||
|
this._dataChanged = true;
|
||||||
|
this.keepPresenceAlive();
|
||||||
|
} else {
|
||||||
|
this._dataChanged = true;
|
||||||
|
}
|
||||||
},
|
},
|
||||||
|
|
||||||
@observes('action', 'post', 'topic')
|
@observes('action', 'post', 'topic')
|
||||||
|
@ -34,7 +47,9 @@ export default Ember.Component.extend({
|
||||||
Ember.run.once(this, 'updateStateObject');
|
Ember.run.once(this, 'updateStateObject');
|
||||||
},
|
},
|
||||||
|
|
||||||
updateStateObject(isClosing = false){
|
updateStateObject(opts){
|
||||||
|
const isClosing = opts && opts.closing;
|
||||||
|
|
||||||
var stateObject = null;
|
var stateObject = null;
|
||||||
|
|
||||||
if(!isClosing && this.shouldSharePresence(this.get('action'))){
|
if(!isClosing && this.shouldSharePresence(this.get('action'))){
|
||||||
|
@ -73,14 +88,10 @@ export default Ember.Component.extend({
|
||||||
}
|
}
|
||||||
|
|
||||||
this.set('presenceUsers', []);
|
this.set('presenceUsers', []);
|
||||||
|
this.publish({
|
||||||
ajax('/presence/publish', {
|
response_needed: true,
|
||||||
type: 'POST',
|
previous: this.get('oldPresenceState'),
|
||||||
data: {
|
current: this.get('presenceState')
|
||||||
response_needed: true,
|
|
||||||
previous: this.get('oldPresenceState'),
|
|
||||||
current: this.get('presenceState')
|
|
||||||
}
|
|
||||||
}).then((data) => {
|
}).then((data) => {
|
||||||
const messageBusChannel = data['messagebus_channel'];
|
const messageBusChannel = data['messagebus_channel'];
|
||||||
if(messageBusChannel){
|
if(messageBusChannel){
|
||||||
|
@ -90,20 +101,42 @@ export default Ember.Component.extend({
|
||||||
this.set('messageBusChannel', messageBusChannel);
|
this.set('messageBusChannel', messageBusChannel);
|
||||||
this.messageBus.subscribe(messageBusChannel, message => {
|
this.messageBus.subscribe(messageBusChannel, message => {
|
||||||
this.set('presenceUsers', message['users']);
|
this.set('presenceUsers', message['users']);
|
||||||
|
this.timeoutPresence();
|
||||||
}, messageBusId);
|
}, messageBusId);
|
||||||
}
|
}
|
||||||
}).catch((error) => {
|
}).catch((error) => {
|
||||||
// This isn't a critical failure, so don't disturb the user
|
// This isn't a critical failure, so don't disturb the user
|
||||||
console.error("Error publishing composer status", error);
|
if (window.console && console.error) {
|
||||||
|
console.error("Error publishing composer status", error);
|
||||||
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
Ember.run.cancel(this.get('keepAliveTimer'));
|
Ember.run.cancel(this.get('keepAliveTimer'));
|
||||||
if(this.shouldSharePresence(this.get('presenceState.action'))){
|
if(this.shouldSharePresence(this.get('presenceState.action'))){
|
||||||
// Send presence data every 10 seconds
|
// Send presence data every 10 seconds
|
||||||
this.set('keepAliveTimer', Ember.run.later(this, 'keepPresenceAlive', 10000));
|
this.set('keepAliveTimer', Ember.run.later(this, 'keepPresenceAlive', keepAliveDuration));
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
||||||
|
timeoutPresence() {
|
||||||
|
Ember.run.cancel(this._timeoutTimer);
|
||||||
|
this._timeoutTimer = Ember.run.later(
|
||||||
|
this,
|
||||||
|
() => { this.set("presenceUsers", []); },
|
||||||
|
keepAliveDuration + bufferTime
|
||||||
|
);
|
||||||
|
},
|
||||||
|
|
||||||
|
publish(data) {
|
||||||
|
this._lastPublish = new Date();
|
||||||
|
this._dataChanged = false;
|
||||||
|
|
||||||
|
return ajax('/presence/publish', {
|
||||||
|
type: 'POST',
|
||||||
|
data: data
|
||||||
|
});
|
||||||
|
},
|
||||||
|
|
||||||
keepPresenceAlive(){
|
keepPresenceAlive(){
|
||||||
// If we're not replying or editing,
|
// If we're not replying or editing,
|
||||||
// don't update anything, and don't schedule this task again
|
// don't update anything, and don't schedule this task again
|
||||||
|
@ -111,22 +144,26 @@ export default Ember.Component.extend({
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
const browserInFocus = pageVisible();
|
if (this._dataChanged) {
|
||||||
|
this._dataChanged = false;
|
||||||
|
const browserInFocus = pageVisible();
|
||||||
|
|
||||||
// Only send the keepalive message if the browser has focus
|
// Only send the keepalive message if the browser has focus
|
||||||
if(browserInFocus){
|
if(browserInFocus){
|
||||||
ajax('/presence/publish', {
|
this.publish({
|
||||||
type: 'POST',
|
current: this.get('presenceState')
|
||||||
data: { current: this.get('presenceState') }
|
}).catch((error) => {
|
||||||
}).catch((error) => {
|
// This isn't a critical failure, so don't disturb the user
|
||||||
// This isn't a critical failure, so don't disturb the user
|
if (window.console && console.error) {
|
||||||
console.error("Error publishing composer status", error);
|
console.error("Error publishing composer status", error);
|
||||||
});
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Schedule again in another 10 seconds
|
// Schedule again in another 10 seconds
|
||||||
Ember.run.cancel(this.get('keepAliveTimer'));
|
Ember.run.cancel(this.get('keepAliveTimer'));
|
||||||
this.set('keepAliveTimer', Ember.run.later(this, 'keepPresenceAlive', 10000));
|
this.set('keepAliveTimer', Ember.run.later(this, 'keepPresenceAlive', keepAliveDuration));
|
||||||
},
|
},
|
||||||
|
|
||||||
@computed('presenceUsers', 'currentUser.id')
|
@computed('presenceUsers', 'currentUser.id')
|
||||||
|
|
|
@ -1,6 +1,8 @@
|
||||||
import { ajax } from 'discourse/lib/ajax';
|
|
||||||
import { on } from 'ember-addons/ember-computed-decorators';
|
import { on } from 'ember-addons/ember-computed-decorators';
|
||||||
import computed from 'ember-addons/ember-computed-decorators';
|
import computed from 'ember-addons/ember-computed-decorators';
|
||||||
|
import { keepAliveDuration } from 'discourse/plugins/discourse-presence/discourse/components/composer-presence-display';
|
||||||
|
|
||||||
|
const bufferTime = 3000;
|
||||||
|
|
||||||
export default Ember.Component.extend({
|
export default Ember.Component.extend({
|
||||||
topicId: null,
|
topicId: null,
|
||||||
|
@ -11,22 +13,45 @@ export default Ember.Component.extend({
|
||||||
@on('didInsertElement')
|
@on('didInsertElement')
|
||||||
_inserted() {
|
_inserted() {
|
||||||
this.set("presenceUsers", []);
|
this.set("presenceUsers", []);
|
||||||
|
const messageBusChannel = `/presence/topic/${this.get('topicId')}`;
|
||||||
|
this.set('messageBusChannel', messageBusChannel);
|
||||||
|
|
||||||
ajax(`/presence/ping/${this.get("topicId")}`).then((data) => {
|
var firstMessage = true;
|
||||||
this.setProperties({
|
|
||||||
messageBusChannel: data.messagebus_channel,
|
this.messageBus.subscribe(messageBusChannel, message => {
|
||||||
presenceUsers: data.users,
|
|
||||||
});
|
let users = message.users;
|
||||||
this.messageBus.subscribe(data.messagebus_channel, message => {
|
|
||||||
this.set("presenceUsers", message.users);
|
// account for old messages,
|
||||||
}, data.messagebus_id);
|
// we only do this once to allow for some bad clocks
|
||||||
});
|
if (firstMessage) {
|
||||||
|
const old = ((new Date()) / 1000) - ((keepAliveDuration / 1000) * 2);
|
||||||
|
if (message.time && (message.time < old)) {
|
||||||
|
users = [];
|
||||||
|
}
|
||||||
|
firstMessage = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
Em.run.cancel(this._expireTimer);
|
||||||
|
|
||||||
|
this.set("presenceUsers", users);
|
||||||
|
|
||||||
|
this._expireTimer = Em.run.later(
|
||||||
|
this,
|
||||||
|
() => {
|
||||||
|
this.set("presenceUsers", []);
|
||||||
|
},
|
||||||
|
keepAliveDuration + bufferTime
|
||||||
|
);
|
||||||
|
}, -2); /* subscribe at position -2 so we get last message */
|
||||||
},
|
},
|
||||||
|
|
||||||
@on('willDestroyElement')
|
@on('willDestroyElement')
|
||||||
_destroyed() {
|
_destroyed() {
|
||||||
if (this.get("messageBusChannel")) {
|
const channel = this.get("messageBusChannel");
|
||||||
this.messageBus.unsubscribe(this.get("messageBusChannel"));
|
if (channel) {
|
||||||
|
Em.run.cancel(this._expireTimer);
|
||||||
|
this.messageBus.unsubscribe(channel);
|
||||||
this.set("messageBusChannel", null);
|
this.set("messageBusChannel", null);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
|
@ -30,12 +30,17 @@ after_initialize do
|
||||||
|
|
||||||
def self.add(type, id, user_id)
|
def self.add(type, id, user_id)
|
||||||
# return true if a key was added
|
# return true if a key was added
|
||||||
$redis.hset(get_redis_key(type, id), user_id, Time.zone.now)
|
key = get_redis_key(type, id)
|
||||||
|
result = $redis.hset(key, user_id, Time.zone.now)
|
||||||
|
$redis.expire(key, 60)
|
||||||
|
result
|
||||||
end
|
end
|
||||||
|
|
||||||
def self.remove(type, id, user_id)
|
def self.remove(type, id, user_id)
|
||||||
|
key = get_redis_key(type, id)
|
||||||
|
$redis.expire(key, 60)
|
||||||
# return true if a key was deleted
|
# return true if a key was deleted
|
||||||
$redis.hdel(get_redis_key(type, id), user_id) > 0
|
$redis.hdel(key, user_id) > 0
|
||||||
end
|
end
|
||||||
|
|
||||||
def self.get_users(type, id)
|
def self.get_users(type, id)
|
||||||
|
@ -47,16 +52,16 @@ after_initialize do
|
||||||
def self.publish(type, id)
|
def self.publish(type, id)
|
||||||
users = get_users(type, id)
|
users = get_users(type, id)
|
||||||
serialized_users = users.map { |u| BasicUserSerializer.new(u, root: false) }
|
serialized_users = users.map { |u| BasicUserSerializer.new(u, root: false) }
|
||||||
message = { users: serialized_users }
|
message = { users: serialized_users, time: Time.now.to_i }
|
||||||
messagebus_channel = get_messagebus_channel(type, id)
|
messagebus_channel = get_messagebus_channel(type, id)
|
||||||
|
|
||||||
topic = type == 'post' ? Post.find_by(id: id).topic : Topic.find_by(id: id)
|
topic = type == 'post' ? Post.find_by(id: id).topic : Topic.find_by(id: id)
|
||||||
|
|
||||||
if topic.archetype == Archetype.private_message
|
if topic.archetype == Archetype.private_message
|
||||||
user_ids = User.where('admin OR moderator').pluck(:id) + topic.allowed_users.pluck(:id)
|
user_ids = User.where('admin OR moderator').pluck(:id) + topic.allowed_users.pluck(:id)
|
||||||
MessageBus.publish(messagebus_channel, message.as_json, user_ids: user_ids)
|
MessageBus.publish(messagebus_channel, message.as_json, user_ids: user_ids, max_backlog_age: 60)
|
||||||
else
|
else
|
||||||
MessageBus.publish(messagebus_channel, message.as_json, group_ids: topic.secure_group_ids)
|
MessageBus.publish(messagebus_channel, message.as_json, group_ids: topic.secure_group_ids, max_backlog_age: 60)
|
||||||
end
|
end
|
||||||
|
|
||||||
users
|
users
|
||||||
|
@ -104,9 +109,9 @@ after_initialize do
|
||||||
if topic
|
if topic
|
||||||
guardian.ensure_can_see!(topic)
|
guardian.ensure_can_see!(topic)
|
||||||
|
|
||||||
removed = Presence::PresenceManager.remove(type, id, current_user.id)
|
_removed = Presence::PresenceManager.remove(type, id, current_user.id)
|
||||||
cleaned = Presence::PresenceManager.cleanup(type, id)
|
cleaned = Presence::PresenceManager.cleanup(type, id)
|
||||||
users = Presence::PresenceManager.publish(type, id) if removed || cleaned
|
users = Presence::PresenceManager.publish(type, id)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -119,9 +124,9 @@ after_initialize do
|
||||||
if topic
|
if topic
|
||||||
guardian.ensure_can_see!(topic)
|
guardian.ensure_can_see!(topic)
|
||||||
|
|
||||||
added = Presence::PresenceManager.add(type, id, current_user.id)
|
_added = Presence::PresenceManager.add(type, id, current_user.id)
|
||||||
cleaned = Presence::PresenceManager.cleanup(type, id)
|
cleaned = Presence::PresenceManager.cleanup(type, id)
|
||||||
users = Presence::PresenceManager.publish(type, id) if added || cleaned
|
users = Presence::PresenceManager.publish(type, id)
|
||||||
|
|
||||||
if data[:response_needed]
|
if data[:response_needed]
|
||||||
messagebus_channel = Presence::PresenceManager.get_messagebus_channel(type, id)
|
messagebus_channel = Presence::PresenceManager.get_messagebus_channel(type, id)
|
||||||
|
@ -134,17 +139,6 @@ after_initialize do
|
||||||
render json: payload
|
render json: payload
|
||||||
end
|
end
|
||||||
|
|
||||||
def ping
|
|
||||||
topic_id = params.require(:topic_id)
|
|
||||||
|
|
||||||
Presence::PresenceManager.cleanup("topic", topic_id)
|
|
||||||
|
|
||||||
messagebus_channel = Presence::PresenceManager.get_messagebus_channel("topic", topic_id)
|
|
||||||
users = Presence::PresenceManager.get_users("topic", topic_id)
|
|
||||||
|
|
||||||
render json: json_payload(messagebus_channel, users)
|
|
||||||
end
|
|
||||||
|
|
||||||
def json_payload(channel, users)
|
def json_payload(channel, users)
|
||||||
{
|
{
|
||||||
messagebus_channel: channel,
|
messagebus_channel: channel,
|
||||||
|
@ -157,7 +151,6 @@ after_initialize do
|
||||||
|
|
||||||
Presence::Engine.routes.draw do
|
Presence::Engine.routes.draw do
|
||||||
post '/publish' => 'presences#publish'
|
post '/publish' => 'presences#publish'
|
||||||
get '/ping/:topic_id' => 'presences#ping'
|
|
||||||
end
|
end
|
||||||
|
|
||||||
Discourse::Application.routes.append do
|
Discourse::Application.routes.append do
|
||||||
|
|
|
@ -85,7 +85,7 @@ describe ::Presence::PresencesController do
|
||||||
expect(data).to eq({})
|
expect(data).to eq({})
|
||||||
end
|
end
|
||||||
|
|
||||||
it "doesn't send duplicate messagebus messages" do
|
it "does send duplicate messagebus messages" do
|
||||||
messages = MessageBus.track_publish do
|
messages = MessageBus.track_publish do
|
||||||
post '/presence/publish.json', params: {
|
post '/presence/publish.json', params: {
|
||||||
current: { compose_state: 'open', action: 'edit', post_id: post1.id }
|
current: { compose_state: 'open', action: 'edit', post_id: post1.id }
|
||||||
|
@ -100,7 +100,8 @@ describe ::Presence::PresencesController do
|
||||||
}
|
}
|
||||||
end
|
end
|
||||||
|
|
||||||
expect(messages.count).to eq(0)
|
# we do this cause we also publish time
|
||||||
|
expect(messages.count).to eq(1)
|
||||||
end
|
end
|
||||||
|
|
||||||
it "clears 'previous' state when supplied" do
|
it "clears 'previous' state when supplied" do
|
||||||
|
|
Loading…
Reference in New Issue