mirror of https://github.com/apache/lucene.git
SOLR-3622, SOLR-5847, SOLR-6194, SOLR-6269: Several DIH fixes/improvements
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1613406 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
0b11b8ab6b
commit
5cd69d47e1
|
@ -222,6 +222,8 @@ Bug Fixes
|
||||||
datatypes of options people can specify, additional error handling of duplicated/unidentified
|
datatypes of options people can specify, additional error handling of duplicated/unidentified
|
||||||
options has also been added. (Maciej Zasada, hossman)
|
options has also been added. (Maciej Zasada, hossman)
|
||||||
|
|
||||||
|
* SOLR-5847: Fixed data import abort button in admin UI. (ehatcher)
|
||||||
|
|
||||||
|
|
||||||
Optimizations
|
Optimizations
|
||||||
---------------------
|
---------------------
|
||||||
|
@ -289,6 +291,15 @@ Other Changes
|
||||||
* SOLR-6274: UpdateShardHandler should log the params used to configure it's
|
* SOLR-6274: UpdateShardHandler should log the params used to configure it's
|
||||||
HttpClient. (Ramkumar Aiyengar via Mark Miller)
|
HttpClient. (Ramkumar Aiyengar via Mark Miller)
|
||||||
|
|
||||||
|
* SOLR-6194: Opened up "public" access to DataSource, DocBuilder, and EntityProcessorWrapper
|
||||||
|
in DIH. (Aaron LaBella via ehatcher)
|
||||||
|
|
||||||
|
* SOLR-6269: Renamed "rollback" to "error" in DIH internals, including renaming onRollback
|
||||||
|
to onError introduced in SOLR-6258. (ehatcher)
|
||||||
|
|
||||||
|
* SOLR-3622: When using DIH in SolrCloud-mode, rollback will no longer be called when
|
||||||
|
an error occurs. (ehatcher)
|
||||||
|
|
||||||
================== 4.9.0 ==================
|
================== 4.9.0 ==================
|
||||||
|
|
||||||
Versions of Major Components
|
Versions of Major Components
|
||||||
|
|
|
@ -54,6 +54,8 @@ public class ContextImpl extends Context {
|
||||||
|
|
||||||
DocBuilder docBuilder;
|
DocBuilder docBuilder;
|
||||||
|
|
||||||
|
Exception lastException = null;
|
||||||
|
|
||||||
|
|
||||||
public ContextImpl(EntityProcessorWrapper epw, VariableResolver resolver,
|
public ContextImpl(EntityProcessorWrapper epw, VariableResolver resolver,
|
||||||
DataSource ds, String currProcess,
|
DataSource ds, String currProcess,
|
||||||
|
|
|
@ -179,7 +179,11 @@ public class DataImporter {
|
||||||
* Used by tests
|
* Used by tests
|
||||||
*/
|
*/
|
||||||
public void loadAndInit(String configStr) {
|
public void loadAndInit(String configStr) {
|
||||||
config = loadDataConfig(new InputSource(new StringReader(configStr)));
|
config = loadDataConfig(new InputSource(new StringReader(configStr)));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void loadAndInit(InputSource configFile) {
|
||||||
|
config = loadDataConfig(configFile);
|
||||||
}
|
}
|
||||||
|
|
||||||
public DIHConfiguration loadDataConfig(InputSource configFile) {
|
public DIHConfiguration loadDataConfig(InputSource configFile) {
|
||||||
|
@ -349,7 +353,7 @@ public class DataImporter {
|
||||||
return store.get(key);
|
return store.get(key);
|
||||||
}
|
}
|
||||||
|
|
||||||
DataSource getDataSourceInstance(Entity key, String name, Context ctx) {
|
public DataSource getDataSourceInstance(Entity key, String name, Context ctx) {
|
||||||
Map<String,String> p = requestLevelDataSourceProps.get(name);
|
Map<String,String> p = requestLevelDataSourceProps.get(name);
|
||||||
if (p == null)
|
if (p == null)
|
||||||
p = config.getDataSources().get(name);
|
p = config.getDataSources().get(name);
|
||||||
|
@ -404,7 +408,6 @@ public class DataImporter {
|
||||||
public void doFullImport(DIHWriter writer, RequestInfo requestParams) {
|
public void doFullImport(DIHWriter writer, RequestInfo requestParams) {
|
||||||
LOG.info("Starting Full Import");
|
LOG.info("Starting Full Import");
|
||||||
setStatus(Status.RUNNING_FULL_DUMP);
|
setStatus(Status.RUNNING_FULL_DUMP);
|
||||||
boolean success = false;
|
|
||||||
try {
|
try {
|
||||||
DIHProperties dihPropWriter = createPropertyWriter();
|
DIHProperties dihPropWriter = createPropertyWriter();
|
||||||
setIndexStartTime(dihPropWriter.getCurrentTimestamp());
|
setIndexStartTime(dihPropWriter.getCurrentTimestamp());
|
||||||
|
@ -413,14 +416,10 @@ public class DataImporter {
|
||||||
docBuilder.execute();
|
docBuilder.execute();
|
||||||
if (!requestParams.isDebug())
|
if (!requestParams.isDebug())
|
||||||
cumulativeStatistics.add(docBuilder.importStatistics);
|
cumulativeStatistics.add(docBuilder.importStatistics);
|
||||||
success = true;
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
SolrException.log(LOG, "Full Import failed", e);
|
SolrException.log(LOG, "Full Import failed", e);
|
||||||
|
docBuilder.handleError("Full Import failed", e);
|
||||||
} finally {
|
} finally {
|
||||||
if (!success) {
|
|
||||||
docBuilder.rollback();
|
|
||||||
}
|
|
||||||
|
|
||||||
setStatus(Status.IDLE);
|
setStatus(Status.IDLE);
|
||||||
DocBuilder.INSTANCE.set(null);
|
DocBuilder.INSTANCE.set(null);
|
||||||
}
|
}
|
||||||
|
@ -437,7 +436,6 @@ public class DataImporter {
|
||||||
public void doDeltaImport(DIHWriter writer, RequestInfo requestParams) {
|
public void doDeltaImport(DIHWriter writer, RequestInfo requestParams) {
|
||||||
LOG.info("Starting Delta Import");
|
LOG.info("Starting Delta Import");
|
||||||
setStatus(Status.RUNNING_DELTA_DUMP);
|
setStatus(Status.RUNNING_DELTA_DUMP);
|
||||||
boolean success = false;
|
|
||||||
try {
|
try {
|
||||||
DIHProperties dihPropWriter = createPropertyWriter();
|
DIHProperties dihPropWriter = createPropertyWriter();
|
||||||
setIndexStartTime(dihPropWriter.getCurrentTimestamp());
|
setIndexStartTime(dihPropWriter.getCurrentTimestamp());
|
||||||
|
@ -446,13 +444,10 @@ public class DataImporter {
|
||||||
docBuilder.execute();
|
docBuilder.execute();
|
||||||
if (!requestParams.isDebug())
|
if (!requestParams.isDebug())
|
||||||
cumulativeStatistics.add(docBuilder.importStatistics);
|
cumulativeStatistics.add(docBuilder.importStatistics);
|
||||||
success = true;
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.error("Delta Import Failed", e);
|
LOG.error("Delta Import Failed", e);
|
||||||
|
docBuilder.handleError("Delta Import Failed", e);
|
||||||
} finally {
|
} finally {
|
||||||
if (!success) {
|
|
||||||
docBuilder.rollback();
|
|
||||||
}
|
|
||||||
setStatus(Status.IDLE);
|
setStatus(Status.IDLE);
|
||||||
DocBuilder.INSTANCE.set(null);
|
DocBuilder.INSTANCE.set(null);
|
||||||
}
|
}
|
||||||
|
@ -510,10 +505,15 @@ public class DataImporter {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
DocBuilder getDocBuilder() {
|
public DocBuilder getDocBuilder() {
|
||||||
return docBuilder;
|
return docBuilder;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public DocBuilder getDocBuilder(DIHWriter writer, RequestInfo requestParams) {
|
||||||
|
DIHProperties dihPropWriter = createPropertyWriter();
|
||||||
|
return new DocBuilder(this, writer, dihPropWriter, requestParams);
|
||||||
|
}
|
||||||
|
|
||||||
Map<String, Evaluator> getEvaluators() {
|
Map<String, Evaluator> getEvaluators() {
|
||||||
return getEvaluators(config.getFunctions());
|
return getEvaluators(config.getFunctions());
|
||||||
}
|
}
|
||||||
|
|
|
@ -94,7 +94,9 @@ public class DocBuilder {
|
||||||
|
|
||||||
writer = solrWriter;
|
writer = solrWriter;
|
||||||
ContextImpl ctx = new ContextImpl(null, null, null, null, reqParams.getRawParams(), null, this);
|
ContextImpl ctx = new ContextImpl(null, null, null, null, reqParams.getRawParams(), null, this);
|
||||||
writer.init(ctx);
|
if (writer != null) {
|
||||||
|
writer.init(ctx);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -146,25 +148,31 @@ public class DocBuilder {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private void invokeEventListener(String className) {
|
private void invokeEventListener(String className) {
|
||||||
|
invokeEventListener(className, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private void invokeEventListener(String className, Exception lastException) {
|
||||||
try {
|
try {
|
||||||
EventListener listener = (EventListener) loadClass(className, dataImporter.getCore()).newInstance();
|
EventListener listener = (EventListener) loadClass(className, dataImporter.getCore()).newInstance();
|
||||||
notifyListener(listener);
|
notifyListener(listener, lastException);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
wrapAndThrow(SEVERE, e, "Unable to load class : " + className);
|
wrapAndThrow(SEVERE, e, "Unable to load class : " + className);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void notifyListener(EventListener listener) {
|
private void notifyListener(EventListener listener, Exception lastException) {
|
||||||
String currentProcess;
|
String currentProcess;
|
||||||
if (dataImporter.getStatus() == DataImporter.Status.RUNNING_DELTA_DUMP) {
|
if (dataImporter.getStatus() == DataImporter.Status.RUNNING_DELTA_DUMP) {
|
||||||
currentProcess = Context.DELTA_DUMP;
|
currentProcess = Context.DELTA_DUMP;
|
||||||
} else {
|
} else {
|
||||||
currentProcess = Context.FULL_DUMP;
|
currentProcess = Context.FULL_DUMP;
|
||||||
}
|
}
|
||||||
listener.onEvent(new ContextImpl(null, getVariableResolver(), null, currentProcess, session, null, this));
|
ContextImpl ctx = new ContextImpl(null, getVariableResolver(), null, currentProcess, session, null, this);
|
||||||
|
ctx.lastException = lastException;
|
||||||
|
listener.onEvent(ctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
|
@ -234,7 +242,7 @@ public class DocBuilder {
|
||||||
if (stop.get()) {
|
if (stop.get()) {
|
||||||
// Dont commit if aborted using command=abort
|
// Dont commit if aborted using command=abort
|
||||||
statusMessages.put("Aborted", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.ROOT).format(new Date()));
|
statusMessages.put("Aborted", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.ROOT).format(new Date()));
|
||||||
rollback();
|
handleError("Aborted", null);
|
||||||
} else {
|
} else {
|
||||||
// Do not commit unnecessarily if this is a delta-import and no documents were created or deleted
|
// Do not commit unnecessarily if this is a delta-import and no documents were created or deleted
|
||||||
if (!reqParams.isClean()) {
|
if (!reqParams.isClean()) {
|
||||||
|
@ -305,12 +313,15 @@ public class DocBuilder {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void rollback() {
|
void handleError(String message, Exception e) {
|
||||||
writer.rollback();
|
if (!dataImporter.getCore().getCoreDescriptor().getCoreContainer().isZooKeeperAware()) {
|
||||||
statusMessages.put("", "Indexing failed. Rolled back all changes.");
|
writer.rollback();
|
||||||
addStatusMessage("Rolledback");
|
}
|
||||||
if ((config != null) && (config.getOnRollback() != null)) {
|
|
||||||
invokeEventListener(config.getOnRollback());
|
statusMessages.put(message, "Indexing error");
|
||||||
|
addStatusMessage(message);
|
||||||
|
if ((config != null) && (config.getOnError() != null)) {
|
||||||
|
invokeEventListener(config.getOnError(), e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -688,7 +699,7 @@ public class DocBuilder {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private EntityProcessorWrapper getEntityProcessorWrapper(Entity entity) {
|
public EntityProcessorWrapper getEntityProcessorWrapper(Entity entity) {
|
||||||
EntityProcessor entityProcessor = null;
|
EntityProcessor entityProcessor = null;
|
||||||
if (entity.getProcessorName() == null) {
|
if (entity.getProcessorName() == null) {
|
||||||
entityProcessor = new SqlEntityProcessor();
|
entityProcessor = new SqlEntityProcessor();
|
||||||
|
|
|
@ -117,7 +117,7 @@ public class SolrWriter extends DIHWriterBase implements DIHWriter {
|
||||||
RollbackUpdateCommand rollback = new RollbackUpdateCommand(req);
|
RollbackUpdateCommand rollback = new RollbackUpdateCommand(req);
|
||||||
processor.processRollback(rollback);
|
processor.processRollback(rollback);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("Exception while solr rollback.", e);
|
log.error("Exception during rollback command.", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -56,7 +56,7 @@ public class DIHConfiguration {
|
||||||
private final List<Entity> entities;
|
private final List<Entity> entities;
|
||||||
private final String onImportStart;
|
private final String onImportStart;
|
||||||
private final String onImportEnd;
|
private final String onImportEnd;
|
||||||
private final String onRollback;
|
private final String onError;
|
||||||
private final List<Map<String, String>> functions;
|
private final List<Map<String, String>> functions;
|
||||||
private final Script script;
|
private final Script script;
|
||||||
private final Map<String, Map<String,String>> dataSources;
|
private final Map<String, Map<String,String>> dataSources;
|
||||||
|
@ -72,7 +72,7 @@ public class DIHConfiguration {
|
||||||
this.deleteQuery = ConfigParseUtil.getStringAttribute(element, "deleteQuery", null);
|
this.deleteQuery = ConfigParseUtil.getStringAttribute(element, "deleteQuery", null);
|
||||||
this.onImportStart = ConfigParseUtil.getStringAttribute(element, "onImportStart", null);
|
this.onImportStart = ConfigParseUtil.getStringAttribute(element, "onImportStart", null);
|
||||||
this.onImportEnd = ConfigParseUtil.getStringAttribute(element, "onImportEnd", null);
|
this.onImportEnd = ConfigParseUtil.getStringAttribute(element, "onImportEnd", null);
|
||||||
this.onRollback = ConfigParseUtil.getStringAttribute(element, "onRollback", null);
|
this.onError = ConfigParseUtil.getStringAttribute(element, "onError", null);
|
||||||
List<Entity> modEntities = new ArrayList<>();
|
List<Entity> modEntities = new ArrayList<>();
|
||||||
List<Element> l = ConfigParseUtil.getChildNodes(element, "entity");
|
List<Element> l = ConfigParseUtil.getChildNodes(element, "entity");
|
||||||
boolean docRootFound = false;
|
boolean docRootFound = false;
|
||||||
|
@ -165,8 +165,8 @@ public class DIHConfiguration {
|
||||||
public String getOnImportEnd() {
|
public String getOnImportEnd() {
|
||||||
return onImportEnd;
|
return onImportEnd;
|
||||||
}
|
}
|
||||||
public String getOnRollback() {
|
public String getOnError() {
|
||||||
return onRollback;
|
return onError;
|
||||||
}
|
}
|
||||||
public List<Map<String,String>> getFunctions() {
|
public List<Map<String,String>> getFunctions() {
|
||||||
return functions;
|
return functions;
|
||||||
|
|
|
@ -76,14 +76,15 @@ public class TestDocBuilder2 extends AbstractDataImportHandlerTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRollbackHandler() throws Exception {
|
public void testErrorHandler() throws Exception {
|
||||||
List rows = new ArrayList();
|
List rows = new ArrayList();
|
||||||
rows.add(createMap("id", "1", "FORCE_ROLLBACK", "true"));
|
rows.add(createMap("id", "1", "FORCE_ERROR", "true"));
|
||||||
MockDataSource.setIterator("select * from x", rows.iterator());
|
MockDataSource.setIterator("select * from x", rows.iterator());
|
||||||
|
|
||||||
runFullImport(dataConfigWithRollbackHandler);
|
runFullImport(dataConfigWithErrorHandler);
|
||||||
|
|
||||||
assertTrue("Rollback event listener was not called", RollbackEventListener.executed);
|
assertTrue("Error event listener was not called", ErrorEventListener.executed);
|
||||||
|
assertTrue(ErrorEventListener.lastException.getMessage().contains("ForcedException"));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -316,12 +317,14 @@ public class TestDocBuilder2 extends AbstractDataImportHandlerTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class RollbackEventListener implements EventListener {
|
public static class ErrorEventListener implements EventListener {
|
||||||
public static boolean executed = false;
|
public static boolean executed = false;
|
||||||
|
public static Exception lastException = null;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onEvent(Context ctx) {
|
public void onEvent(Context ctx) {
|
||||||
executed = true;
|
executed = true;
|
||||||
|
lastException = ((ContextImpl) ctx).lastException;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -377,11 +380,11 @@ public class TestDocBuilder2 extends AbstractDataImportHandlerTestCase {
|
||||||
" </document>\n" +
|
" </document>\n" +
|
||||||
"</dataConfig>";
|
"</dataConfig>";
|
||||||
|
|
||||||
private final String dataConfigWithRollbackHandler = "<dataConfig> <dataSource type=\"MockDataSource\"/>\n" +
|
private final String dataConfigWithErrorHandler = "<dataConfig> <dataSource type=\"MockDataSource\"/>\n" +
|
||||||
" <document onRollback=\"TestDocBuilder2$RollbackEventListener\">\n" +
|
" <document onError=\"TestDocBuilder2$ErrorEventListener\">\n" +
|
||||||
" <entity name=\"books\" query=\"select * from x\" transformer=\"TestDocBuilder2$ForcedExceptionTransformer\">\n" +
|
" <entity name=\"books\" query=\"select * from x\" transformer=\"TestDocBuilder2$ForcedExceptionTransformer\">\n" +
|
||||||
" <field column=\"id\" />\n" +
|
" <field column=\"id\" />\n" +
|
||||||
" <field column=\"FORCE_ROLLBACK\" />\n" +
|
" <field column=\"FORCE_ERROR\" />\n" +
|
||||||
" </entity>\n" +
|
" </entity>\n" +
|
||||||
" </document>\n" +
|
" </document>\n" +
|
||||||
"</dataConfig>";
|
"</dataConfig>";
|
||||||
|
|
|
@ -350,7 +350,7 @@ sammy.get
|
||||||
{
|
{
|
||||||
url : handler_url + '?command=abort&wt=json',
|
url : handler_url + '?command=abort&wt=json',
|
||||||
dataType : 'json',
|
dataType : 'json',
|
||||||
type: 'POST',
|
type: 'GET',
|
||||||
context: $( this ),
|
context: $( this ),
|
||||||
beforeSend : function( xhr, settings )
|
beforeSend : function( xhr, settings )
|
||||||
{
|
{
|
||||||
|
|
Loading…
Reference in New Issue