mirror of https://github.com/apache/lucene.git
SOLR-2382-abstracted DIHWriter
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1149477 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
dcaca50a86
commit
b20968241a
|
@ -196,10 +196,6 @@ public class TestMailEntityProcessor extends AbstractDataImportHandlerTestCase {
|
|||
return docs.add(doc);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void log(int event, String name, Object row) {
|
||||
// Do nothing
|
||||
}
|
||||
|
||||
@Override
|
||||
public void doDeleteAll() {
|
||||
|
|
|
@ -0,0 +1,5 @@
|
|||
package org.apache.solr.handler.dataimport;
|
||||
|
||||
public enum DIHLogLevels {
|
||||
START_ENTITY, END_ENTITY, TRANSFORMED_ROW, ENTITY_META, PRE_TRANSFORMER_ROW, START_DOC, END_DOC, ENTITY_OUT, ROW_END, TRANSFORMER_EXCEPTION, ENTITY_EXCEPTION, DISABLE_LOGGING, ENABLE_LOGGING, NONE
|
||||
}
|
|
@ -0,0 +1,81 @@
|
|||
package org.apache.solr.handler.dataimport;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.solr.common.SolrInputDocument;
|
||||
|
||||
/**
|
||||
* @solr.experimental
|
||||
*
|
||||
*/
|
||||
public interface DIHWriter {
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* If this writer supports transactions or commit points, then commit any changes,
|
||||
* optionally optimizing the data for read/write performance
|
||||
* </p>
|
||||
* @param optimize
|
||||
*/
|
||||
public void commit(boolean optimize);
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* Release resources used by this writer. After calling close, reads & updates will throw exceptions.
|
||||
* </p>
|
||||
*/
|
||||
public void close();
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* If this writer supports transactions or commit points, then roll back any uncommitted changes.
|
||||
* </p>
|
||||
*/
|
||||
public void rollback();
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* Delete from the writer's underlying data store based the passed-in writer-specific query. (Optional Operation)
|
||||
* </p>
|
||||
* @param q
|
||||
*/
|
||||
public void deleteByQuery(String q);
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* Delete everything from the writer's underlying data store
|
||||
* </p>
|
||||
*/
|
||||
public void doDeleteAll();
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* Delete from the writer's underlying data store based on the passed-in Primary Key
|
||||
* </p>
|
||||
* @param key
|
||||
*/
|
||||
public void deleteDoc(Object key);
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* Add a document to this writer's underlying data store.
|
||||
* </p>
|
||||
* @param doc
|
||||
* @return
|
||||
*/
|
||||
public boolean upload(SolrInputDocument doc);
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* Provide context information for this writer. init() should be called before using the writer.
|
||||
* </p>
|
||||
* @param context
|
||||
*/
|
||||
public void init(Context context) ;
|
||||
|
||||
}
|
|
@ -45,7 +45,7 @@ class DebugLogger {
|
|||
private Stack<DebugInfo> debugStack;
|
||||
|
||||
NamedList output;
|
||||
private final SolrWriter writer;
|
||||
// private final SolrWriter writer1;
|
||||
|
||||
private static final String LINE = "---------------------------------------------";
|
||||
|
||||
|
@ -54,8 +54,8 @@ class DebugLogger {
|
|||
|
||||
boolean enabled = true;
|
||||
|
||||
public DebugLogger(SolrWriter solrWriter) {
|
||||
writer = solrWriter;
|
||||
public DebugLogger() {
|
||||
// writer = solrWriter;
|
||||
output = new NamedList();
|
||||
debugStack = new Stack<DebugInfo>() {
|
||||
|
||||
|
@ -67,7 +67,7 @@ class DebugLogger {
|
|||
return super.pop();
|
||||
}
|
||||
};
|
||||
debugStack.push(new DebugInfo(null, -1, null));
|
||||
debugStack.push(new DebugInfo(null, DIHLogLevels.NONE, null));
|
||||
output = debugStack.peek().lst;
|
||||
}
|
||||
|
||||
|
@ -75,47 +75,47 @@ class DebugLogger {
|
|||
return debugStack.isEmpty() ? null : debugStack.peek();
|
||||
}
|
||||
|
||||
public void log(int event, String name, Object row) {
|
||||
if (event == SolrWriter.DISABLE_LOGGING) {
|
||||
public void log(DIHLogLevels event, String name, Object row) {
|
||||
if (event == DIHLogLevels.DISABLE_LOGGING) {
|
||||
enabled = false;
|
||||
return;
|
||||
} else if (event == SolrWriter.ENABLE_LOGGING) {
|
||||
} else if (event == DIHLogLevels.ENABLE_LOGGING) {
|
||||
enabled = true;
|
||||
return;
|
||||
}
|
||||
|
||||
if (!enabled && event != SolrWriter.START_ENTITY
|
||||
&& event != SolrWriter.END_ENTITY) {
|
||||
if (!enabled && event != DIHLogLevels.START_ENTITY
|
||||
&& event != DIHLogLevels.END_ENTITY) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (event == SolrWriter.START_DOC) {
|
||||
debugStack.push(new DebugInfo(null, SolrWriter.START_DOC, peekStack()));
|
||||
} else if (SolrWriter.START_ENTITY == event) {
|
||||
if (event == DIHLogLevels.START_DOC) {
|
||||
debugStack.push(new DebugInfo(null, DIHLogLevels.START_DOC, peekStack()));
|
||||
} else if (DIHLogLevels.START_ENTITY == event) {
|
||||
debugStack
|
||||
.push(new DebugInfo(name, SolrWriter.START_ENTITY, peekStack()));
|
||||
} else if (SolrWriter.ENTITY_OUT == event
|
||||
|| SolrWriter.PRE_TRANSFORMER_ROW == event) {
|
||||
if (debugStack.peek().type == SolrWriter.START_ENTITY
|
||||
|| debugStack.peek().type == SolrWriter.START_DOC) {
|
||||
.push(new DebugInfo(name, DIHLogLevels.START_ENTITY, peekStack()));
|
||||
} else if (DIHLogLevels.ENTITY_OUT == event
|
||||
|| DIHLogLevels.PRE_TRANSFORMER_ROW == event) {
|
||||
if (debugStack.peek().type == DIHLogLevels.START_ENTITY
|
||||
|| debugStack.peek().type == DIHLogLevels.START_DOC) {
|
||||
debugStack.peek().lst.add(null, fmt.format(new Object[]{++debugStack
|
||||
.peek().rowCount}));
|
||||
addToNamedList(debugStack.peek().lst, row);
|
||||
debugStack.peek().lst.add(null, LINE);
|
||||
}
|
||||
} else if (event == SolrWriter.ROW_END) {
|
||||
} else if (event == DIHLogLevels.ROW_END) {
|
||||
popAllTransformers();
|
||||
} else if (SolrWriter.END_ENTITY == event) {
|
||||
while (debugStack.pop().type != SolrWriter.START_ENTITY)
|
||||
} else if (DIHLogLevels.END_ENTITY == event) {
|
||||
while (debugStack.pop().type != DIHLogLevels.START_ENTITY)
|
||||
;
|
||||
} else if (SolrWriter.END_DOC == event) {
|
||||
while (debugStack.pop().type != SolrWriter.START_DOC)
|
||||
} else if (DIHLogLevels.END_DOC == event) {
|
||||
while (debugStack.pop().type != DIHLogLevels.START_DOC)
|
||||
;
|
||||
} else if (event == SolrWriter.TRANSFORMER_EXCEPTION) {
|
||||
} else if (event == DIHLogLevels.TRANSFORMER_EXCEPTION) {
|
||||
debugStack.push(new DebugInfo(name, event, peekStack()));
|
||||
debugStack.peek().lst.add("EXCEPTION",
|
||||
getStacktraceString((Exception) row));
|
||||
} else if (SolrWriter.TRANSFORMED_ROW == event) {
|
||||
} else if (DIHLogLevels.TRANSFORMED_ROW == event) {
|
||||
debugStack.push(new DebugInfo(name, event, peekStack()));
|
||||
debugStack.peek().lst.add(null, LINE);
|
||||
addToNamedList(debugStack.peek().lst, row);
|
||||
|
@ -124,10 +124,10 @@ class DebugLogger {
|
|||
DataImportHandlerException dataImportHandlerException = (DataImportHandlerException) row;
|
||||
dataImportHandlerException.debugged = true;
|
||||
}
|
||||
} else if (SolrWriter.ENTITY_META == event) {
|
||||
} else if (DIHLogLevels.ENTITY_META == event) {
|
||||
popAllTransformers();
|
||||
debugStack.peek().lst.add(name, row);
|
||||
} else if (SolrWriter.ENTITY_EXCEPTION == event) {
|
||||
} else if (DIHLogLevels.ENTITY_EXCEPTION == event) {
|
||||
if (row instanceof DataImportHandlerException) {
|
||||
DataImportHandlerException dihe = (DataImportHandlerException) row;
|
||||
if (dihe.debugged)
|
||||
|
@ -143,8 +143,8 @@ class DebugLogger {
|
|||
|
||||
private void popAllTransformers() {
|
||||
while (true) {
|
||||
int type = debugStack.peek().type;
|
||||
if (type == SolrWriter.START_DOC || type == SolrWriter.START_ENTITY)
|
||||
DIHLogLevels type = debugStack.peek().type;
|
||||
if (type == DIHLogLevels.START_DOC || type == DIHLogLevels.START_ENTITY)
|
||||
break;
|
||||
debugStack.pop();
|
||||
}
|
||||
|
@ -181,23 +181,23 @@ class DebugLogger {
|
|||
|
||||
@Override
|
||||
public Object getData(String query) {
|
||||
writer.log(SolrWriter.ENTITY_META, "query", query);
|
||||
log(DIHLogLevels.ENTITY_META, "query", query);
|
||||
long start = System.currentTimeMillis();
|
||||
try {
|
||||
return ds.getData(query);
|
||||
} catch (DataImportHandlerException de) {
|
||||
writer.log(SolrWriter.ENTITY_EXCEPTION,
|
||||
log(DIHLogLevels.ENTITY_EXCEPTION,
|
||||
null, de);
|
||||
throw de;
|
||||
} catch (Exception e) {
|
||||
writer.log(SolrWriter.ENTITY_EXCEPTION,
|
||||
log(DIHLogLevels.ENTITY_EXCEPTION,
|
||||
null, e);
|
||||
DataImportHandlerException de = new DataImportHandlerException(
|
||||
DataImportHandlerException.SEVERE, "", e);
|
||||
de.debugged = true;
|
||||
throw de;
|
||||
} finally {
|
||||
writer.log(SolrWriter.ENTITY_META, "time-taken", DocBuilder
|
||||
log(DIHLogLevels.ENTITY_META, "time-taken", DocBuilder
|
||||
.getTimeElapsedSince(start));
|
||||
}
|
||||
}
|
||||
|
@ -208,18 +208,18 @@ class DebugLogger {
|
|||
return new Transformer() {
|
||||
@Override
|
||||
public Object transformRow(Map<String, Object> row, Context context) {
|
||||
writer.log(SolrWriter.PRE_TRANSFORMER_ROW, null, row);
|
||||
log(DIHLogLevels.PRE_TRANSFORMER_ROW, null, row);
|
||||
String tName = getTransformerName(t);
|
||||
Object result = null;
|
||||
try {
|
||||
result = t.transformRow(row, context);
|
||||
writer.log(SolrWriter.TRANSFORMED_ROW, tName, result);
|
||||
log(DIHLogLevels.TRANSFORMED_ROW, tName, result);
|
||||
} catch (DataImportHandlerException de) {
|
||||
writer.log(SolrWriter.TRANSFORMER_EXCEPTION, tName, de);
|
||||
log(DIHLogLevels.TRANSFORMER_EXCEPTION, tName, de);
|
||||
de.debugged = true;
|
||||
throw de;
|
||||
} catch (Exception e) {
|
||||
writer.log(SolrWriter.TRANSFORMER_EXCEPTION, tName, e);
|
||||
log(DIHLogLevels.TRANSFORMER_EXCEPTION, tName, e);
|
||||
DataImportHandlerException de = new DataImportHandlerException(DataImportHandlerException.SEVERE, "", e);
|
||||
de.debugged = true;
|
||||
throw de;
|
||||
|
@ -258,23 +258,23 @@ class DebugLogger {
|
|||
|
||||
NamedList lst;
|
||||
|
||||
int type;
|
||||
DIHLogLevels type;
|
||||
|
||||
DebugInfo parent;
|
||||
|
||||
public DebugInfo(String name, int type, DebugInfo parent) {
|
||||
public DebugInfo(String name, DIHLogLevels type, DebugInfo parent) {
|
||||
this.name = name;
|
||||
this.type = type;
|
||||
this.parent = parent;
|
||||
lst = new NamedList();
|
||||
if (parent != null) {
|
||||
String displayName = null;
|
||||
if (type == SolrWriter.START_ENTITY) {
|
||||
if (type == DIHLogLevels.START_ENTITY) {
|
||||
displayName = "entity:" + name;
|
||||
} else if (type == SolrWriter.TRANSFORMED_ROW
|
||||
|| type == SolrWriter.TRANSFORMER_EXCEPTION) {
|
||||
} else if (type == DIHLogLevels.TRANSFORMED_ROW
|
||||
|| type == DIHLogLevels.TRANSFORMER_EXCEPTION) {
|
||||
displayName = "transformer:" + name;
|
||||
} else if (type == SolrWriter.START_DOC) {
|
||||
} else if (type == DIHLogLevels.START_DOC) {
|
||||
this.name = displayName = "document#" + SolrWriter.getDocCount();
|
||||
}
|
||||
parent.lst.add(displayName, lst);
|
||||
|
|
|
@ -56,30 +56,60 @@ public class DocBuilder {
|
|||
|
||||
public Statistics importStatistics = new Statistics();
|
||||
|
||||
SolrWriter writer;
|
||||
DIHWriter writer;
|
||||
|
||||
DataImporter.RequestParams requestParameters;
|
||||
|
||||
boolean verboseDebug = false;
|
||||
|
||||
Map<String, Object> session = new ConcurrentHashMap<String, Object>();
|
||||
Map<String, Object> session = new ConcurrentHashMap<String, Object>();
|
||||
|
||||
static final ThreadLocal<DocBuilder> INSTANCE = new ThreadLocal<DocBuilder>();
|
||||
Map<String, Object> functionsNamespace;
|
||||
private Properties persistedProperties;
|
||||
|
||||
private DIHPropertiesWriter propWriter;
|
||||
private static final String PARAM_WRITER_IMPL = "writerImpl";
|
||||
private static final String DEFAULT_WRITER_NAME = "SolrWriter";
|
||||
private DebugLogger debugLogger;
|
||||
|
||||
public DocBuilder(DataImporter dataImporter, SolrWriter writer, DIHPropertiesWriter propWriter, DataImporter.RequestParams reqParams) {
|
||||
@SuppressWarnings("unchecked")
|
||||
public DocBuilder(DataImporter dataImporter, SolrWriter solrWriter, DIHPropertiesWriter propWriter, DataImporter.RequestParams reqParams) {
|
||||
INSTANCE.set(this);
|
||||
this.dataImporter = dataImporter;
|
||||
this.writer = writer;
|
||||
this.propWriter = propWriter;
|
||||
DataImporter.QUERY_COUNT.set(importStatistics.queryCount);
|
||||
requestParameters = reqParams;
|
||||
verboseDebug = requestParameters.debug && requestParameters.verbose;
|
||||
functionsNamespace = EvaluatorBag.getFunctionsNamespace(this.dataImporter.getConfig().functions, this);
|
||||
persistedProperties = propWriter.readIndexerProperties();
|
||||
|
||||
String writerClassStr = null;
|
||||
if(reqParams!=null && reqParams.requestParams != null) {
|
||||
writerClassStr = (String) reqParams.requestParams.get(PARAM_WRITER_IMPL);
|
||||
}
|
||||
if(writerClassStr != null && !writerClassStr.equals(DEFAULT_WRITER_NAME) && !writerClassStr.equals(DocBuilder.class.getPackage().getName() + "." + DEFAULT_WRITER_NAME)) {
|
||||
try {
|
||||
Class<DIHWriter> writerClass = loadClass(writerClassStr, dataImporter.getCore());
|
||||
this.writer = writerClass.newInstance();
|
||||
} catch (Exception e) {
|
||||
throw new DataImportHandlerException(DataImportHandlerException.SEVERE, "Unable to load Writer implementation:" + writerClassStr, e);
|
||||
}
|
||||
} else {
|
||||
writer = solrWriter;
|
||||
}
|
||||
ContextImpl ctx = new ContextImpl(null, null, null, null, reqParams.requestParams, null, this);
|
||||
writer.init(ctx);
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
private DebugLogger getDebubLogger(){
|
||||
if (debugLogger == null) {
|
||||
debugLogger = new DebugLogger();
|
||||
}
|
||||
return debugLogger;
|
||||
}
|
||||
|
||||
public VariableResolverImpl getVariableResolver() {
|
||||
|
@ -139,94 +169,100 @@ public class DocBuilder {
|
|||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void execute() {
|
||||
dataImporter.store(DataImporter.STATUS_MSGS, statusMessages);
|
||||
document = dataImporter.getConfig().document;
|
||||
final AtomicLong startTime = new AtomicLong(System.currentTimeMillis());
|
||||
statusMessages.put(TIME_ELAPSED, new Object() {
|
||||
@Override
|
||||
public String toString() {
|
||||
return getTimeElapsedSince(startTime.get());
|
||||
}
|
||||
});
|
||||
|
||||
statusMessages.put(DataImporter.MSG.TOTAL_QUERIES_EXECUTED,
|
||||
importStatistics.queryCount);
|
||||
statusMessages.put(DataImporter.MSG.TOTAL_ROWS_EXECUTED,
|
||||
importStatistics.rowsCount);
|
||||
statusMessages.put(DataImporter.MSG.TOTAL_DOC_PROCESSED,
|
||||
importStatistics.docCount);
|
||||
statusMessages.put(DataImporter.MSG.TOTAL_DOCS_SKIPPED,
|
||||
importStatistics.skipDocCount);
|
||||
|
||||
List<String> entities = requestParameters.entities;
|
||||
|
||||
// Trigger onImportStart
|
||||
if (document.onImportStart != null) {
|
||||
invokeEventListener(document.onImportStart);
|
||||
}
|
||||
AtomicBoolean fullCleanDone = new AtomicBoolean(false);
|
||||
//we must not do a delete of *:* multiple times if there are multiple root entities to be run
|
||||
Properties lastIndexTimeProps = new Properties();
|
||||
lastIndexTimeProps.setProperty(LAST_INDEX_KEY,
|
||||
DataImporter.DATE_TIME_FORMAT.get().format(dataImporter.getIndexStartTime()));
|
||||
for (DataConfig.Entity e : document.entities) {
|
||||
if (entities != null && !entities.contains(e.name))
|
||||
continue;
|
||||
lastIndexTimeProps.setProperty(e.name + "." + LAST_INDEX_KEY,
|
||||
DataImporter.DATE_TIME_FORMAT.get().format(new Date()));
|
||||
root = e;
|
||||
String delQuery = e.allAttributes.get("preImportDeleteQuery");
|
||||
if (dataImporter.getStatus() == DataImporter.Status.RUNNING_DELTA_DUMP) {
|
||||
cleanByQuery(delQuery, fullCleanDone);
|
||||
doDelta();
|
||||
delQuery = e.allAttributes.get("postImportDeleteQuery");
|
||||
if (delQuery != null) {
|
||||
fullCleanDone.set(false);
|
||||
cleanByQuery(delQuery, fullCleanDone);
|
||||
}
|
||||
} else {
|
||||
cleanByQuery(delQuery, fullCleanDone);
|
||||
doFullDump();
|
||||
delQuery = e.allAttributes.get("postImportDeleteQuery");
|
||||
if (delQuery != null) {
|
||||
fullCleanDone.set(false);
|
||||
cleanByQuery(delQuery, fullCleanDone);
|
||||
}
|
||||
}
|
||||
statusMessages.remove(DataImporter.MSG.TOTAL_DOC_PROCESSED);
|
||||
}
|
||||
|
||||
if (stop.get()) {
|
||||
// Dont commit if aborted using command=abort
|
||||
statusMessages.put("Aborted", DataImporter.DATE_TIME_FORMAT.get().format(new Date()));
|
||||
rollback();
|
||||
} else {
|
||||
// Do not commit unnecessarily if this is a delta-import and no documents were created or deleted
|
||||
if (!requestParameters.clean) {
|
||||
if (importStatistics.docCount.get() > 0 || importStatistics.deletedDocCount.get() > 0) {
|
||||
finish(lastIndexTimeProps);
|
||||
}
|
||||
} else {
|
||||
// Finished operation normally, commit now
|
||||
finish(lastIndexTimeProps);
|
||||
}
|
||||
|
||||
if (writer != null) {
|
||||
writer.finish();
|
||||
}
|
||||
|
||||
if (document.onImportEnd != null) {
|
||||
invokeEventListener(document.onImportEnd);
|
||||
}
|
||||
}
|
||||
|
||||
statusMessages.remove(TIME_ELAPSED);
|
||||
statusMessages.put(DataImporter.MSG.TOTAL_DOC_PROCESSED, ""+ importStatistics.docCount.get());
|
||||
if(importStatistics.failedDocCount.get() > 0)
|
||||
statusMessages.put(DataImporter.MSG.TOTAL_FAILED_DOCS, ""+ importStatistics.failedDocCount.get());
|
||||
|
||||
statusMessages.put("Time taken ", getTimeElapsedSince(startTime.get()));
|
||||
LOG.info("Time taken = " + getTimeElapsedSince(startTime.get()));
|
||||
try {
|
||||
dataImporter.store(DataImporter.STATUS_MSGS, statusMessages);
|
||||
document = dataImporter.getConfig().document;
|
||||
final AtomicLong startTime = new AtomicLong(System.currentTimeMillis());
|
||||
statusMessages.put(TIME_ELAPSED, new Object() {
|
||||
@Override
|
||||
public String toString() {
|
||||
return getTimeElapsedSince(startTime.get());
|
||||
}
|
||||
});
|
||||
|
||||
statusMessages.put(DataImporter.MSG.TOTAL_QUERIES_EXECUTED,
|
||||
importStatistics.queryCount);
|
||||
statusMessages.put(DataImporter.MSG.TOTAL_ROWS_EXECUTED,
|
||||
importStatistics.rowsCount);
|
||||
statusMessages.put(DataImporter.MSG.TOTAL_DOC_PROCESSED,
|
||||
importStatistics.docCount);
|
||||
statusMessages.put(DataImporter.MSG.TOTAL_DOCS_SKIPPED,
|
||||
importStatistics.skipDocCount);
|
||||
|
||||
List<String> entities = requestParameters.entities;
|
||||
|
||||
// Trigger onImportStart
|
||||
if (document.onImportStart != null) {
|
||||
invokeEventListener(document.onImportStart);
|
||||
}
|
||||
AtomicBoolean fullCleanDone = new AtomicBoolean(false);
|
||||
//we must not do a delete of *:* multiple times if there are multiple root entities to be run
|
||||
Properties lastIndexTimeProps = new Properties();
|
||||
lastIndexTimeProps.setProperty(LAST_INDEX_KEY,
|
||||
DataImporter.DATE_TIME_FORMAT.get().format(dataImporter.getIndexStartTime()));
|
||||
for (DataConfig.Entity e : document.entities) {
|
||||
if (entities != null && !entities.contains(e.name))
|
||||
continue;
|
||||
lastIndexTimeProps.setProperty(e.name + "." + LAST_INDEX_KEY,
|
||||
DataImporter.DATE_TIME_FORMAT.get().format(new Date()));
|
||||
root = e;
|
||||
String delQuery = e.allAttributes.get("preImportDeleteQuery");
|
||||
if (dataImporter.getStatus() == DataImporter.Status.RUNNING_DELTA_DUMP) {
|
||||
cleanByQuery(delQuery, fullCleanDone);
|
||||
doDelta();
|
||||
delQuery = e.allAttributes.get("postImportDeleteQuery");
|
||||
if (delQuery != null) {
|
||||
fullCleanDone.set(false);
|
||||
cleanByQuery(delQuery, fullCleanDone);
|
||||
}
|
||||
} else {
|
||||
cleanByQuery(delQuery, fullCleanDone);
|
||||
doFullDump();
|
||||
delQuery = e.allAttributes.get("postImportDeleteQuery");
|
||||
if (delQuery != null) {
|
||||
fullCleanDone.set(false);
|
||||
cleanByQuery(delQuery, fullCleanDone);
|
||||
}
|
||||
}
|
||||
statusMessages.remove(DataImporter.MSG.TOTAL_DOC_PROCESSED);
|
||||
}
|
||||
|
||||
if (stop.get()) {
|
||||
// Dont commit if aborted using command=abort
|
||||
statusMessages.put("Aborted", DataImporter.DATE_TIME_FORMAT.get().format(new Date()));
|
||||
rollback();
|
||||
} else {
|
||||
// Do not commit unnecessarily if this is a delta-import and no documents were created or deleted
|
||||
if (!requestParameters.clean) {
|
||||
if (importStatistics.docCount.get() > 0 || importStatistics.deletedDocCount.get() > 0) {
|
||||
finish(lastIndexTimeProps);
|
||||
}
|
||||
} else {
|
||||
// Finished operation normally, commit now
|
||||
finish(lastIndexTimeProps);
|
||||
}
|
||||
|
||||
if (document.onImportEnd != null) {
|
||||
invokeEventListener(document.onImportEnd);
|
||||
}
|
||||
}
|
||||
|
||||
statusMessages.remove(TIME_ELAPSED);
|
||||
statusMessages.put(DataImporter.MSG.TOTAL_DOC_PROCESSED, ""+ importStatistics.docCount.get());
|
||||
if(importStatistics.failedDocCount.get() > 0)
|
||||
statusMessages.put(DataImporter.MSG.TOTAL_FAILED_DOCS, ""+ importStatistics.failedDocCount.get());
|
||||
|
||||
statusMessages.put("Time taken ", getTimeElapsedSince(startTime.get()));
|
||||
LOG.info("Time taken = " + getTimeElapsedSince(startTime.get()));
|
||||
} catch(Exception e)
|
||||
{
|
||||
throw new RuntimeException(e);
|
||||
} finally
|
||||
{
|
||||
if (writer != null) {
|
||||
writer.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
|
@ -560,11 +596,11 @@ public class DocBuilder {
|
|||
Context.CURRENT_CONTEXT.set(ctx);
|
||||
|
||||
if (requestParameters.start > 0) {
|
||||
writer.log(SolrWriter.DISABLE_LOGGING, null, null);
|
||||
getDebubLogger().log(DIHLogLevels.DISABLE_LOGGING, null, null);
|
||||
}
|
||||
|
||||
if (verboseDebug) {
|
||||
writer.log(SolrWriter.START_ENTITY, entity.name, null);
|
||||
getDebubLogger().log(DIHLogLevels.START_ENTITY, entity.name, null);
|
||||
}
|
||||
|
||||
int seenDocCount = 0;
|
||||
|
@ -578,11 +614,11 @@ public class DocBuilder {
|
|||
seenDocCount++;
|
||||
|
||||
if (seenDocCount > requestParameters.start) {
|
||||
writer.log(SolrWriter.ENABLE_LOGGING, null, null);
|
||||
getDebubLogger().log(DIHLogLevels.ENABLE_LOGGING, null, null);
|
||||
}
|
||||
|
||||
if (verboseDebug && entity.isDocRoot) {
|
||||
writer.log(SolrWriter.START_DOC, entity.name, null);
|
||||
getDebubLogger().log(DIHLogLevels.START_DOC, entity.name, null);
|
||||
}
|
||||
if (doc == null && entity.isDocRoot) {
|
||||
doc = new DocWrapper();
|
||||
|
@ -611,7 +647,7 @@ public class DocBuilder {
|
|||
}
|
||||
|
||||
if (verboseDebug) {
|
||||
writer.log(SolrWriter.ENTITY_OUT, entity.name, arow);
|
||||
getDebubLogger().log(DIHLogLevels.ENTITY_OUT, entity.name, arow);
|
||||
}
|
||||
importStatistics.rowsCount.incrementAndGet();
|
||||
if (doc != null) {
|
||||
|
@ -647,7 +683,7 @@ public class DocBuilder {
|
|||
|
||||
} catch (DataImportHandlerException e) {
|
||||
if (verboseDebug) {
|
||||
writer.log(SolrWriter.ENTITY_EXCEPTION, entity.name, e);
|
||||
getDebubLogger().log(DIHLogLevels.ENTITY_EXCEPTION, entity.name, e);
|
||||
}
|
||||
if(e.getErrCode() == DataImportHandlerException.SKIP_ROW){
|
||||
continue;
|
||||
|
@ -666,21 +702,21 @@ public class DocBuilder {
|
|||
throw e;
|
||||
} catch (Throwable t) {
|
||||
if (verboseDebug) {
|
||||
writer.log(SolrWriter.ENTITY_EXCEPTION, entity.name, t);
|
||||
getDebubLogger().log(DIHLogLevels.ENTITY_EXCEPTION, entity.name, t);
|
||||
}
|
||||
throw new DataImportHandlerException(DataImportHandlerException.SEVERE, t);
|
||||
} finally {
|
||||
if (verboseDebug) {
|
||||
writer.log(SolrWriter.ROW_END, entity.name, null);
|
||||
getDebubLogger().log(DIHLogLevels.ROW_END, entity.name, null);
|
||||
if (entity.isDocRoot)
|
||||
writer.log(SolrWriter.END_DOC, null, null);
|
||||
getDebubLogger().log(DIHLogLevels.END_DOC, null, null);
|
||||
Context.CURRENT_CONTEXT.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
if (verboseDebug) {
|
||||
writer.log(SolrWriter.END_ENTITY, null, null);
|
||||
getDebubLogger().log(DIHLogLevels.END_ENTITY, null, null);
|
||||
}
|
||||
entityProcessor.destroy();
|
||||
}
|
||||
|
|
|
@ -27,15 +27,17 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.*;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* <p> Writes documents to SOLR as well as provides methods for loading and persisting last index time. </p>
|
||||
* <p> Writes documents to SOLR. </p>
|
||||
* <p/>
|
||||
* <b>This API is experimental and may change in the future.</b>
|
||||
*
|
||||
* @since solr 1.3
|
||||
*/
|
||||
public class SolrWriter {
|
||||
public class SolrWriter implements DIHWriter {
|
||||
private static final Logger log = LoggerFactory.getLogger(SolrWriter.class);
|
||||
|
||||
static final String LAST_INDEX_KEY = "last_index_time";
|
||||
|
@ -51,7 +53,16 @@ public class SolrWriter {
|
|||
this.req = req;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
try {
|
||||
processor.finish();
|
||||
} catch (IOException e) {
|
||||
throw new DataImportHandlerException(DataImportHandlerException.SEVERE,
|
||||
"Unable to call finish() on UpdateRequestProcessor", e);
|
||||
}
|
||||
}
|
||||
@Override
|
||||
public boolean upload(SolrInputDocument d) {
|
||||
try {
|
||||
AddUpdateCommand command = new AddUpdateCommand(req);
|
||||
|
@ -64,7 +75,8 @@ public class SolrWriter {
|
|||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void deleteDoc(Object id) {
|
||||
try {
|
||||
log.info("Deleting document: " + id);
|
||||
|
@ -75,16 +87,8 @@ public class SolrWriter {
|
|||
log.error("Exception while deleteing: " + id, e);
|
||||
}
|
||||
}
|
||||
|
||||
void finish() {
|
||||
try {
|
||||
processor.finish();
|
||||
} catch (IOException e) {
|
||||
throw new DataImportHandlerException(DataImportHandlerException.SEVERE,
|
||||
"Unable to call finish() on UpdateRequestProcessor", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void deleteByQuery(String query) {
|
||||
try {
|
||||
log.info("Deleting documents from Solr with query: " + query);
|
||||
|
@ -96,6 +100,7 @@ public class SolrWriter {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void commit(boolean optimize) {
|
||||
try {
|
||||
CommitUpdateCommand commit = new CommitUpdateCommand(req,optimize);
|
||||
|
@ -105,6 +110,7 @@ public class SolrWriter {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void rollback() {
|
||||
try {
|
||||
RollbackUpdateCommand rollback = new RollbackUpdateCommand(req);
|
||||
|
@ -114,6 +120,7 @@ public class SolrWriter {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void doDeleteAll() {
|
||||
try {
|
||||
DeleteUpdateCommand deleteCommand = new DeleteUpdateCommand(req);
|
||||
|
@ -151,28 +158,9 @@ public class SolrWriter {
|
|||
return null;
|
||||
}
|
||||
}
|
||||
@Override
|
||||
public void init(Context context) {
|
||||
/* NO-OP */
|
||||
}
|
||||
|
||||
public DebugLogger getDebugLogger() {
|
||||
if (debugLogger == null) {
|
||||
debugLogger = new DebugLogger(this);
|
||||
}
|
||||
return debugLogger;
|
||||
}
|
||||
|
||||
/**
|
||||
* This method is used for verbose debugging
|
||||
*
|
||||
* @param event The event name start.entity ,end.entity ,transformer.row
|
||||
* @param name Name of the entity/transformer
|
||||
* @param row The actual data . Can be a Map<String,object> or a List<Map<String,object>>
|
||||
*/
|
||||
public void log(int event, String name, Object row) {
|
||||
getDebugLogger().log(event, name, row);
|
||||
}
|
||||
|
||||
public static final int START_ENTITY = 1, END_ENTITY = 2,
|
||||
TRANSFORMED_ROW = 3, ENTITY_META = 4, PRE_TRANSFORMER_ROW = 5,
|
||||
START_DOC = 6, END_DOC = 7, ENTITY_OUT = 8, ROW_END = 9,
|
||||
TRANSFORMER_EXCEPTION = 10, ENTITY_EXCEPTION = 11, DISABLE_LOGGING = 12,
|
||||
ENABLE_LOGGING = 13;
|
||||
}
|
||||
|
|
|
@ -206,11 +206,6 @@ public class TestDocBuilder extends AbstractDataImportHandlerTestCase {
|
|||
return docs.add(doc);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void log(int event, String name, Object row) {
|
||||
// Do nothing
|
||||
}
|
||||
|
||||
@Override
|
||||
public void doDeleteAll() {
|
||||
deleteAllCalled = Boolean.TRUE;
|
||||
|
@ -222,7 +217,7 @@ public class TestDocBuilder extends AbstractDataImportHandlerTestCase {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void finish() {
|
||||
public void close() {
|
||||
finishCalled = Boolean.TRUE;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue