mirror of https://github.com/apache/lucene.git
SOLR-783 -- Enhance delta-imports by maintaining separate last_index_time for each entity
git-svn-id: https://svn.apache.org/repos/asf/lucene/solr/trunk@746189 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
5414333861
commit
831a3026de
|
@ -75,6 +75,9 @@ New Features
|
|||
17.SOLR-996: Expose Context to Evaluators.
|
||||
(Noble Paul, shalin)
|
||||
|
||||
18.SOLR-783: Enhance delta-imports by maintaining separate last_index_time for each entity.
|
||||
(Jon Baer, Noble Paul via shalin)
|
||||
|
||||
Optimizations
|
||||
----------------------
|
||||
1. SOLR-846: Reduce memory consumption during delta import by removing keys when used
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.solr.handler.dataimport;
|
|||
|
||||
import org.apache.solr.common.SolrInputDocument;
|
||||
import org.apache.solr.core.SolrCore;
|
||||
import static org.apache.solr.handler.dataimport.SolrWriter.LAST_INDEX_KEY;
|
||||
import org.apache.solr.schema.SchemaField;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -62,15 +63,17 @@ public class DocBuilder {
|
|||
|
||||
static final ThreadLocal<DocBuilder> INSTANCE = new ThreadLocal<DocBuilder>();
|
||||
Map<String, Object> functionsNamespace;
|
||||
private Properties persistedProperties;
|
||||
|
||||
public DocBuilder(DataImporter context, SolrWriter writer, DataImporter.RequestParams reqParams) {
|
||||
public DocBuilder(DataImporter dataImporter, SolrWriter writer, DataImporter.RequestParams reqParams) {
|
||||
INSTANCE.set(this);
|
||||
this.dataImporter = context;
|
||||
this.dataImporter = dataImporter;
|
||||
this.writer = writer;
|
||||
DataImporter.QUERY_COUNT.set(importStatistics.queryCount);
|
||||
requestParameters = reqParams;
|
||||
verboseDebug = requestParameters.debug && requestParameters.verbose;
|
||||
functionsNamespace = EvaluatorBag.getFunctionsNamespace(dataImporter.getConfig().functions, this);
|
||||
functionsNamespace = EvaluatorBag.getFunctionsNamespace(this.dataImporter.getConfig().functions, this);
|
||||
persistedProperties = writer.readIndexerProperties();
|
||||
}
|
||||
|
||||
public VariableResolverImpl getVariableResolver() {
|
||||
|
@ -82,6 +85,13 @@ public class DocBuilder {
|
|||
indexerNamespace.put(INDEX_START_TIME, dataImporter.getIndexStartTime());
|
||||
indexerNamespace.put("request", requestParameters.requestParams);
|
||||
indexerNamespace.put("functions", functionsNamespace);
|
||||
for (DataConfig.Entity entity : dataImporter.getConfig().document.entities) {
|
||||
String key = entity.name + "." + SolrWriter.LAST_INDEX_KEY;
|
||||
String lastIndex = persistedProperties.getProperty(key);
|
||||
if (lastIndex != null) {
|
||||
indexerNamespace.put(key, lastIndex);
|
||||
}
|
||||
}
|
||||
if (dataImporter.getConfig().script != null) {
|
||||
indexerNamespace.put(DataConfig.SCRIPT, dataImporter.getConfig().script.script);
|
||||
indexerNamespace.put(DataConfig.SCRIPT_LANG, dataImporter.getConfig().script.language);
|
||||
|
@ -133,10 +143,14 @@ public class DocBuilder {
|
|||
}
|
||||
AtomicBoolean fullCleanDone = new AtomicBoolean(false);
|
||||
//we must not do a delete of *:* multiple times if there are multiple root entities to be run
|
||||
Properties lastIndexTimeProps = new Properties();
|
||||
lastIndexTimeProps.setProperty(LAST_INDEX_KEY,
|
||||
DataImporter.DATE_TIME_FORMAT.get().format(dataImporter.getIndexStartTime()));
|
||||
for (DataConfig.Entity e : document.entities) {
|
||||
if (entities != null && !entities.contains(e.name))
|
||||
continue;
|
||||
|
||||
lastIndexTimeProps.setProperty(e.name + "." + LAST_INDEX_KEY,
|
||||
DataImporter.DATE_TIME_FORMAT.get().format(new Date()));
|
||||
root = e;
|
||||
String delQuery = e.allAttributes.get("preImportDeleteQuery");
|
||||
if (dataImporter.getStatus() == DataImporter.Status.RUNNING_DELTA_DUMP
|
||||
|
@ -168,11 +182,11 @@ public class DocBuilder {
|
|||
// Do not commit unnecessarily if this is a delta-import and no documents were created or deleted
|
||||
if (!requestParameters.clean) {
|
||||
if (importStatistics.docCount.get() > 0 || importStatistics.deletedDocCount.get() > 0) {
|
||||
commit();
|
||||
commit(lastIndexTimeProps);
|
||||
}
|
||||
} else {
|
||||
// Finished operation normally, commit now
|
||||
commit();
|
||||
commit(lastIndexTimeProps);
|
||||
}
|
||||
if (document.onImportEnd != null) {
|
||||
invokeEventListener(document.onImportEnd);
|
||||
|
@ -185,9 +199,7 @@ public class DocBuilder {
|
|||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private void commit() {
|
||||
if (requestParameters.commit)
|
||||
writer.persistIndexStartTime(dataImporter.getIndexStartTime());
|
||||
private void commit(Properties lastIndexTimeProps) {
|
||||
LOG.info("Full Import completed successfully");
|
||||
statusMessages.put("", "Indexing completed. Added/Updated: "
|
||||
+ importStatistics.docCount + " documents. Deleted "
|
||||
|
@ -196,7 +208,8 @@ public class DocBuilder {
|
|||
addStatusMessage("Committed");
|
||||
if (requestParameters.optimize)
|
||||
addStatusMessage("Optimized");
|
||||
|
||||
if (requestParameters.commit)
|
||||
writer.persist(lastIndexTimeProps);
|
||||
}
|
||||
|
||||
void rollback() {
|
||||
|
@ -253,7 +266,6 @@ public class DocBuilder {
|
|||
}
|
||||
|
||||
if (!stop.get()) {
|
||||
writer.persistIndexStartTime(dataImporter.getIndexStartTime());
|
||||
LOG.info("Delta Import completed successfully");
|
||||
}
|
||||
}
|
||||
|
@ -336,7 +348,7 @@ public class DocBuilder {
|
|||
if (entity.isDocRoot) {
|
||||
if (seenDocCount <= requestParameters.start)
|
||||
continue;
|
||||
if (seenDocCount > requestParameters.start + requestParameters.rows) {
|
||||
if (seenDocCount > requestParameters.start + requestParameters.rows) {
|
||||
LOG.info("Indexing stopped at docCount = " + importStatistics.docCount);
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -31,10 +31,7 @@ import java.util.Date;
|
|||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* Writes documents to SOLR as well as provides methods for loading and
|
||||
* persisting last index time.
|
||||
* </p>
|
||||
* <p> Writes documents to SOLR as well as provides methods for loading and persisting last index time. </p>
|
||||
* <p/>
|
||||
* <b>This API is experimental and may change in the future.</b>
|
||||
*
|
||||
|
@ -92,29 +89,14 @@ public class SolrWriter {
|
|||
}
|
||||
}
|
||||
|
||||
Date getStartTime() {
|
||||
Properties props = readIndexerProperties();
|
||||
String result = props.getProperty(SolrWriter.LAST_INDEX_KEY);
|
||||
|
||||
try {
|
||||
if (result != null)
|
||||
return DataImporter.DATE_TIME_FORMAT.get().parse(result);
|
||||
} catch (ParseException e) {
|
||||
throw new DataImportHandlerException(DataImportHandlerException.WARN,
|
||||
"Unable to read last indexed time from: "
|
||||
+ SolrWriter.IMPORTER_PROPERTIES, e);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private void persistStartTime(Date date) {
|
||||
void persist(Properties p) {
|
||||
OutputStream propOutput = null;
|
||||
|
||||
Properties props = readIndexerProperties();
|
||||
|
||||
try {
|
||||
props.put(SolrWriter.LAST_INDEX_KEY,
|
||||
DataImporter.DATE_TIME_FORMAT.get().format(date));
|
||||
props.putAll(p);
|
||||
String filePath = configDir;
|
||||
if (configDir != null && !configDir.endsWith(File.separator))
|
||||
filePath += File.separator;
|
||||
|
@ -138,7 +120,7 @@ public class SolrWriter {
|
|||
}
|
||||
}
|
||||
|
||||
private Properties readIndexerProperties() {
|
||||
Properties readIndexerProperties() {
|
||||
Properties props = new Properties();
|
||||
InputStream propInput = null;
|
||||
|
||||
|
@ -183,7 +165,7 @@ public class SolrWriter {
|
|||
}
|
||||
}
|
||||
|
||||
public void rollback() {
|
||||
public void rollback() {
|
||||
try {
|
||||
RollbackUpdateCommand rollback = new RollbackUpdateCommand();
|
||||
processor.processRollback(rollback);
|
||||
|
@ -236,20 +218,19 @@ public class SolrWriter {
|
|||
}
|
||||
|
||||
public Date loadIndexStartTime() {
|
||||
return this.getStartTime();
|
||||
}
|
||||
Properties props;
|
||||
props = readIndexerProperties();
|
||||
String result = props.getProperty(SolrWriter.LAST_INDEX_KEY);
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* Stores the last indexed time into the <code>IMPORTER_PROPERTIES</code>
|
||||
* file. If any properties are already defined in the file, then they are
|
||||
* preserved.
|
||||
* </p>
|
||||
*
|
||||
* @param date the Date instance to be persisted
|
||||
*/
|
||||
public void persistIndexStartTime(Date date) {
|
||||
this.persistStartTime(date);
|
||||
try {
|
||||
if (result != null)
|
||||
return DataImporter.DATE_TIME_FORMAT.get().parse(result);
|
||||
} catch (ParseException e) {
|
||||
throw new DataImportHandlerException(DataImportHandlerException.WARN,
|
||||
"Unable to read last indexed time from: "
|
||||
+ SolrWriter.IMPORTER_PROPERTIES, e);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
Loading…
Reference in New Issue