diff --git a/contrib/dataimporthandler/CHANGES.txt b/contrib/dataimporthandler/CHANGES.txt index 9ff188b07bc..8dc1827a056 100644 --- a/contrib/dataimporthandler/CHANGES.txt +++ b/contrib/dataimporthandler/CHANGES.txt @@ -75,6 +75,9 @@ New Features 17.SOLR-996: Expose Context to Evaluators. (Noble Paul, shalin) +18.SOLR-783: Enhance delta-imports by maintaining separate last_index_time for each entity. + (Jon Baer, Noble Paul via shalin) + Optimizations ---------------------- 1. SOLR-846: Reduce memory consumption during delta import by removing keys when used diff --git a/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DocBuilder.java b/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DocBuilder.java index 9926ab1e90e..27b3d9bd440 100644 --- a/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DocBuilder.java +++ b/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DocBuilder.java @@ -19,6 +19,7 @@ package org.apache.solr.handler.dataimport; import org.apache.solr.common.SolrInputDocument; import org.apache.solr.core.SolrCore; +import static org.apache.solr.handler.dataimport.SolrWriter.LAST_INDEX_KEY; import org.apache.solr.schema.SchemaField; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,15 +63,17 @@ public class DocBuilder { static final ThreadLocal INSTANCE = new ThreadLocal(); Map functionsNamespace; + private Properties persistedProperties; - public DocBuilder(DataImporter context, SolrWriter writer, DataImporter.RequestParams reqParams) { + public DocBuilder(DataImporter dataImporter, SolrWriter writer, DataImporter.RequestParams reqParams) { INSTANCE.set(this); - this.dataImporter = context; + this.dataImporter = dataImporter; this.writer = writer; DataImporter.QUERY_COUNT.set(importStatistics.queryCount); requestParameters = reqParams; verboseDebug = requestParameters.debug && requestParameters.verbose; - functionsNamespace = EvaluatorBag.getFunctionsNamespace(dataImporter.getConfig().functions, this); + functionsNamespace = EvaluatorBag.getFunctionsNamespace(this.dataImporter.getConfig().functions, this); + persistedProperties = writer.readIndexerProperties(); } public VariableResolverImpl getVariableResolver() { @@ -82,6 +85,13 @@ public class DocBuilder { indexerNamespace.put(INDEX_START_TIME, dataImporter.getIndexStartTime()); indexerNamespace.put("request", requestParameters.requestParams); indexerNamespace.put("functions", functionsNamespace); + for (DataConfig.Entity entity : dataImporter.getConfig().document.entities) { + String key = entity.name + "." + SolrWriter.LAST_INDEX_KEY; + String lastIndex = persistedProperties.getProperty(key); + if (lastIndex != null) { + indexerNamespace.put(key, lastIndex); + } + } if (dataImporter.getConfig().script != null) { indexerNamespace.put(DataConfig.SCRIPT, dataImporter.getConfig().script.script); indexerNamespace.put(DataConfig.SCRIPT_LANG, dataImporter.getConfig().script.language); @@ -133,10 +143,14 @@ public class DocBuilder { } AtomicBoolean fullCleanDone = new AtomicBoolean(false); //we must not do a delete of *:* multiple times if there are multiple root entities to be run + Properties lastIndexTimeProps = new Properties(); + lastIndexTimeProps.setProperty(LAST_INDEX_KEY, + DataImporter.DATE_TIME_FORMAT.get().format(dataImporter.getIndexStartTime())); for (DataConfig.Entity e : document.entities) { if (entities != null && !entities.contains(e.name)) continue; - + lastIndexTimeProps.setProperty(e.name + "." + LAST_INDEX_KEY, + DataImporter.DATE_TIME_FORMAT.get().format(new Date())); root = e; String delQuery = e.allAttributes.get("preImportDeleteQuery"); if (dataImporter.getStatus() == DataImporter.Status.RUNNING_DELTA_DUMP @@ -168,11 +182,11 @@ public class DocBuilder { // Do not commit unnecessarily if this is a delta-import and no documents were created or deleted if (!requestParameters.clean) { if (importStatistics.docCount.get() > 0 || importStatistics.deletedDocCount.get() > 0) { - commit(); + commit(lastIndexTimeProps); } } else { // Finished operation normally, commit now - commit(); + commit(lastIndexTimeProps); } if (document.onImportEnd != null) { invokeEventListener(document.onImportEnd); @@ -185,9 +199,7 @@ public class DocBuilder { } @SuppressWarnings("unchecked") - private void commit() { - if (requestParameters.commit) - writer.persistIndexStartTime(dataImporter.getIndexStartTime()); + private void commit(Properties lastIndexTimeProps) { LOG.info("Full Import completed successfully"); statusMessages.put("", "Indexing completed. Added/Updated: " + importStatistics.docCount + " documents. Deleted " @@ -196,7 +208,8 @@ public class DocBuilder { addStatusMessage("Committed"); if (requestParameters.optimize) addStatusMessage("Optimized"); - + if (requestParameters.commit) + writer.persist(lastIndexTimeProps); } void rollback() { @@ -253,7 +266,6 @@ public class DocBuilder { } if (!stop.get()) { - writer.persistIndexStartTime(dataImporter.getIndexStartTime()); LOG.info("Delta Import completed successfully"); } } @@ -336,7 +348,7 @@ public class DocBuilder { if (entity.isDocRoot) { if (seenDocCount <= requestParameters.start) continue; - if (seenDocCount > requestParameters.start + requestParameters.rows) { + if (seenDocCount > requestParameters.start + requestParameters.rows) { LOG.info("Indexing stopped at docCount = " + importStatistics.docCount); break; } diff --git a/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/SolrWriter.java b/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/SolrWriter.java index 26e4429b60f..dc92ece4028 100644 --- a/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/SolrWriter.java +++ b/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/SolrWriter.java @@ -31,10 +31,7 @@ import java.util.Date; import java.util.Properties; /** - *

- * Writes documents to SOLR as well as provides methods for loading and - * persisting last index time. - *

+ *

Writes documents to SOLR as well as provides methods for loading and persisting last index time.

*

* This API is experimental and may change in the future. * @@ -92,29 +89,14 @@ public class SolrWriter { } } - Date getStartTime() { - Properties props = readIndexerProperties(); - String result = props.getProperty(SolrWriter.LAST_INDEX_KEY); - try { - if (result != null) - return DataImporter.DATE_TIME_FORMAT.get().parse(result); - } catch (ParseException e) { - throw new DataImportHandlerException(DataImportHandlerException.WARN, - "Unable to read last indexed time from: " - + SolrWriter.IMPORTER_PROPERTIES, e); - } - return null; - } - - private void persistStartTime(Date date) { + void persist(Properties p) { OutputStream propOutput = null; Properties props = readIndexerProperties(); try { - props.put(SolrWriter.LAST_INDEX_KEY, - DataImporter.DATE_TIME_FORMAT.get().format(date)); + props.putAll(p); String filePath = configDir; if (configDir != null && !configDir.endsWith(File.separator)) filePath += File.separator; @@ -138,7 +120,7 @@ public class SolrWriter { } } - private Properties readIndexerProperties() { + Properties readIndexerProperties() { Properties props = new Properties(); InputStream propInput = null; @@ -183,7 +165,7 @@ public class SolrWriter { } } - public void rollback() { + public void rollback() { try { RollbackUpdateCommand rollback = new RollbackUpdateCommand(); processor.processRollback(rollback); @@ -236,20 +218,19 @@ public class SolrWriter { } public Date loadIndexStartTime() { - return this.getStartTime(); - } + Properties props; + props = readIndexerProperties(); + String result = props.getProperty(SolrWriter.LAST_INDEX_KEY); - /** - *

- * Stores the last indexed time into the IMPORTER_PROPERTIES - * file. If any properties are already defined in the file, then they are - * preserved. - *

- * - * @param date the Date instance to be persisted - */ - public void persistIndexStartTime(Date date) { - this.persistStartTime(date); + try { + if (result != null) + return DataImporter.DATE_TIME_FORMAT.get().parse(result); + } catch (ParseException e) { + throw new DataImportHandlerException(DataImportHandlerException.WARN, + "Unable to read last indexed time from: " + + SolrWriter.IMPORTER_PROPERTIES, e); + } + return null; } /**