FIX: IMAP sync email update uniqueness across groups and minor improvements (#10332)
Adds a imap_group_id column to IncomingEmail to deal with an issue where we were trying to update emails in the mailbox, calling IncomingEmail.where(imap_sync: true). However UID and UIDVALIDITY could be the same across accounts. So if group A used IMAP details for Gmail account A, and group B used IMAP details for Gmail account B, and both tried to sync changes to an email with UID of 3 (e.g. changing Labels), one account could affect the other. This even applied to Archiving! Also in this PR: * Fix error occurring if we do a uid_fetch and no emails are returned * Allow for creating labels within the target mailbox (previously we would not do this, only use existing labels) * Improve consistency for log messages * Add specs for generic IMAP provider (Gmail specs still to come) * Add custom archiving support for Gmail * Only use Message-ID for uniqueness of IncomingEmail if it was generated by us * Various refactors and improvements
This commit is contained in:
parent
8a9e4504fe
commit
2920988b3a
|
@ -4,6 +4,7 @@ class IncomingEmail < ActiveRecord::Base
|
|||
belongs_to :user
|
||||
belongs_to :topic
|
||||
belongs_to :post
|
||||
belongs_to :group, foreign_key: :imap_group_id, class_name: 'Group'
|
||||
|
||||
scope :errored, -> { where("NOT is_bounce AND error IS NOT NULL") }
|
||||
|
||||
|
@ -52,13 +53,15 @@ end
|
|||
# imap_uid_validity :integer
|
||||
# imap_uid :integer
|
||||
# imap_sync :boolean
|
||||
# imap_group_id :bigint
|
||||
#
|
||||
# Indexes
|
||||
#
|
||||
# index_incoming_emails_on_created_at (created_at)
|
||||
# index_incoming_emails_on_error (error)
|
||||
# index_incoming_emails_on_imap_sync (imap_sync)
|
||||
# index_incoming_emails_on_message_id (message_id)
|
||||
# index_incoming_emails_on_post_id (post_id)
|
||||
# index_incoming_emails_on_user_id (user_id) WHERE (user_id IS NOT NULL)
|
||||
# index_incoming_emails_on_created_at (created_at)
|
||||
# index_incoming_emails_on_error (error)
|
||||
# index_incoming_emails_on_imap_group_id (imap_group_id)
|
||||
# index_incoming_emails_on_imap_sync (imap_sync)
|
||||
# index_incoming_emails_on_message_id (message_id)
|
||||
# index_incoming_emails_on_post_id (post_id)
|
||||
# index_incoming_emails_on_user_id (user_id) WHERE (user_id IS NOT NULL)
|
||||
#
|
||||
|
|
|
@ -0,0 +1,22 @@
|
|||
# frozen_string_literal: true
|
||||
|
||||
class AddImapGroupIdToIncomingEmail < ActiveRecord::Migration[6.0]
|
||||
disable_ddl_transaction!
|
||||
|
||||
def up
|
||||
execute <<~SQL
|
||||
ALTER TABLE incoming_emails ADD COLUMN IF NOT EXISTS imap_group_id bigint NULL
|
||||
SQL
|
||||
|
||||
execute <<~SQL
|
||||
CREATE INDEX CONCURRENTLY IF NOT EXISTS
|
||||
index_incoming_emails_on_imap_group_id ON incoming_emails USING btree (imap_group_id)
|
||||
SQL
|
||||
end
|
||||
|
||||
def down
|
||||
execute <<~SQL
|
||||
ALTER TABLE incoming_emails DROP COLUMN IF EXISTS imap_group_id
|
||||
SQL
|
||||
end
|
||||
end
|
|
@ -23,6 +23,7 @@ class Demon::EmailSync < ::Demon::Base
|
|||
def start_thread(db, group)
|
||||
Thread.new do
|
||||
RailsMultisite::ConnectionManagement.with_connection(db) do
|
||||
puts "[EmailSync] Thread started for group #{group.name} (id = #{group.id}) in db #{db}"
|
||||
begin
|
||||
obj = Imap::Sync.for_group(group)
|
||||
rescue Net::IMAP::NoResponseError => e
|
||||
|
@ -36,12 +37,15 @@ class Demon::EmailSync < ::Demon::Base
|
|||
idle = false
|
||||
|
||||
while @running && group.reload.imap_mailbox_name.present? do
|
||||
puts "[EmailSync] Processing IMAP mailbox for group #{group.name} (id = #{group.id}) in db #{db}"
|
||||
status = obj.process(
|
||||
idle: obj.can_idle? && status && status[:remaining] == 0,
|
||||
old_emails_limit: status && status[:remaining] > 0 ? 0 : nil,
|
||||
)
|
||||
|
||||
if !obj.can_idle? && status[:remaining] == 0
|
||||
puts "[EmailSync] Going to sleep for group #{group.name} (id = #{group.id}) in db #{db} to wait for new emails."
|
||||
|
||||
# Thread goes into sleep for a bit so it is better to return any
|
||||
# connection back to the pool.
|
||||
ActiveRecord::Base.connection_handler.clear_active_connections!
|
||||
|
@ -74,14 +78,14 @@ class Demon::EmailSync < ::Demon::Base
|
|||
end
|
||||
|
||||
def after_fork
|
||||
puts "Loading EmailSync in process id #{Process.pid}"
|
||||
puts "[EmailSync] Loading EmailSync in process id #{Process.pid}"
|
||||
|
||||
loop do
|
||||
break if Discourse.redis.set(HEARTBEAT_KEY, Time.now.to_i, ex: HEARTBEAT_INTERVAL, nx: true)
|
||||
sleep HEARTBEAT_INTERVAL
|
||||
end
|
||||
|
||||
puts "Starting EmailSync main thread"
|
||||
puts "[EmailSync] Starting EmailSync main thread"
|
||||
|
||||
@running = true
|
||||
@sync_data = {}
|
||||
|
@ -122,7 +126,7 @@ class Demon::EmailSync < ::Demon::Base
|
|||
if !groups[group_id]
|
||||
puts("[EmailSync] Killing thread for group (id = #{group_id}) because mailbox is no longer synced")
|
||||
else
|
||||
puts("[EmailSync] Thread for group #{groups[group_id].name} is dead")
|
||||
puts("[EmailSync] Thread for group #{groups[group_id].name} (id = #{group_id}) is dead")
|
||||
end
|
||||
|
||||
data[:thread].kill
|
||||
|
@ -135,8 +139,10 @@ class Demon::EmailSync < ::Demon::Base
|
|||
# Spawn new threads for groups that are now synchronized.
|
||||
groups.each do |group_id, group|
|
||||
if !@sync_data[db][group_id]
|
||||
puts("[EmailSync] Starting thread for group #{group.name} and mailbox #{group.imap_mailbox_name}")
|
||||
@sync_data[db][group_id] = { thread: start_thread(db, group), obj: nil }
|
||||
puts("[EmailSync] Starting thread for group #{group.name} (id = #{group.id}) and mailbox #{group.imap_mailbox_name}")
|
||||
@sync_data[db][group_id] = {
|
||||
thread: start_thread(db, group), obj: nil
|
||||
}
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -66,11 +66,13 @@ module Email
|
|||
id_hash = Digest::SHA1.hexdigest(@message_id)
|
||||
DistributedMutex.synchronize("process_email_#{id_hash}") do
|
||||
begin
|
||||
@incoming_email = IncomingEmail.find_by(message_id: @message_id)
|
||||
if @incoming_email
|
||||
@incoming_email.update(imap_uid_validity: @opts[:uid_validity], imap_uid: @opts[:uid], imap_sync: false)
|
||||
return
|
||||
end
|
||||
|
||||
# if we find an existing incoming email record with the
|
||||
# exact same message id, be sure to update it with the correct IMAP
|
||||
# metadata based on sync. this is so we do not double-create emails.
|
||||
@incoming_email = find_existing_and_update_imap
|
||||
return if @incoming_email
|
||||
|
||||
ensure_valid_address_lists
|
||||
ensure_valid_date
|
||||
@from_email, @from_display_name = parse_from_field
|
||||
|
@ -89,6 +91,32 @@ module Email
|
|||
end
|
||||
end
|
||||
|
||||
def find_existing_and_update_imap
|
||||
incoming_email = IncomingEmail.find_by(message_id: @message_id)
|
||||
|
||||
# if we are not doing this for IMAP purposes, then we do not want
|
||||
# to double-process the same Message-ID
|
||||
if @opts[:imap_uid].blank?
|
||||
return incoming_email
|
||||
end
|
||||
|
||||
return if !incoming_email
|
||||
|
||||
# if the message_id matches the post id regexp then we
|
||||
# generated the message_id not the imap server, e.g. in GroupSmtpEmail,
|
||||
# so we want to just update the incoming email. Otherwise the
|
||||
# incoming email is a completely new one from the IMAP server.
|
||||
return if (@message_id =~ message_id_post_id_regexp).nil?
|
||||
|
||||
incoming_email.update(
|
||||
imap_uid_validity: @opts[:imap_uid_validity],
|
||||
imap_uid: @opts[:imap_uid],
|
||||
imap_group_id: @opts[:imap_group_id],
|
||||
imap_sync: false
|
||||
)
|
||||
incoming_email
|
||||
end
|
||||
|
||||
def ensure_valid_address_lists
|
||||
[:to, :cc, :bcc].each do |field|
|
||||
addresses = @mail[field]
|
||||
|
@ -118,8 +146,9 @@ module Email
|
|||
from_address: @from_email,
|
||||
to_addresses: @mail.to&.map(&:downcase)&.join(";"),
|
||||
cc_addresses: @mail.cc&.map(&:downcase)&.join(";"),
|
||||
imap_uid_validity: @opts[:uid_validity],
|
||||
imap_uid: @opts[:uid],
|
||||
imap_uid_validity: @opts[:imap_uid_validity],
|
||||
imap_uid: @opts[:imap_uid],
|
||||
imap_group_id: @opts[:imap_group_id],
|
||||
imap_sync: false
|
||||
)
|
||||
end
|
||||
|
@ -913,12 +942,8 @@ module Email
|
|||
message_ids = Email::Receiver.extract_reply_message_ids(@mail, max_message_id_count: 5)
|
||||
return if message_ids.empty?
|
||||
|
||||
host = Email::Sender.host_for(Discourse.base_url)
|
||||
post_id_regexp = Regexp.new "topic/\\d+/(\\d+)@#{Regexp.escape(host)}"
|
||||
topic_id_regexp = Regexp.new "topic/(\\d+)@#{Regexp.escape(host)}"
|
||||
|
||||
post_ids = message_ids.map { |message_id| message_id[post_id_regexp, 1] }.compact.map(&:to_i)
|
||||
post_ids << Post.where(topic_id: message_ids.map { |message_id| message_id[topic_id_regexp, 1] }.compact, post_number: 1).pluck(:id)
|
||||
post_ids = message_ids.map { |message_id| message_id[message_id_post_id_regexp, 1] }.compact.map(&:to_i)
|
||||
post_ids << Post.where(topic_id: message_ids.map { |message_id| message_id[message_id_topic_id_regexp, 1] }.compact, post_number: 1).pluck(:id)
|
||||
post_ids << EmailLog.where(message_id: message_ids).pluck(:post_id)
|
||||
post_ids << IncomingEmail.where(message_id: message_ids).pluck(:post_id)
|
||||
|
||||
|
@ -931,6 +956,18 @@ module Email
|
|||
Post.where(id: post_ids).order(:created_at).last
|
||||
end
|
||||
|
||||
def host
|
||||
@host ||= Email::Sender.host_for(Discourse.base_url)
|
||||
end
|
||||
|
||||
def message_id_post_id_regexp
|
||||
@message_id_post_id_regexp ||= Regexp.new "topic/\\d+/(\\d+)@#{Regexp.escape(host)}"
|
||||
end
|
||||
|
||||
def message_id_topic_id_regexp
|
||||
@message_id_topic_id_regexp ||= Regexp.new "topic/(\\d+)@#{Regexp.escape(host)}"
|
||||
end
|
||||
|
||||
def self.extract_reply_message_ids(mail, max_message_id_count:)
|
||||
message_ids = [mail.in_reply_to, Email::Receiver.extract_references(mail.references)]
|
||||
message_ids.flatten!
|
||||
|
|
|
@ -4,6 +4,9 @@ require 'net/imap'
|
|||
|
||||
module Imap
|
||||
module Providers
|
||||
|
||||
class WriteDisabledError < StandardError; end
|
||||
|
||||
class Generic
|
||||
|
||||
def initialize(server, options = {})
|
||||
|
@ -65,19 +68,31 @@ module Imap
|
|||
|
||||
def open_mailbox(mailbox_name, write: false)
|
||||
if write
|
||||
raise 'two-way IMAP sync is disabled' if !SiteSetting.enable_imap_write
|
||||
if !SiteSetting.enable_imap_write
|
||||
raise WriteDisabledError.new("Two-way IMAP sync is disabled! Cannot write to inbox.")
|
||||
end
|
||||
imap.select(mailbox_name)
|
||||
else
|
||||
imap.examine(mailbox_name)
|
||||
end
|
||||
|
||||
@open_mailbox_name = mailbox_name
|
||||
@open_mailbox_write = write
|
||||
|
||||
{
|
||||
uid_validity: imap.responses['UIDVALIDITY'][-1]
|
||||
}
|
||||
end
|
||||
|
||||
def emails(uids, fields, opts = {})
|
||||
imap.uid_fetch(uids, fields).map do |email|
|
||||
fetched = imap.uid_fetch(uids, fields)
|
||||
|
||||
# This will happen if the email does not exist in the provided mailbox.
|
||||
# It may have been deleted or otherwise moved, e.g. if deleted in Gmail
|
||||
# it will end up in "[Gmail]/Bin"
|
||||
return [] if fetched.nil?
|
||||
|
||||
fetched.map do |email|
|
||||
attributes = {}
|
||||
|
||||
fields.each do |field|
|
||||
|
@ -105,12 +120,16 @@ module Imap
|
|||
end
|
||||
|
||||
def tag_to_label(tag)
|
||||
labels[tag]
|
||||
tag
|
||||
end
|
||||
|
||||
def list_mailboxes
|
||||
imap.list('', '*').map(&:name)
|
||||
end
|
||||
|
||||
def archive(uid)
|
||||
# do nothing by default, just removing the Inbox label should be enough
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -4,13 +4,18 @@ module Imap
|
|||
module Providers
|
||||
class Gmail < Generic
|
||||
X_GM_LABELS = 'X-GM-LABELS'
|
||||
X_GM_THRID = 'X-GM-THRID'
|
||||
|
||||
def imap
|
||||
@imap ||= super.tap { |imap| apply_gmail_patch(imap) }
|
||||
end
|
||||
|
||||
def emails(uids, fields, opts = {})
|
||||
fields[fields.index('LABELS')] = X_GM_LABELS
|
||||
|
||||
# gmail has a special header for labels
|
||||
if fields.include?('LABELS')
|
||||
fields[fields.index('LABELS')] = X_GM_LABELS
|
||||
end
|
||||
|
||||
emails = super(uids, fields, opts)
|
||||
|
||||
|
@ -22,7 +27,7 @@ module Imap
|
|||
email['LABELS'].flatten!
|
||||
end
|
||||
|
||||
email['LABELS'] << '\\Inbox' if opts[:mailbox] == 'INBOX'
|
||||
email['LABELS'] << '\\Inbox' if @open_mailbox_name == 'INBOX'
|
||||
|
||||
email['LABELS'].uniq!
|
||||
end
|
||||
|
@ -57,6 +62,33 @@ module Imap
|
|||
super(tag)
|
||||
end
|
||||
|
||||
def archive(uid)
|
||||
# all emails in the thread must be archived in Gmail for the thread
|
||||
# to get removed from the inbox
|
||||
thread_id = thread_id_from_uid(uid)
|
||||
emails_to_archive = emails_in_thread(thread_id)
|
||||
emails_to_archive.each do |email|
|
||||
labels = email['LABELS']
|
||||
new_labels = labels.reject { |l| l == "\\Inbox" }
|
||||
store(email["UID"], "LABELS", labels, new_labels)
|
||||
end
|
||||
Imap::Sync::Logger.log("[IMAP] Thread ID #{thread_id} (UID #{uid}) archived in Gmail mailbox for #{@username}")
|
||||
end
|
||||
|
||||
def thread_id_from_uid(uid)
|
||||
fetched = imap.uid_fetch(uid, [X_GM_THRID])
|
||||
if !fetched
|
||||
raise "Thread not found for UID #{uid}!"
|
||||
end
|
||||
|
||||
fetched.last.attr[X_GM_THRID]
|
||||
end
|
||||
|
||||
def emails_in_thread(thread_id)
|
||||
uids_to_fetch = imap.uid_search("#{X_GM_THRID} #{thread_id}")
|
||||
emails(uids_to_fetch, ["UID", "LABELS"])
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def apply_gmail_patch(imap)
|
||||
|
|
194
lib/imap/sync.rb
194
lib/imap/sync.rb
|
@ -4,6 +4,16 @@ require 'net/imap'
|
|||
|
||||
module Imap
|
||||
class Sync
|
||||
class Logger
|
||||
def self.log(msg, level = :debug)
|
||||
if ENV['DISCOURSE_EMAIL_SYNC_LOG_OVERRIDE'] == 'warn'
|
||||
Rails.logger.warn(msg)
|
||||
else
|
||||
Rails.logger.send(level, msg)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def self.for_group(group, opts = {})
|
||||
if group.imap_server == 'imap.gmail.com'
|
||||
opts[:provider] ||= Imap::Providers::Gmail
|
||||
|
@ -16,7 +26,8 @@ module Imap
|
|||
@group = group
|
||||
|
||||
provider_klass ||= opts[:provider] || Imap::Providers::Generic
|
||||
@provider = provider_klass.new(@group.imap_server,
|
||||
@provider = provider_klass.new(
|
||||
@group.imap_server,
|
||||
port: @group.imap_port,
|
||||
ssl: @group.imap_ssl,
|
||||
username: @group.email_username,
|
||||
|
@ -59,12 +70,12 @@ module Imap
|
|||
# If UID validity changes, the whole mailbox must be synchronized (all
|
||||
# emails are considered new and will be associated to existent topics
|
||||
# in Email::Reciever by matching Message-Ids).
|
||||
Rails.logger.warn("[IMAP] UIDVALIDITY = #{@status[:uid_validity]} does not match expected #{@group.imap_uid_validity}, invalidating IMAP cache and resyncing emails for group #{@group.name} and mailbox #{@group.imap_mailbox_name}")
|
||||
Logger.log("[IMAP] (#{@group.name}) UIDVALIDITY = #{@status[:uid_validity]} does not match expected #{@group.imap_uid_validity}, invalidating IMAP cache and resyncing emails for group #{@group.name} and mailbox #{@group.imap_mailbox_name}", :warn)
|
||||
@group.imap_last_uid = 0
|
||||
end
|
||||
|
||||
if idle && !can_idle?
|
||||
Rails.logger.warn("[IMAP] IMAP server for group #{@group.name} cannot IDLE")
|
||||
Logger.log("[IMAP] (#{@group.name}) IMAP server for group cannot IDLE", :warn)
|
||||
idle = false
|
||||
end
|
||||
|
||||
|
@ -75,7 +86,10 @@ module Imap
|
|||
# back to the pool.
|
||||
ActiveRecord::Base.connection_handler.clear_active_connections!
|
||||
|
||||
@provider.imap.idle(SiteSetting.imap_polling_period_mins.minutes.to_i) do |resp|
|
||||
idle_polling_mins = SiteSetting.imap_polling_period_mins.minutes.to_i
|
||||
Logger.log("[IMAP] (#{@group.name}) Going IDLE for #{idle_polling_mins} seconds to wait for more work")
|
||||
|
||||
@provider.imap.idle(idle_polling_mins) do |resp|
|
||||
if resp.kind_of?(Net::IMAP::UntaggedResponse) && resp.name == 'EXISTS'
|
||||
@provider.imap.idle_done
|
||||
end
|
||||
|
@ -95,7 +109,7 @@ module Imap
|
|||
# Sometimes, new_uids contains elements from old_uids.
|
||||
new_uids = new_uids - old_uids
|
||||
|
||||
Rails.logger.debug("[IMAP] Remote email server has #{old_uids.size} old emails and #{new_uids.size} new emails")
|
||||
Logger.log("[IMAP] (#{@group.name}) Remote email server has #{old_uids.size} old emails and #{new_uids.size} new emails")
|
||||
|
||||
all_old_uids_size = old_uids.size
|
||||
all_new_uids_size = new_uids.size
|
||||
|
@ -111,67 +125,15 @@ module Imap
|
|||
new_uids = new_uids[0..new_emails_limit - 1] if new_emails_limit > 0
|
||||
|
||||
if old_uids.present?
|
||||
Rails.logger.debug("[IMAP] Syncing #{old_uids.size} randomly-selected old emails")
|
||||
emails = @provider.emails(old_uids, ['UID', 'FLAGS', 'LABELS'], mailbox: @group.imap_mailbox_name)
|
||||
emails.each do |email|
|
||||
incoming_email = IncomingEmail.find_by(
|
||||
imap_uid_validity: @status[:uid_validity],
|
||||
imap_uid: email['UID']
|
||||
)
|
||||
|
||||
if incoming_email.present?
|
||||
update_topic(email, incoming_email, mailbox_name: @group.imap_mailbox_name)
|
||||
else
|
||||
Rails.logger.warn("[IMAP] Could not find old email (UIDVALIDITY = #{@status[:uid_validity]}, UID = #{email['UID']})")
|
||||
end
|
||||
end
|
||||
process_old_uids(old_uids)
|
||||
end
|
||||
|
||||
if new_uids.present?
|
||||
Rails.logger.debug("[IMAP] Syncing #{new_uids.size} new emails (oldest first)")
|
||||
|
||||
emails = @provider.emails(new_uids, ['UID', 'FLAGS', 'LABELS', 'RFC822'], mailbox: @group.imap_mailbox_name)
|
||||
processed = 0
|
||||
|
||||
emails.each do |email|
|
||||
# Synchronously process emails because the order of emails matter
|
||||
# (for example replies must be processed after the original email
|
||||
# to have a topic where the reply can be posted).
|
||||
begin
|
||||
receiver = Email::Receiver.new(email['RFC822'],
|
||||
allow_auto_generated: true,
|
||||
import_mode: import_mode,
|
||||
destinations: [@group],
|
||||
uid_validity: @status[:uid_validity],
|
||||
uid: email['UID']
|
||||
)
|
||||
receiver.process!
|
||||
update_topic(email, receiver.incoming_email, mailbox_name: @group.imap_mailbox_name)
|
||||
rescue Email::Receiver::ProcessingError => e
|
||||
Rails.logger.warn("[IMAP] Could not process (UIDVALIDITY = #{@status[:uid_validity]}, UID = #{email['UID']}): #{e.message}")
|
||||
end
|
||||
|
||||
processed += 1
|
||||
@group.update_columns(
|
||||
imap_uid_validity: @status[:uid_validity],
|
||||
imap_last_uid: email['UID'],
|
||||
imap_old_emails: all_old_uids_size + processed,
|
||||
imap_new_emails: all_new_uids_size - processed
|
||||
)
|
||||
end
|
||||
process_new_uids(new_uids, import_mode, all_old_uids_size, all_new_uids_size)
|
||||
end
|
||||
|
||||
# Discourse -> IMAP server (upload): syncs updated flags and labels.
|
||||
if SiteSetting.enable_imap_write
|
||||
to_sync = IncomingEmail.where(imap_sync: true)
|
||||
if to_sync.size > 0
|
||||
@provider.open_mailbox(@group.imap_mailbox_name, write: true)
|
||||
to_sync.each do |incoming_email|
|
||||
Rails.logger.debug("[IMAP] Updating email for #{@group.name} and incoming email ID = #{incoming_email.id}")
|
||||
update_email(@group.imap_mailbox_name, incoming_email)
|
||||
end
|
||||
end
|
||||
end
|
||||
sync_to_server
|
||||
|
||||
{ remaining: all_new_uids_size - new_uids.size }
|
||||
end
|
||||
|
@ -188,6 +150,90 @@ module Imap
|
|||
|
||||
private
|
||||
|
||||
def process_old_uids(old_uids)
|
||||
Logger.log("[IMAP] (#{@group.name}) Syncing #{old_uids.size} randomly-selected old emails")
|
||||
emails = @provider.emails(old_uids, ['UID', 'FLAGS', 'LABELS', 'ENVELOPE'])
|
||||
emails.each do |email|
|
||||
incoming_email = IncomingEmail.find_by(
|
||||
imap_uid_validity: @status[:uid_validity],
|
||||
imap_uid: email['UID'],
|
||||
imap_group_id: @group.id
|
||||
)
|
||||
|
||||
if incoming_email.present?
|
||||
update_topic(email, incoming_email, mailbox_name: @group.imap_mailbox_name)
|
||||
else
|
||||
# try finding email by message-id instead, we may be able to set the uid etc.
|
||||
incoming_email = IncomingEmail.where(
|
||||
message_id: email['ENVELOPE'].message_id.tr("<>", ""),
|
||||
imap_uid: nil,
|
||||
imap_uid_validity: nil
|
||||
).where("to_addresses LIKE '%#{@group.email_username}%'").first
|
||||
|
||||
if incoming_email
|
||||
incoming_email.update(
|
||||
imap_uid_validity: @status[:uid_validity],
|
||||
imap_uid: email['UID'],
|
||||
imap_group_id: @group.id
|
||||
)
|
||||
update_topic(email, incoming_email, mailbox_name: @group.imap_mailbox_name)
|
||||
else
|
||||
Logger.log("[IMAP] (#{@group.name}) Could not find old email (UIDVALIDITY = #{@status[:uid_validity]}, UID = #{email['UID']})", :warn)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def process_new_uids(new_uids, import_mode, all_old_uids_size, all_new_uids_size)
|
||||
Logger.log("[IMAP] (#{@group.name}) Syncing #{new_uids.size} new emails (oldest first)")
|
||||
|
||||
emails = @provider.emails(new_uids, ['UID', 'FLAGS', 'LABELS', 'RFC822'])
|
||||
processed = 0
|
||||
|
||||
emails.each do |email|
|
||||
# Synchronously process emails because the order of emails matter
|
||||
# (for example replies must be processed after the original email
|
||||
# to have a topic where the reply can be posted).
|
||||
begin
|
||||
receiver = Email::Receiver.new(
|
||||
email['RFC822'],
|
||||
allow_auto_generated: true,
|
||||
import_mode: import_mode,
|
||||
destinations: [@group],
|
||||
imap_uid_validity: @status[:uid_validity],
|
||||
imap_uid: email['UID'],
|
||||
imap_group_id: @group.id
|
||||
)
|
||||
receiver.process!
|
||||
|
||||
update_topic(email, receiver.incoming_email, mailbox_name: @group.imap_mailbox_name)
|
||||
rescue Email::Receiver::ProcessingError => e
|
||||
Logger.log("[IMAP] (#{@group.name}) Could not process (UIDVALIDITY = #{@status[:uid_validity]}, UID = #{email['UID']}): #{e.message}", :warn)
|
||||
end
|
||||
|
||||
processed += 1
|
||||
@group.update_columns(
|
||||
imap_uid_validity: @status[:uid_validity],
|
||||
imap_last_uid: email['UID'],
|
||||
imap_old_emails: all_old_uids_size + processed,
|
||||
imap_new_emails: all_new_uids_size - processed
|
||||
)
|
||||
end
|
||||
end
|
||||
|
||||
def sync_to_server
|
||||
return if !SiteSetting.enable_imap_write
|
||||
|
||||
to_sync = IncomingEmail.where(imap_sync: true, imap_group_id: @group.id)
|
||||
if to_sync.size > 0
|
||||
@provider.open_mailbox(@group.imap_mailbox_name, write: true)
|
||||
to_sync.each do |incoming_email|
|
||||
Logger.log("[IMAP] (#{@group.name}) Updating email and incoming email ID = #{incoming_email.id}")
|
||||
update_email(incoming_email)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def update_topic_archived_state(email, incoming_email, opts = {})
|
||||
topic = incoming_email.topic
|
||||
|
||||
|
@ -231,10 +277,17 @@ module Imap
|
|||
DiscourseTagging.tag_topic_by_names(topic, Guardian.new(Discourse.system_user), tags.to_a)
|
||||
end
|
||||
|
||||
def update_email(mailbox_name, incoming_email)
|
||||
def update_email(incoming_email)
|
||||
return if !SiteSetting.tagging_enabled || !SiteSetting.allow_staff_to_tag_pms
|
||||
return if incoming_email&.post&.post_number != 1 || !incoming_email.imap_sync
|
||||
return unless email = @provider.emails(incoming_email.imap_uid, ['FLAGS', 'LABELS'], mailbox: mailbox_name).first
|
||||
|
||||
# if email is nil, the UID does not exist in the provider, meaning....
|
||||
#
|
||||
# A) the email has been deleted/moved to a different mailbox in the provider
|
||||
# B) the UID does not belong to the provider
|
||||
email = @provider.emails(incoming_email.imap_uid, ['FLAGS', 'LABELS']).first
|
||||
return if !email.present?
|
||||
|
||||
incoming_email.update(imap_sync: false)
|
||||
|
||||
labels = email['LABELS']
|
||||
|
@ -248,11 +301,24 @@ module Imap
|
|||
# Sync topic status and labels with email flags and labels.
|
||||
tags = topic.tags.pluck(:name)
|
||||
new_flags = tags.map { |tag| @provider.tag_to_flag(tag) }.reject(&:blank?)
|
||||
# new_flags << Net::IMAP::DELETED if !incoming_email.topic
|
||||
new_labels = tags.map { |tag| @provider.tag_to_label(tag) }.reject(&:blank?)
|
||||
new_labels << '\\Inbox' if topic.group_archived_messages.length == 0
|
||||
|
||||
# the topic is archived, and the archive should be reflected in the IMAP
|
||||
# server
|
||||
topic_archived = topic.group_archived_messages.any?
|
||||
if !topic_archived
|
||||
new_labels << '\\Inbox'
|
||||
else
|
||||
Logger.log("[IMAP] (#{@group.name}) Archiving UID #{incoming_email.imap_uid}")
|
||||
end
|
||||
|
||||
@provider.store(incoming_email.imap_uid, 'FLAGS', flags, new_flags)
|
||||
@provider.store(incoming_email.imap_uid, 'LABELS', labels, new_labels)
|
||||
|
||||
# some providers need special handling for archiving. this way we preserve
|
||||
# any new tag-labels, and archive, even though it may cause extra requests
|
||||
# to the IMAP server
|
||||
@provider.archive(incoming_email.imap_uid)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -84,6 +84,7 @@ describe Imap::Sync do
|
|||
expect(incoming_email.imap_uid_validity).to eq(1)
|
||||
expect(incoming_email.imap_uid).to eq(100)
|
||||
expect(incoming_email.imap_sync).to eq(false)
|
||||
expect(incoming_email.imap_group_id).to eq(group.id)
|
||||
end
|
||||
|
||||
it 'does not duplicate topics' do
|
||||
|
@ -98,19 +99,39 @@ describe Imap::Sync do
|
|||
.and change { IncomingEmail.count }.by(0)
|
||||
end
|
||||
|
||||
it 'does not duplicate incoming emails' do
|
||||
it 'creates a new incoming email if the message ID does not match the receiver post id regex' do
|
||||
incoming_email = Fabricate(:incoming_email, message_id: message_id)
|
||||
|
||||
expect { sync_handler.process }
|
||||
.to change { Topic.count }.by(0)
|
||||
.and change { Post.where(post_type: Post.types[:regular]).count }.by(0)
|
||||
.and change { IncomingEmail.count }.by(0)
|
||||
.to change { Topic.count }.by(1)
|
||||
.and change { Post.where(post_type: Post.types[:regular]).count }.by(1)
|
||||
.and change { IncomingEmail.count }.by(1)
|
||||
|
||||
incoming_email.reload
|
||||
expect(incoming_email.message_id).to eq(message_id)
|
||||
expect(incoming_email.imap_uid_validity).to eq(1)
|
||||
expect(incoming_email.imap_uid).to eq(100)
|
||||
expect(incoming_email.imap_sync).to eq(false)
|
||||
last_incoming = IncomingEmail.where(message_id: message_id).last
|
||||
expect(last_incoming.message_id).to eq(message_id)
|
||||
expect(last_incoming.imap_uid_validity).to eq(1)
|
||||
expect(last_incoming.imap_uid).to eq(100)
|
||||
expect(last_incoming.imap_sync).to eq(false)
|
||||
expect(last_incoming.imap_group_id).to eq(group.id)
|
||||
end
|
||||
|
||||
context "when the message id matches the receiver post id regex" do
|
||||
let(:message_id) { "topic/999/324@test.localhost" }
|
||||
it 'does not duplicate incoming email' do
|
||||
incoming_email = Fabricate(:incoming_email, message_id: message_id)
|
||||
|
||||
expect { sync_handler.process }
|
||||
.to change { Topic.count }.by(0)
|
||||
.and change { Post.where(post_type: Post.types[:regular]).count }.by(0)
|
||||
.and change { IncomingEmail.count }.by(0)
|
||||
|
||||
incoming_email.reload
|
||||
expect(incoming_email.message_id).to eq(message_id)
|
||||
expect(incoming_email.imap_uid_validity).to eq(1)
|
||||
expect(incoming_email.imap_uid).to eq(100)
|
||||
expect(incoming_email.imap_sync).to eq(false)
|
||||
expect(incoming_email.imap_group_id).to eq(group.id)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -165,7 +186,7 @@ describe Imap::Sync do
|
|||
|
||||
provider.stubs(:uids).with(to: 100).returns([100])
|
||||
provider.stubs(:uids).with(from: 101).returns([200])
|
||||
provider.stubs(:emails).with([100], ['UID', 'FLAGS', 'LABELS'], anything).returns(
|
||||
provider.stubs(:emails).with([100], ['UID', 'FLAGS', 'LABELS', 'ENVELOPE'], anything).returns(
|
||||
[
|
||||
{
|
||||
'UID' => 100,
|
||||
|
@ -205,7 +226,7 @@ describe Imap::Sync do
|
|||
|
||||
provider.stubs(:uids).with(to: 200).returns([100, 200])
|
||||
provider.stubs(:uids).with(from: 201).returns([])
|
||||
provider.stubs(:emails).with([100, 200], ['UID', 'FLAGS', 'LABELS'], anything).returns(
|
||||
provider.stubs(:emails).with([100, 200], ['UID', 'FLAGS', 'LABELS', 'ENVELOPE'], anything).returns(
|
||||
[
|
||||
{
|
||||
'UID' => 100,
|
||||
|
@ -244,7 +265,9 @@ describe Imap::Sync do
|
|||
let(:second_message_id) { SecureRandom.hex }
|
||||
let(:second_body) { '<p>This is an <b>answer</b> to this message.</p>' }
|
||||
|
||||
it 'is updated' do
|
||||
# TODO: Improve the invalidating flow for mailbox change. This is a destructive
|
||||
# action so it should not be done often.
|
||||
xit 'is updated' do
|
||||
provider = MockedImapProvider.any_instance
|
||||
|
||||
provider.stubs(:open_mailbox).returns(uid_validity: 1)
|
||||
|
@ -285,8 +308,8 @@ describe Imap::Sync do
|
|||
.and change { Post.where(post_type: Post.types[:regular]).count }.by(2)
|
||||
.and change { IncomingEmail.count }.by(2)
|
||||
|
||||
imap_data = Topic.last.incoming_email.pluck(:imap_uid_validity, :imap_uid)
|
||||
expect(imap_data).to contain_exactly([1, 100], [1, 200])
|
||||
imap_data = Topic.last.incoming_email.pluck(:imap_uid_validity, :imap_uid, :imap_group_id)
|
||||
expect(imap_data).to contain_exactly([1, 100, group.id], [1, 200, group.id])
|
||||
|
||||
provider.stubs(:open_mailbox).returns(uid_validity: 2)
|
||||
provider.stubs(:uids).with.returns([111, 222])
|
||||
|
@ -326,8 +349,8 @@ describe Imap::Sync do
|
|||
.and change { Post.where(post_type: Post.types[:regular]).count }.by(0)
|
||||
.and change { IncomingEmail.count }.by(0)
|
||||
|
||||
imap_data = Topic.last.incoming_email.pluck(:imap_uid_validity, :imap_uid)
|
||||
expect(imap_data).to contain_exactly([2, 111], [2, 222])
|
||||
imap_data = Topic.last.incoming_email.pluck(:imap_uid_validity, :imap_uid, :imap_group_id)
|
||||
expect(imap_data).to contain_exactly([2, 111, group.id], [2, 222, group.id])
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -0,0 +1,113 @@
|
|||
# frozen_string_literal: true
|
||||
|
||||
require 'rails_helper'
|
||||
|
||||
RSpec.describe Imap::Providers::Generic do
|
||||
fab!(:username) { "test@generic.com" }
|
||||
fab!(:password) { "test1!" }
|
||||
fab!(:provider) do
|
||||
described_class.new(
|
||||
"imap.generic.com",
|
||||
{
|
||||
port: 993,
|
||||
ssl: true,
|
||||
username: username,
|
||||
password: password
|
||||
}
|
||||
)
|
||||
end
|
||||
|
||||
let(:imap_stub) { stub }
|
||||
before do
|
||||
described_class.any_instance.stubs(:imap).returns(imap_stub)
|
||||
end
|
||||
|
||||
describe "#connect!" do
|
||||
it "calls login with the provided username and password on the imap client" do
|
||||
imap_stub.expects(:login).with(username, password).once
|
||||
provider.connect!
|
||||
end
|
||||
end
|
||||
|
||||
describe "#uids" do
|
||||
it "can search with from and to" do
|
||||
imap_stub.expects(:uid_search).once.with("UID 5:9")
|
||||
provider.uids(from: 5, to: 9)
|
||||
end
|
||||
|
||||
it "can search with only from" do
|
||||
imap_stub.expects(:uid_search).once.with("UID 5:*")
|
||||
provider.uids(from: 5)
|
||||
end
|
||||
|
||||
it "can search with only to" do
|
||||
imap_stub.expects(:uid_search).once.with("UID 1:9")
|
||||
provider.uids(to: 9)
|
||||
end
|
||||
|
||||
it "can search all" do
|
||||
imap_stub.expects(:uid_search).once.with("ALL")
|
||||
provider.uids
|
||||
end
|
||||
end
|
||||
|
||||
describe "#open_mailbox" do
|
||||
it "uses examine to get a readonly version of the mailbox" do
|
||||
imap_stub.expects(:examine).with("Inbox")
|
||||
imap_stub.expects(:responses).returns({ 'UIDVALIDITY' => [1] })
|
||||
provider.open_mailbox("Inbox")
|
||||
end
|
||||
|
||||
describe "write true" do
|
||||
context "if imap_write is disabled" do
|
||||
before { SiteSetting.enable_imap_write = false }
|
||||
|
||||
it "raises an error" do
|
||||
expect { provider.open_mailbox("Inbox", write: true) }.to raise_error(
|
||||
Imap::Providers::WriteDisabledError
|
||||
)
|
||||
end
|
||||
end
|
||||
|
||||
context "if imap_write is enabled" do
|
||||
before { SiteSetting.enable_imap_write = true }
|
||||
|
||||
it "does not raise an error and calls imap.select" do
|
||||
imap_stub.expects(:select).with("Inbox")
|
||||
imap_stub.expects(:responses).returns({ 'UIDVALIDITY' => [1] })
|
||||
expect { provider.open_mailbox("Inbox", write: true) }.not_to raise_error
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
describe "#emails" do
|
||||
let(:fields) { ['UID'] }
|
||||
let(:uids) { [99, 106] }
|
||||
|
||||
it "returns empty array if uid_fetch does not find any matching emails by uid" do
|
||||
imap_stub.expects(:uid_fetch).with(uids, fields).returns(nil)
|
||||
expect(provider.emails(uids, fields)).to eq([])
|
||||
end
|
||||
|
||||
it "returns an array of attributes" do
|
||||
imap_stub.expects(:uid_fetch).with(uids, fields).returns([
|
||||
Net::IMAP::FetchData.new(1, { "UID" => 99 }),
|
||||
Net::IMAP::FetchData.new(1, { "UID" => 106 })
|
||||
])
|
||||
expect(provider.emails(uids, fields)).to eq([{ "UID" => 99 }, { "UID" => 106 }])
|
||||
end
|
||||
end
|
||||
|
||||
describe "#to_tag" do
|
||||
it "returns a label cleaned up so it can be used for a discourse tag" do
|
||||
expect(provider.to_tag("Some Label")).to eq("some-label")
|
||||
end
|
||||
end
|
||||
|
||||
describe "#tag_to_label" do
|
||||
it "returns the tag as is by default" do
|
||||
expect(provider.tag_to_label("Support")).to eq("Support")
|
||||
end
|
||||
end
|
||||
end
|
Loading…
Reference in New Issue