SOLR-2245: Numerous improvements to the MailEntityProcessor

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1607147 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy Potter 2014-07-01 17:13:46 +00:00
parent 33bf336cd4
commit cad18b5384
8 changed files with 627 additions and 278 deletions

View File

@ -67,7 +67,8 @@ com.sun.jersey.version = 1.9
/jakarta-regexp/jakarta-regexp = 1.4
/javax.activation/activation = 1.1.1
/javax.inject/javax.inject= 1
/javax.mail/mail = 1.4.3
/com.sun.mail/javax.mail = 1.5.1
/com.sun.mail/gimap = 1.5.1
/javax.servlet/javax.servlet-api = 3.0.1
/javax.servlet/servlet-api = 2.4
/jdom/jdom = 1.0

View File

@ -90,6 +90,10 @@ Other Changes
* SOLR-6215: TrieDateField should directly extend TrieField instead of
forwarding to a wrapped TrieField. (Steve Rowe)
* SOLR-2245: Numerous improvements of the MailEntityProcessor, including using
the GMail extensions to do server-side date filtering and using GreenMail in
the unit test to enable automated tests. (Timothy Potter)
================== 4.10.0 =================
Versions of Major Components

View File

@ -32,5 +32,6 @@
</configurations>
<dependencies>
<dependency org="org.slf4j" name="jcl-over-slf4j" rev="${/org.slf4j/jcl-over-slf4j}" conf="test->*"/>
<dependency org="com.icegreen" name="greenmail" rev="1.3.1b" conf="test->*"/>
</dependencies>
</ivy-module>

View File

@ -1,4 +1,4 @@
/*
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@ -18,10 +18,9 @@ package org.apache.solr.handler.dataimport;
import com.sun.mail.imap.IMAPMessage;
import org.apache.solr.handler.dataimport.config.ConfigNameConstants;
import org.apache.tika.Tika;
import org.apache.tika.metadata.HttpHeaders;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.metadata.TikaMetadataKeys;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -30,33 +29,40 @@ import javax.mail.internet.AddressException;
import javax.mail.internet.ContentType;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeMessage;
import javax.mail.search.AndTerm;
import javax.mail.search.ComparisonTerm;
import javax.mail.search.ReceivedDateTerm;
import javax.mail.search.SearchTerm;
import javax.mail.search.*;
import java.io.InputStream;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
import com.sun.mail.gimap.GmailFolder;
import com.sun.mail.gimap.GmailRawSearchTerm;
/**
* An {@link EntityProcessor} instance which can index emails along with their attachments from POP3 or IMAP sources. Refer to
* <a href="http://wiki.apache.org/solr/DataImportHandler">http://wiki.apache.org/solr/DataImportHandler</a> for more
* details. <b>This API is experimental and subject to change</b>
*
*
* An EntityProcessor instance which can index emails along with their
* attachments from POP3 or IMAP sources. Refer to <a
* href="http://wiki.apache.org/solr/DataImportHandler"
* >http://wiki.apache.org/solr/DataImportHandler</a> for more details. <b>This
* API is experimental and subject to change</b>
*
* @version $Id$
* @since solr 1.4
*/
public class MailEntityProcessor extends EntityProcessorBase {
private static final SimpleDateFormat sinceDateParser =
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
private static final SimpleDateFormat afterFmt =
new SimpleDateFormat("yyyy/MM/dd");
public static interface CustomFilter {
public SearchTerm getCustomSearch(Folder folder);
}
@Override
public void init(Context context) {
super.init(context);
// set attributes using XXX getXXXFromContext(attribute, defualtValue);
// set attributes using XXX getXXXFromContext(attribute, defualtValue);
// applies variable resolver and return default if value is not found or null
// REQUIRED : connection and folder info
user = getStringFromContext("user", null);
@ -66,60 +72,116 @@ public class MailEntityProcessor extends EntityProcessorBase {
folderNames = getStringFromContext("folders", null);
// validate
if (host == null || protocol == null || user == null || password == null
|| folderNames == null)
throw new DataImportHandlerException(DataImportHandlerException.SEVERE,
"'user|password|protocol|host|folders' are required attributes");
//OPTIONAL : have defaults and are optional
|| folderNames == null) throw new DataImportHandlerException(
DataImportHandlerException.SEVERE,
"'user|password|protocol|host|folders' are required attributes");
// OPTIONAL : have defaults and are optional
recurse = getBoolFromContext("recurse", true);
exclude.clear();
String excludes = getStringFromContext("exclude", "");
if (excludes != null && !excludes.trim().equals("")) {
exclude = Arrays.asList(excludes.split(","));
}
include.clear();
String includes = getStringFromContext("include", "");
if (includes != null && !includes.trim().equals("")) {
include = Arrays.asList(includes.split(","));
}
batchSize = getIntFromContext("batchSize", 20);
customFilter = getStringFromContext("customFilter", "");
String s = getStringFromContext("fetchMailsSince", null);
if (s != null)
try {
fetchMailsSince = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.ROOT).parse(s);
} catch (ParseException e) {
throw new DataImportHandlerException(DataImportHandlerException.SEVERE, "Invalid value for fetchMailSince: " + s, e);
}
if (filters != null) filters.clear();
folderIter = null;
msgIter = null;
String lastIndexTime = null;
String command =
String.valueOf(context.getRequestParameters().get("command"));
if (!DataImporter.FULL_IMPORT_CMD.equals(command))
throw new IllegalArgumentException(this.getClass().getSimpleName()+
" only supports "+DataImporter.FULL_IMPORT_CMD);
// Read the last_index_time out of the dataimport.properties if available
String cname = getStringFromContext("name", "mailimporter");
String varName = ConfigNameConstants.IMPORTER_NS_SHORT + "." + cname + "."
+ DocBuilder.LAST_INDEX_TIME;
Object varValue = context.getVariableResolver().resolve(varName);
if (varValue == null || "".equals(varValue)) {
varName = ConfigNameConstants.IMPORTER_NS_SHORT + "."
+ DocBuilder.LAST_INDEX_TIME;
varValue = context.getVariableResolver().resolve(varName);
}
if (varValue != null && varValue instanceof String) {
lastIndexTime = (String)varValue;
if (lastIndexTime != null && lastIndexTime.length() == 0)
lastIndexTime = null;
}
if (lastIndexTime == null)
lastIndexTime = getStringFromContext("fetchMailsSince", "");
LOG.info("Using lastIndexTime "+lastIndexTime+" for mail import");
this.fetchMailsSince = null;
if (lastIndexTime != null && lastIndexTime.length() > 0) {
try {
fetchMailsSince = sinceDateParser.parse(lastIndexTime);
LOG.info("Parsed fetchMailsSince=" + lastIndexTime);
} catch (ParseException e) {
throw new DataImportHandlerException(DataImportHandlerException.SEVERE,
"Invalid value for fetchMailSince: " + lastIndexTime, e);
}
}
fetchSize = getIntFromContext("fetchSize", 32 * 1024);
cTimeout = getIntFromContext("connectTimeout", 30 * 1000);
rTimeout = getIntFromContext("readTimeout", 60 * 1000);
processAttachment = getBoolFromContext(
getStringFromContext("processAttachment",null) == null ? "processAttachement":"processAttachment"
, true);
tika = new Tika();
String tmp = context.getEntityAttribute("includeOtherUserFolders");
includeOtherUserFolders = (tmp != null && Boolean.valueOf(tmp.trim()));
tmp = context.getEntityAttribute("includeSharedFolders");
includeSharedFolders = (tmp != null && Boolean.valueOf(tmp.trim()));
setProcessAttachmentConfig();
includeContent = getBoolFromContext("includeContent", true);
logConfig();
}
private void setProcessAttachmentConfig() {
processAttachment = true;
String tbval = context.getEntityAttribute("processAttachments");
if (tbval == null) {
tbval = context.getEntityAttribute("processAttachement");
if (tbval != null) processAttachment = Boolean.valueOf(tbval);
} else processAttachment = Boolean.valueOf(tbval);
}
@Override
public Map<String, Object> nextRow() {
Message mail;
Map<String, Object> row = null;
public Map<String,Object> nextRow() {
Message mail = null;
Map<String,Object> row = null;
do {
// try till there is a valid document or folders get exhausted.
// when mail == NULL, it means end of processing
mail = getNextMail();
mail = getNextMail();
if (mail != null)
row = getDocumentFromMail(mail);
} while (row == null && mail != null);
if (row != null && row.get("folder") == null)
row.put("folder", mail.getFolder().getFullName());
} while (row == null && mail != null);
return row;
}
private Message getNextMail() {
if (!connected) {
if (!connectToMailBox())
return null;
if (!connectToMailBox()) return null;
connected = true;
}
if (folderIter == null) {
@ -131,119 +193,127 @@ public class MailEntityProcessor extends EntityProcessorBase {
// loop till a valid mail or all folders exhausted.
while (msgIter == null || !msgIter.hasNext()) {
Folder next = folderIter.hasNext() ? folderIter.next() : null;
if (next == null) {
return null;
}
if (next == null) return null;
msgIter = new MessageIterator(next, batchSize);
}
return msgIter.next();
}
private Map<String, Object> getDocumentFromMail(Message mail) {
Map<String, Object> row = new HashMap<>();
private Map<String,Object> getDocumentFromMail(Message mail) {
Map<String,Object> row = new HashMap<>();
try {
addPartToDocument(mail, row, true);
return row;
} catch (Exception e) {
LOG.error("Failed to convert message [" + mail.toString()
+ "] to document due to: " + e, e);
return null;
}
}
public void addPartToDocument(Part part, Map<String, Object> row, boolean outerMost) throws Exception {
public void addPartToDocument(Part part, Map<String,Object> row, boolean outerMost) throws Exception {
if (part instanceof Message) {
addEnvelopToDocument(part, row);
addEnvelopeToDocument(part, row);
}
String ct = part.getContentType();
String ct = part.getContentType().toLowerCase();
ContentType ctype = new ContentType(ct);
if (part.isMimeType("multipart/*")) {
Multipart mp = (Multipart) part.getContent();
int count = mp.getCount();
if (part.isMimeType("multipart/alternative"))
count = 1;
for (int i = 0; i < count; i++)
addPartToDocument(mp.getBodyPart(i), row, false);
Object content = part.getContent();
if (content != null && content instanceof Multipart) {
Multipart mp = (Multipart) part.getContent();
int count = mp.getCount();
if (part.isMimeType("multipart/alternative")) count = 1;
for (int i = 0; i < count; i++)
addPartToDocument(mp.getBodyPart(i), row, false);
} else {
LOG.warn("Multipart content is a not an instance of Multipart! Content is: "
+ (content != null ? content.getClass().getName() : "null")
+ ". Typically, this is due to the Java Activation JAR being loaded by the wrong classloader.");
}
} else if (part.isMimeType("message/rfc822")) {
addPartToDocument((Part) part.getContent(), row, false);
} else {
String disp = part.getDisposition();
if (!processAttachment || (disp != null && disp.equalsIgnoreCase(Part.ATTACHMENT))) return;
InputStream is = part.getInputStream();
String fileName = part.getFileName();
Metadata md = new Metadata();
md.set(HttpHeaders.CONTENT_TYPE, ctype.getBaseType().toLowerCase(Locale.ROOT));
md.set(TikaMetadataKeys.RESOURCE_NAME_KEY, fileName);
String content = tika.parseToString(is, md);
if (disp != null && disp.equalsIgnoreCase(Part.ATTACHMENT)) {
if (row.get(ATTACHMENT) == null)
row.put(ATTACHMENT, new ArrayList<String>());
List<String> contents = (List<String>) row.get(ATTACHMENT);
contents.add(content);
row.put(ATTACHMENT, contents);
if (row.get(ATTACHMENT_NAMES) == null)
row.put(ATTACHMENT_NAMES, new ArrayList<String>());
List<String> names = (List<String>) row.get(ATTACHMENT_NAMES);
names.add(fileName);
row.put(ATTACHMENT_NAMES, names);
} else {
if (row.get(CONTENT) == null)
row.put(CONTENT, new ArrayList<String>());
if (includeContent
&& !(disp != null && disp.equalsIgnoreCase(Part.ATTACHMENT))) {
InputStream is = part.getInputStream();
Metadata contentTypeHint = new Metadata();
contentTypeHint.set(Metadata.CONTENT_TYPE, ctype.getBaseType()
.toLowerCase(Locale.ENGLISH));
String content = (new Tika()).parseToString(is, contentTypeHint);
if (row.get(CONTENT) == null) row.put(CONTENT, new ArrayList<String>());
List<String> contents = (List<String>) row.get(CONTENT);
contents.add(content);
contents.add(content.trim());
row.put(CONTENT, contents);
}
if (!processAttachment || disp == null
|| !disp.equalsIgnoreCase(Part.ATTACHMENT)) return;
InputStream is = part.getInputStream();
String fileName = part.getFileName();
Metadata contentTypeHint = new Metadata();
contentTypeHint.set(Metadata.CONTENT_TYPE, ctype.getBaseType()
.toLowerCase(Locale.ENGLISH));
String content = (new Tika()).parseToString(is, contentTypeHint);
if (content == null || content.trim().length() == 0) return;
if (row.get(ATTACHMENT) == null) row.put(ATTACHMENT,
new ArrayList<String>());
List<String> contents = (List<String>) row.get(ATTACHMENT);
contents.add(content.trim());
row.put(ATTACHMENT, contents);
if (row.get(ATTACHMENT_NAMES) == null) row.put(ATTACHMENT_NAMES,
new ArrayList<String>());
List<String> names = (List<String>) row.get(ATTACHMENT_NAMES);
names.add(fileName);
row.put(ATTACHMENT_NAMES, names);
}
}
private void addEnvelopToDocument(Part part, Map<String, Object> row) throws MessagingException {
private void addEnvelopeToDocument(Part part, Map<String,Object> row)
throws MessagingException {
MimeMessage mail = (MimeMessage) part;
Address[] adresses;
if ((adresses = mail.getFrom()) != null && adresses.length > 0)
row.put(FROM, adresses[0].toString());
if ((adresses = mail.getFrom()) != null && adresses.length > 0) row.put(
FROM, adresses[0].toString());
List<String> to = new ArrayList<>();
if ((adresses = mail.getRecipients(Message.RecipientType.TO)) != null)
addAddressToList(adresses, to);
if ((adresses = mail.getRecipients(Message.RecipientType.CC)) != null)
addAddressToList(adresses, to);
if ((adresses = mail.getRecipients(Message.RecipientType.BCC)) != null)
addAddressToList(adresses, to);
if (to.size() > 0)
row.put(TO_CC_BCC, to);
if ((adresses = mail.getRecipients(Message.RecipientType.TO)) != null) addAddressToList(
adresses, to);
if ((adresses = mail.getRecipients(Message.RecipientType.CC)) != null) addAddressToList(
adresses, to);
if ((adresses = mail.getRecipients(Message.RecipientType.BCC)) != null) addAddressToList(
adresses, to);
if (to.size() > 0) row.put(TO_CC_BCC, to);
row.put(MESSAGE_ID, mail.getMessageID());
row.put(SUBJECT, mail.getSubject());
Date d = mail.getSentDate();
if (d != null) {
row.put(SENT_DATE, d);
}
List<String> flags = new ArrayList<>();
for (Flags.Flag flag : mail.getFlags().getSystemFlags()) {
if (flag == Flags.Flag.ANSWERED)
flags.add(FLAG_ANSWERED);
else if (flag == Flags.Flag.DELETED)
flags.add(FLAG_DELETED);
else if (flag == Flags.Flag.DRAFT)
flags.add(FLAG_DRAFT);
else if (flag == Flags.Flag.FLAGGED)
flags.add(FLAG_FLAGGED);
else if (flag == Flags.Flag.RECENT)
flags.add(FLAG_RECENT);
else if (flag == Flags.Flag.SEEN)
flags.add(FLAG_SEEN);
if (flag == Flags.Flag.ANSWERED) flags.add(FLAG_ANSWERED);
else if (flag == Flags.Flag.DELETED) flags.add(FLAG_DELETED);
else if (flag == Flags.Flag.DRAFT) flags.add(FLAG_DRAFT);
else if (flag == Flags.Flag.FLAGGED) flags.add(FLAG_FLAGGED);
else if (flag == Flags.Flag.RECENT) flags.add(FLAG_RECENT);
else if (flag == Flags.Flag.SEEN) flags.add(FLAG_SEEN);
}
flags.addAll(Arrays.asList(mail.getFlags().getUserFlags()));
if (flags.size() == 0) flags.add(FLAG_NONE);
row.put(FLAGS, flags);
String[] hdrs = mail.getHeader("X-Mailer");
if (hdrs != null)
row.put(XMAILER, hdrs[0]);
if (hdrs != null) row.put(XMAILER, hdrs[0]);
}
private void addAddressToList(Address[] adresses, List<String> to) throws AddressException {
private void addAddressToList(Address[] adresses, List<String> to)
throws AddressException {
for (Address address : adresses) {
to.add(address.toString());
InternetAddress ia = (InternetAddress) address;
@ -254,25 +324,60 @@ public class MailEntityProcessor extends EntityProcessorBase {
}
}
}
private boolean connectToMailBox() {
// this is needed to load the activation mail stuff correctly
// otherwise, the JavaMail multipart support doesn't get configured
// correctly, which leads to a class cast exception when processing
// multipart messages: IMAPInputStream cannot be cast to
// javax.mail.Multipart
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
try {
Properties props = new Properties();
if (System.getProperty("mail.debug") != null)
props.setProperty("mail.debug", System.getProperty("mail.debug"));
if (("imap".equals(protocol) || "imaps".equals(protocol))
&& "imap.gmail.com".equals(host)) {
LOG.info("Consider using 'gimaps' protocol instead of '" + protocol
+ "' for enabling GMail specific extensions for " + host);
}
props.setProperty("mail.store.protocol", protocol);
props.setProperty("mail.imap.fetchsize", "" + fetchSize);
props.setProperty("mail.imap.timeout", "" + rTimeout);
props.setProperty("mail.imap.connectiontimeout", "" + cTimeout);
String imapPropPrefix = protocol.startsWith("gimap") ? "gimap" : "imap";
props.setProperty("mail." + imapPropPrefix + ".fetchsize", "" + fetchSize);
props.setProperty("mail." + imapPropPrefix + ".timeout", "" + rTimeout);
props.setProperty("mail." + imapPropPrefix + ".connectiontimeout", "" + cTimeout);
int port = -1;
int colonAt = host.indexOf(":");
if (colonAt != -1) {
port = Integer.parseInt(host.substring(colonAt + 1));
host = host.substring(0, colonAt);
}
Session session = Session.getDefaultInstance(props, null);
mailbox = session.getStore(protocol);
mailbox.connect(host, user, password);
LOG.info("Connected to mailbox");
if (port != -1) {
mailbox.connect(host, port, user, password);
} else {
mailbox.connect(host, user, password);
}
LOG.info("Connected to " + user + "'s mailbox on " + host);
return true;
} catch (MessagingException e) {
} catch (MessagingException e) {
String errMsg = String.format(Locale.ENGLISH,
"Failed to connect to %s server %s as user %s due to: %s", protocol,
host, user, e.toString());
LOG.error(errMsg, e);
throw new DataImportHandlerException(DataImportHandlerException.SEVERE,
"Connection failed", e);
errMsg, e);
}
}
private void createFilters() {
if (fetchMailsSince != null) {
filters.add(new MailsSinceLastCheckFilter(fetchMailsSince));
@ -286,49 +391,76 @@ public class MailEntityProcessor extends EntityProcessorBase {
}
} catch (Exception e) {
throw new DataImportHandlerException(DataImportHandlerException.SEVERE,
"Custom filter could not be created", e);
"Custom filter could not be created", e);
}
}
}
private void logConfig() {
if (!LOG.isInfoEnabled()) return;
StringBuilder config = new StringBuilder();
config.append("user : ").append(user).append(System.getProperty("line.separator"));
config.append("pwd : ").append(password).append(System.getProperty("line.separator"));
config.append("protocol : ").append(protocol).append(System.getProperty("line.separator"));
config.append("host : ").append(host).append(System.getProperty("line.separator"));
config.append("folders : ").append(folderNames).append(System.getProperty("line.separator"));
config.append("recurse : ").append(recurse).append(System.getProperty("line.separator"));
config.append("exclude : ").append(exclude.toString()).append(System.getProperty("line.separator"));
config.append("include : ").append(include.toString()).append(System.getProperty("line.separator"));
config.append("batchSize : ").append(batchSize).append(System.getProperty("line.separator"));
config.append("fetchSize : ").append(fetchSize).append(System.getProperty("line.separator"));
config.append("read timeout : ").append(rTimeout).append(System.getProperty("line.separator"));
config.append("conection timeout : ").append(cTimeout).append(System.getProperty("line.separator"));
config.append("custom filter : ").append(customFilter).append(System.getProperty("line.separator"));
config.append("fetch mail since : ").append(fetchMailsSince).append(System.getProperty("line.separator"));
String lineSep = System.getProperty("line.separator");
StringBuffer config = new StringBuffer();
config.append("user : ").append(user).append(lineSep);
config
.append("pwd : ")
.append(
password != null && password.length() > 0 ? "<non-null>" : "<null>")
.append(lineSep);
config.append("protocol : ").append(protocol)
.append(lineSep);
config.append("host : ").append(host)
.append(lineSep);
config.append("folders : ").append(folderNames)
.append(lineSep);
config.append("recurse : ").append(recurse)
.append(lineSep);
config.append("exclude : ").append(exclude.toString())
.append(lineSep);
config.append("include : ").append(include.toString())
.append(lineSep);
config.append("batchSize : ").append(batchSize)
.append(lineSep);
config.append("fetchSize : ").append(fetchSize)
.append(lineSep);
config.append("read timeout : ").append(rTimeout)
.append(lineSep);
config.append("conection timeout : ").append(cTimeout)
.append(lineSep);
config.append("custom filter : ").append(customFilter)
.append(lineSep);
config.append("fetch mail since : ").append(fetchMailsSince)
.append(lineSep);
config.append("includeContent : ").append(includeContent)
.append(lineSep);
config.append("processAttachments : ").append(processAttachment)
.append(lineSep);
config.append("includeOtherUserFolders : ").append(includeOtherUserFolders)
.append(lineSep);
config.append("includeSharedFolders : ").append(includeSharedFolders)
.append(lineSep);
LOG.info(config.toString());
}
class FolderIterator implements Iterator<Folder> {
private Store mailbox;
private List<String> topLevelFolders;
private List<Folder> folders = null;
private Folder lastFolder = null;
public FolderIterator(Store mailBox) {
this.mailbox = mailBox;
folders = new ArrayList<>();
getTopLevelFolders(mailBox);
if (includeOtherUserFolders) getOtherUserFolders();
if (includeSharedFolders) getSharedFolders();
}
@Override
public boolean hasNext() {
return !folders.isEmpty();
}
@Override
public Folder next() {
try {
boolean hasMessages = false;
@ -358,83 +490,142 @@ public class MailEntityProcessor extends EntityProcessorBase {
folders.add(0, children[i]);
LOG.info("child name : " + children[i].getFullName());
}
if (children.length == 0)
LOG.info("NO children : ");
if (children.length == 0) LOG.info("NO children : ");
}
}
}
while (!hasMessages);
} while (!hasMessages);
return next;
} catch (MessagingException e) {
//throw new DataImportHandlerException(DataImportHandlerException.SEVERE,
// "Folder open failed", e);
} catch (Exception e) {
LOG.warn("Failed to read folders due to: "+e);
// throw new
// DataImportHandlerException(DataImportHandlerException.SEVERE,
// "Folder open failed", e);
}
return null;
}
@Override
public void remove() {
throw new UnsupportedOperationException("Its read only mode...");
}
private void getTopLevelFolders(Store mailBox) {
if (folderNames != null)
topLevelFolders = Arrays.asList(folderNames.split(","));
if (folderNames != null) topLevelFolders = Arrays.asList(folderNames
.split(","));
for (int i = 0; topLevelFolders != null && i < topLevelFolders.size(); i++) {
try {
folders.add(mailbox.getFolder(topLevelFolders.get(i)));
} catch (MessagingException e) {
// skip bad ones unless its the last one and still no good folder
if (folders.size() == 0 && i == topLevelFolders.size() - 1)
throw new DataImportHandlerException(DataImportHandlerException.SEVERE,
"Folder retreival failed");
if (folders.size() == 0 && i == topLevelFolders.size() - 1) throw new DataImportHandlerException(
DataImportHandlerException.SEVERE, "Folder retreival failed");
}
}
if (topLevelFolders == null || topLevelFolders.size() == 0) {
try {
folders.add(mailBox.getDefaultFolder());
} catch (MessagingException e) {
throw new DataImportHandlerException(DataImportHandlerException.SEVERE,
"Folder retreival failed");
throw new DataImportHandlerException(
DataImportHandlerException.SEVERE, "Folder retreival failed");
}
}
}
private void getOtherUserFolders() {
try {
Folder[] ufldrs = mailbox.getUserNamespaces(null);
if (ufldrs != null) {
LOG.info("Found " + ufldrs.length + " user namespace folders");
for (Folder ufldr : ufldrs)
folders.add(ufldr);
}
} catch (MessagingException me) {
LOG.warn("Messaging exception retrieving user namespaces: "
+ me.getMessage());
}
}
private void getSharedFolders() {
try {
Folder[] sfldrs = mailbox.getSharedNamespaces();
if (sfldrs != null) {
LOG.info("Found " + sfldrs.length + " shared namespace folders");
for (Folder sfldr : sfldrs)
folders.add(sfldr);
}
} catch (MessagingException me) {
LOG.warn("Messaging exception retrieving shared namespaces: "
+ me.getMessage());
}
}
private boolean excludeFolder(String name) {
for (String s : exclude) {
if (name.matches(s))
return true;
if (name.matches(s)) return true;
}
for (String s : include) {
if (name.matches(s))
return false;
if (name.matches(s)) return false;
}
return include.size() > 0;
}
}
class MessageIterator implements Iterator<Message> {
class MessageIterator extends SearchTerm implements Iterator<Message> {
private Folder folder;
private Message[] messagesInCurBatch;
private Message[] messagesInCurBatch = null;
private int current = 0;
private int currentBatch = 0;
private int batchSize = 0;
private int totalInFolder = 0;
private boolean doBatching = true;
public MessageIterator(Folder folder, int batchSize) {
super();
try {
this.folder = folder;
this.batchSize = batchSize;
SearchTerm st = getSearchTerm();
if (st != null) {
LOG.info("SearchTerm=" + st);
if (st != null || folder instanceof GmailFolder) {
doBatching = false;
messagesInCurBatch = folder.search(st);
// Searching can still take a while even though we're only pulling
// envelopes; unless you're using gmail server-side filter, which is
// fast
LOG.info("Searching folder " + folder.getName() + " for messages");
long searchAtMs = System.currentTimeMillis();
// If using GMail, speed up the envelope processing by doing a
// server-side
// search for messages occurring on or after the fetch date (at
// midnight),
// which reduces the number of envelopes we need to pull from the
// server
// to apply the precise DateTerm filter; GMail server-side search has
// date
// granularity only but the local filters are also applied
if (folder instanceof GmailFolder && fetchMailsSince != null) {
String afterCrit = "after:" + afterFmt.format(fetchMailsSince);
LOG.info("Added server-side gmail filter: " + afterCrit);
Message[] afterMessages = folder.search(new GmailRawSearchTerm(
afterCrit));
LOG.info("GMail server-side filter found " + afterMessages.length
+ " messages received " + afterCrit + " in folder " + folder.getName());
// now pass in the server-side filtered messages to the local filter
messagesInCurBatch = folder.search((st != null ? st : this), afterMessages);
} else {
messagesInCurBatch = folder.search(st);
}
totalInFolder = messagesInCurBatch.length;
folder.fetch(messagesInCurBatch, fp);
current = 0;
long tookMs = (System.currentTimeMillis() - searchAtMs);
LOG.info("Total messages : " + totalInFolder);
LOG.info("Search criteria applied. Batching disabled");
LOG.info("Search criteria applied. Batching disabled. Took " + tookMs + " (ms)");
} else {
totalInFolder = folder.getMessageCount();
LOG.info("Total messages : " + totalInFolder);
@ -442,60 +633,55 @@ public class MailEntityProcessor extends EntityProcessorBase {
}
} catch (MessagingException e) {
throw new DataImportHandlerException(DataImportHandlerException.SEVERE,
"Message retreival failed", e);
"Message retreival failed", e);
}
}
private void getNextBatch(int batchSize, Folder folder) throws MessagingException {
private void getNextBatch(int batchSize, Folder folder)
throws MessagingException {
// after each batch invalidate cache
if (messagesInCurBatch != null) {
for (Message m : messagesInCurBatch) {
if (m instanceof IMAPMessage)
((IMAPMessage) m).invalidateHeaders();
if (m instanceof IMAPMessage) ((IMAPMessage) m).invalidateHeaders();
}
}
int lastMsg = (currentBatch + 1) * batchSize;
lastMsg = lastMsg > totalInFolder ? totalInFolder : lastMsg;
messagesInCurBatch = folder.getMessages(currentBatch * batchSize + 1, lastMsg);
messagesInCurBatch = folder.getMessages(currentBatch * batchSize + 1,
lastMsg);
folder.fetch(messagesInCurBatch, fp);
current = 0;
currentBatch++;
LOG.info("Current Batch : " + currentBatch);
LOG.info("Messages in this batch : " + messagesInCurBatch.length);
}
@Override
public boolean hasNext() {
boolean hasMore = current < messagesInCurBatch.length;
if (!hasMore && doBatching
&& currentBatch * batchSize < totalInFolder) {
if (!hasMore && doBatching && currentBatch * batchSize < totalInFolder) {
// try next batch
try {
getNextBatch(batchSize, folder);
hasMore = current < messagesInCurBatch.length;
} catch (MessagingException e) {
throw new DataImportHandlerException(DataImportHandlerException.SEVERE,
"Message retreival failed", e);
throw new DataImportHandlerException(
DataImportHandlerException.SEVERE, "Message retreival failed", e);
}
}
return hasMore;
}
@Override
public Message next() {
return hasNext() ? messagesInCurBatch[current++] : null;
}
@Override
public void remove() {
throw new UnsupportedOperationException("Its read only mode...");
}
private SearchTerm getSearchTerm() {
if (filters.size() == 0)
return null;
if (filters.size() == 1)
return filters.get(0).getCustomSearch(folder);
if (filters.size() == 0) return null;
if (filters.size() == 1) return filters.get(0).getCustomSearch(folder);
SearchTerm last = filters.get(0).getCustomSearch(folder);
for (int i = 1; i < filters.size(); i++) {
CustomFilter filter = filters.get(i);
@ -506,44 +692,83 @@ public class MailEntityProcessor extends EntityProcessorBase {
}
return last;
}
public boolean match(Message message) {
return true;
}
}
class MailsSinceLastCheckFilter implements CustomFilter {
private Date since;
public MailsSinceLastCheckFilter(Date date) {
since = date;
}
@Override
public SearchTerm getCustomSearch(Folder folder) {
return new ReceivedDateTerm(ComparisonTerm.GE, since);
@SuppressWarnings("serial")
public SearchTerm getCustomSearch(final Folder folder) {
LOG.info("Building mail filter for messages in " + folder.getName()
+ " that occur after " + sinceDateParser.format(since));
return new DateTerm(ComparisonTerm.GE, since) {
private int matched = 0;
private int seen = 0;
@Override
public boolean match(Message msg) {
boolean isMatch = false;
++seen;
try {
Date msgDate = msg.getReceivedDate();
if (msgDate == null) msgDate = msg.getSentDate();
if (msgDate != null && msgDate.getTime() >= since.getTime()) {
++matched;
isMatch = true;
} else {
String msgDateStr = (msgDate != null) ? sinceDateParser.format(msgDate) : "null";
String sinceDateStr = (since != null) ? sinceDateParser.format(since) : "null";
LOG.debug("Message " + msg.getSubject() + " was received at [" + msgDateStr
+ "], since filter is [" + sinceDateStr + "]");
}
} catch (MessagingException e) {
LOG.warn("Failed to process message due to: "+e, e);
}
if (seen % 100 == 0) {
LOG.info("Matched " + matched + " of " + seen + " messages since: "
+ sinceDateParser.format(since));
}
return isMatch;
}
};
}
}
// user settings stored in member variables
private String user;
private String password;
private String host;
private String protocol;
private String folderNames;
private List<String> exclude = new ArrayList<>();
private List<String> include = new ArrayList<>();
private boolean recurse;
private int batchSize;
private int fetchSize;
private int cTimeout;
private int rTimeout;
private Date fetchMailsSince;
private String customFilter;
private boolean processAttachment = true;
private Tika tika;
private boolean includeContent = true;
private boolean includeOtherUserFolders = false;
private boolean includeSharedFolders = false;
// holds the current state
private Store mailbox;
@ -553,16 +778,13 @@ public class MailEntityProcessor extends EntityProcessorBase {
private List<CustomFilter> filters = new ArrayList<>();
private static FetchProfile fp = new FetchProfile();
private static final Logger LOG = LoggerFactory.getLogger(DataImporter.class);
// diagnostics
private int rowCount = 0;
static {
fp.add(FetchProfile.Item.ENVELOPE);
fp.add(FetchProfile.Item.FLAGS);
fp.add("X-Mailer");
}
// Fields To Index
// single valued
private static final String MESSAGE_ID = "messageId";
@ -577,13 +799,14 @@ public class MailEntityProcessor extends EntityProcessorBase {
private static final String ATTACHMENT = "attachment";
private static final String ATTACHMENT_NAMES = "attachmentNames";
// flag values
private static final String FLAG_NONE = "none";
private static final String FLAG_ANSWERED = "answered";
private static final String FLAG_DELETED = "deleted";
private static final String FLAG_DRAFT = "draft";
private static final String FLAG_FLAGGED = "flagged";
private static final String FLAG_RECENT = "recent";
private static final String FLAG_SEEN = "seen";
private int getIntFromContext(String prop, int ifNull) {
int v = ifNull;
try {
@ -593,11 +816,11 @@ public class MailEntityProcessor extends EntityProcessorBase {
v = Integer.valueOf(val);
}
} catch (NumberFormatException e) {
//do nothing
// do nothing
}
return v;
}
private boolean getBoolFromContext(String prop, boolean ifNull) {
boolean v = ifNull;
String val = context.getEntityAttribute(prop);
@ -607,7 +830,7 @@ public class MailEntityProcessor extends EntityProcessorBase {
}
return v;
}
private String getStringFromContext(String prop, String ifNull) {
String v = ifNull;
String val = context.getEntityAttribute(prop);

View File

@ -18,151 +18,252 @@ package org.apache.solr.handler.dataimport;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.handler.dataimport.config.Entity;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import com.icegreen.greenmail.imap.ImapHostManager;
import com.icegreen.greenmail.store.MailFolder;
import com.icegreen.greenmail.user.GreenMailUser;
import com.icegreen.greenmail.util.GreenMail;
import com.icegreen.greenmail.util.GreenMailUtil;
import com.icegreen.greenmail.util.ServerSetup;
import com.icegreen.greenmail.imap.ImapConstants;
import java.io.IOException;
import java.net.ServerSocket;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
// Test mailbox is like this: foldername(mailcount)
// top1(2) -> child11(6)
// -> child12(0)
// top2(2) -> child21(1)
// -> grandchild211(2)
// -> grandchild212(1)
// -> child22(2)
import javax.mail.Flags;
import javax.mail.Session;
import javax.mail.internet.MimeMessage;
/**
* Test for MailEntityProcessor. The tests are marked as ignored because we'd need a mail server (real or mocked) for
* these to work.
*
* TODO: Find a way to make the tests actually test code
*
* Test for MailEntityProcessor; uses GreenMail embedded Java mail server.
*
* @see org.apache.solr.handler.dataimport.MailEntityProcessor
* @since solr 1.4
*/
public class TestMailEntityProcessor extends AbstractDataImportHandlerTestCase {
// Credentials
private static final String user = "user";
private static final String password = "password";
private static final String host = "host";
private static final String protocol = "imaps";
private static final String email = "test@localhost.com";
private static final String user = "test";
private static final String password = "secret";
private static final String protocol = "imap";
private static Map<String, String> paramMap = new HashMap<>();
// embedded test mail server
private ServerSetup serverSetup;
private GreenMail greenMail;
private GreenMailUser mailUser;
private String hostAndPort;
private String sep = ImapConstants.HIERARCHY_DELIMITER;
private Calendar cal = Calendar.getInstance();
/**
* Setup an embedded GreenMail server for testing.
*/
@Override
@Before
public void setUp() throws Exception {
super.setUp();
int port = findAvailablePort(9103,9193);
serverSetup = new ServerSetup(port, null, protocol);
greenMail = new GreenMail(serverSetup);
greenMail.start();
mailUser = greenMail.setUser(email, user, password);
hostAndPort = "localhost:"+port;
// Test mailbox is like this: foldername(mailcount)
// top1(2) -> child11(6)
// -> child12(0)
// top2(2) -> child21(1)
// -> grandchild211(2)
// -> grandchild212(1)
// -> child22(2)
ImapHostManager imapMgr = greenMail.getManagers().getImapHostManager();
setupFolder(imapMgr, "top1", 2);
setupFolder(imapMgr, "top1"+sep+"child11", 6);
setupFolder(imapMgr, "top2", 2);
setupFolder(imapMgr, "top2"+sep+"child21", 1);
setupFolder(imapMgr, "top2"+sep+"child21"+sep+"grandchild211", 2);
setupFolder(imapMgr, "top2"+sep+"child21"+sep+"grandchild212", 1);
setupFolder(imapMgr, "top2"+sep+"child22", 2);
setupFolder(imapMgr, "top3", 2);
}
private int findAvailablePort(int min, int max) {
for (int port = min; port < max; port++) {
try {
new ServerSocket(port).close();
return port;
} catch (IOException e) {
// Port is in use
}
}
throw new IllegalStateException("Could not find available port in range " + min + " to " + max);
}
@Override
@After
public void tearDown() throws Exception {
greenMail.stop();
super.tearDown();
}
/**
* Creates 1 or more messages in the specified folder.
*/
protected void setupFolder(ImapHostManager imapMgr, String folderName, int numMessages) throws Exception {
setupFolder(imapMgr, folderName, numMessages, 0);
}
protected void setupFolder(ImapHostManager imapMgr, String folderName, int numMessages, int startAt) throws Exception {
cal.setTimeInMillis(System.currentTimeMillis());
Date now = cal.getTime();
MailFolder folder = imapMgr.getFolder(mailUser, folderName, false);
if (folder == null)
folder = imapMgr.createMailbox(mailUser, folderName);
Session session = GreenMailUtil.getSession(serverSetup);
for (int m=0; m < numMessages; m++) {
int idx = m + startAt;
MimeMessage msg = new MimeMessage(session);
msg.setSubject("test"+idx);
msg.setFrom("from@localhost.com");
msg.setText("test"+idx);
msg.setSentDate(now);
folder.appendMessage(msg, new Flags(Flags.Flag.RECENT), now);
}
folder.getMessages();
}
@SuppressWarnings("unchecked")
@Test
@Ignore("Needs a Mock Mail Server to work")
public void testConnection() {
public void testConnection() throws Exception {
// also tests recurse = false and default settings
paramMap.put("folders", "top2");
Map<String, String> paramMap = new HashMap<>();
paramMap.put("folders", "top1");
paramMap.put("recurse", "false");
paramMap.put("processAttachement", "false");
paramMap.put("processAttachments", "false");
DataImporter di = new DataImporter();
di.loadAndInit(getConfigFromMap(paramMap));
Entity ent = di.getConfig().getEntities().get(0);
RequestInfo rp = new RequestInfo(null, createMap("command", "full-import"), null);
SolrWriterImpl swi = new SolrWriterImpl();
di.runCmd(rp, swi);
assertEquals("top1 did not return 2 messages", swi.docs.size(), 2);
assertEquals("top1 did not return 2 messages", 2, swi.docs.size());
}
@SuppressWarnings("unchecked")
@Test
@Ignore("Needs a Mock Mail Server to work")
public void testRecursion() {
Map<String, String> paramMap = new HashMap<>();
paramMap.put("folders", "top2");
paramMap.put("recurse", "true");
paramMap.put("processAttachement", "false");
paramMap.put("processAttachments", "false");
DataImporter di = new DataImporter();
di.loadAndInit(getConfigFromMap(paramMap));
Entity ent = di.getConfig().getEntities().get(0);
RequestInfo rp = new RequestInfo(null, createMap("command", "full-import"), null);
SolrWriterImpl swi = new SolrWriterImpl();
di.runCmd(rp, swi);
assertEquals("top2 and its children did not return 8 messages", swi.docs.size(), 8);
assertEquals("top2 and its children did not return 8 messages", 8, swi.docs.size());
}
@SuppressWarnings("unchecked")
@Test
@Ignore("Needs a Mock Mail Server to work")
public void testExclude() {
Map<String, String> paramMap = new HashMap<>();
paramMap.put("folders", "top2");
paramMap.put("recurse", "true");
paramMap.put("processAttachement", "false");
paramMap.put("processAttachments", "false");
paramMap.put("exclude", ".*grandchild.*");
DataImporter di = new DataImporter();
di.loadAndInit(getConfigFromMap(paramMap));
Entity ent = di.getConfig().getEntities().get(0);
RequestInfo rp = new RequestInfo(null, createMap("command", "full-import"), null);
SolrWriterImpl swi = new SolrWriterImpl();
di.runCmd(rp, swi);
assertEquals("top2 and its direct children did not return 5 messages", swi.docs.size(), 5);
assertEquals("top2 and its direct children did not return 5 messages", 5, swi.docs.size());
}
@SuppressWarnings("unchecked")
@Test
@Ignore("Needs a Mock Mail Server to work")
public void testInclude() {
Map<String, String> paramMap = new HashMap<>();
paramMap.put("folders", "top2");
paramMap.put("recurse", "true");
paramMap.put("processAttachement", "false");
paramMap.put("processAttachments", "false");
paramMap.put("include", ".*grandchild.*");
DataImporter di = new DataImporter();
di.loadAndInit(getConfigFromMap(paramMap));
Entity ent = di.getConfig().getEntities().get(0);
RequestInfo rp = new RequestInfo(null, createMap("command", "full-import"), null);
SolrWriterImpl swi = new SolrWriterImpl();
di.runCmd(rp, swi);
assertEquals("top2 and its direct children did not return 3 messages", swi.docs.size(), 3);
assertEquals("top2 and its direct children did not return 3 messages", 3, swi.docs.size());
}
@SuppressWarnings("unchecked")
@Test
@Ignore("Needs a Mock Mail Server to work")
public void testIncludeAndExclude() {
Map<String, String> paramMap = new HashMap<>();
paramMap.put("folders", "top1,top2");
paramMap.put("recurse", "true");
paramMap.put("processAttachement", "false");
paramMap.put("processAttachments", "false");
paramMap.put("exclude", ".*top1.*");
paramMap.put("include", ".*grandchild.*");
DataImporter di = new DataImporter();
di.loadAndInit(getConfigFromMap(paramMap));
Entity ent = di.getConfig().getEntities().get(0);
RequestInfo rp = new RequestInfo(null, createMap("command", "full-import"), null);
SolrWriterImpl swi = new SolrWriterImpl();
di.runCmd(rp, swi);
assertEquals("top2 and its direct children did not return 3 messages", swi.docs.size(), 3);
assertEquals("top2 and its direct children did not return 3 messages", 3, swi.docs.size());
}
@SuppressWarnings("unchecked")
@Test
@Ignore("Needs a Mock Mail Server to work")
public void testFetchTimeSince() throws ParseException {
paramMap.put("folders", "top1/child11");
Map<String, String> paramMap = new HashMap<>();
paramMap.put("folders", "top1"+sep+"child11");
paramMap.put("recurse", "true");
paramMap.put("processAttachement", "false");
paramMap.put("processAttachments", "false");
paramMap.put("fetchMailsSince", "2008-12-26 00:00:00");
DataImporter di = new DataImporter();
di.loadAndInit(getConfigFromMap(paramMap));
Entity ent = di.getConfig().getEntities().get(0);
RequestInfo rp = new RequestInfo(null, createMap("command", "full-import"), null);
SolrWriterImpl swi = new SolrWriterImpl();
di.runCmd(rp, swi);
assertEquals("top2 and its direct children did not return 3 messages", swi.docs.size(), 3);
assertEquals("top1"+sep+"child11 and its direct children did not return 6 messages", 6, swi.docs.size());
}
// configures the data importer to use the MailEntityProcessor we're testing in this class
private String getConfigFromMap(Map<String, String> params) {
String conf =
"<dataConfig>" +
"<document>" +
"<entity processor=\"org.apache.solr.handler.dataimport.MailEntityProcessor\" " +
"someconfig" +
"/>" +
"</document>" +
"</dataConfig>";
"<dataConfig>" +
"<document>" +
"<entity processor=\"org.apache.solr.handler.dataimport.MailEntityProcessor\" name=\"mail_entity\" " +
"someconfig" +
">" +
"<field column=\"messageId\" name=\"id\" />"+
"<field column=\"folder\" name=\"folder_s\" />"+
"<field column=\"subject\" name=\"subject_s\" />"+
"<field column=\"sentDate\" name=\"date_dt\" />"+
"<field column=\"from\" name=\"from_s\" />"+
"<field column=\"content\" name=\"content_s\" />"+
"</entity>" +
"</document>" +
"</dataConfig>";
params.put("user", user);
params.put("password", password);
params.put("host", host);
params.put("host", hostAndPort);
params.put("protocol", protocol);
StringBuilder attribs = new StringBuilder("");
for (String key : params.keySet())
@ -171,6 +272,7 @@ public class TestMailEntityProcessor extends AbstractDataImportHandlerTestCase {
return conf.replace("someconfig", attribs.toString());
}
// collects documents written by the DataImporter (from the MailEntityProcessor)
static class SolrWriterImpl extends SolrWriter {
List<SolrInputDocument> docs = new ArrayList<>();
Boolean deleteAllCalled;
@ -181,7 +283,12 @@ public class TestMailEntityProcessor extends AbstractDataImportHandlerTestCase {
}
@Override
public boolean upload(SolrInputDocument doc) {
public void close() {
// no-op method to avoid NPE in super impl
}
@Override
public boolean upload(SolrInputDocument doc) {
return docs.add(doc);
}

View File

@ -24,7 +24,8 @@
</configurations>
<dependencies>
<dependency org="javax.activation" name="activation" rev="${/javax.activation/activation}" conf="compile->*"/>
<dependency org="javax.mail" name="mail" rev="${/javax.mail/mail}" conf="compile->*"/>
<dependency org="com.sun.mail" name="javax.mail" rev="${/com.sun.mail/javax.mail}" conf="compile->*"/>
<dependency org="com.sun.mail" name="gimap" rev="${/com.sun.mail/gimap}" conf="compile->*"/>
<dependency org="hsqldb" name="hsqldb" rev="${/hsqldb/hsqldb}" conf="test->*"/>
<dependency org="org.apache.derby" name="derby" rev="${/org.apache.derby/derby}" conf="test->*"/>

View File

@ -35,7 +35,7 @@ To import data from the slashdot feed, connect to
To import data from your imap server
1. Edit the example-DIH/solr/mail/conf/data-config.xml and add details about username, password, imap server
1. Edit the example-DIH/solr/mail/conf/mail-data-config.xml and add details about username, password, imap server
2. Connect to http://localhost:8983/solr/mail/dataimport?command=full-import
To copy data from db Solr core, connect to

View File

@ -0,0 +1,12 @@
<dataConfig>
<document>
<!--
Note - In order to index attachments, set processAttachement="true" and drop
Tika and its dependencies to example-DIH/solr/mail/lib directory
-->
<entity processor="MailEntityProcessor" user="email@gmail.com"
password="password" host="imap.gmail.com" protocol="gimaps"
fetchMailsSince="2014-06-30 00:00:00" batchSize="20" folders="inbox" processAttachement="false"
name="mail_entity"/>
</document>
</dataConfig>