mirror of https://github.com/apache/lucene.git
SOLR-2668 -- DIH multithreaded mode does not rollback on errors from EntityProcessor
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1161505 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
0f443840c4
commit
582df287cd
|
@ -21,6 +21,7 @@ Bug Fixes
|
||||||
* SOLR-2186: DataImportHandler's multi-threaded option throws NPE (Lance Norskog, Frank Wesemann, shalin)
|
* SOLR-2186: DataImportHandler's multi-threaded option throws NPE (Lance Norskog, Frank Wesemann, shalin)
|
||||||
* SOLR-2655: DIH multi threaded mode does not resolve attributes correctly (Frank Wesemann, shalin)
|
* SOLR-2655: DIH multi threaded mode does not resolve attributes correctly (Frank Wesemann, shalin)
|
||||||
* SOLR-2695: Documents are collected in unsynchronized list in multi-threaded debug mode (Michael McCandless, shalin)
|
* SOLR-2695: Documents are collected in unsynchronized list in multi-threaded debug mode (Michael McCandless, shalin)
|
||||||
|
* SOLR-2668: DIH multithreaded mode does not rollback on errors from EntityProcessor (Frank Wesemann, shalin)
|
||||||
|
|
||||||
================== 3.3.0 ==================
|
================== 3.3.0 ==================
|
||||||
|
|
||||||
|
|
|
@ -305,7 +305,7 @@ public class DocBuilder {
|
||||||
LOG.info("running multithreaded full-import");
|
LOG.info("running multithreaded full-import");
|
||||||
new EntityRunner(root,null).run(null,Context.FULL_DUMP,null);
|
new EntityRunner(root,null).run(null,Context.FULL_DUMP,null);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.error("error in import", e);
|
throw new RuntimeException("Error in multi-threaded import", e);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
buildDocument(getVariableResolver(), null, null, root, true, null);
|
buildDocument(getVariableResolver(), null, null, root, true, null);
|
||||||
|
|
|
@ -16,15 +16,15 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.solr.handler.dataimport;
|
package org.apache.solr.handler.dataimport;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test DocBuilder with "threads"
|
* Test DocBuilder with "threads"
|
||||||
|
@ -60,6 +60,8 @@ public class TestDocBuilderThreaded extends AbstractDataImportHandlerTestCase {
|
||||||
DemoProcessor.entitiesInitied = 0;
|
DemoProcessor.entitiesInitied = 0;
|
||||||
DemoEvaluator.evaluated = 0;
|
DemoEvaluator.evaluated = 0;
|
||||||
MockDataSource.clearCache();
|
MockDataSource.clearCache();
|
||||||
|
assertU(delQ("*:*"));
|
||||||
|
assertU(commit());
|
||||||
super.tearDown();
|
super.tearDown();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -86,6 +88,23 @@ public class TestDocBuilderThreaded extends AbstractDataImportHandlerTestCase {
|
||||||
assertEquals("Evaluator was invoked less times than the number of rows",
|
assertEquals("Evaluator was invoked less times than the number of rows",
|
||||||
4, DemoEvaluator.evaluated);
|
4, DemoEvaluator.evaluated);
|
||||||
}
|
}
|
||||||
|
@Test
|
||||||
|
public void testContinue() throws Exception {
|
||||||
|
runFullImport(twoEntitiesWithFailingProcessor);
|
||||||
|
assertQ(req("*:*"), "//*[@numFound='0']"); // should rollback
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testContinueThreaded() throws Exception {
|
||||||
|
runFullImport(twoThreadedEntitiesWithFailingProcessor);
|
||||||
|
assertQ(req("*:*"), "//*[@numFound='0']"); // should rollback
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFailingTransformerContinueThreaded() throws Exception {
|
||||||
|
runFullImport(twoThreadedEntitiesWithFailingTransformer);
|
||||||
|
assertQ(req("*:*"), "//*[@numFound='4']");
|
||||||
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
private List<Map<String, Object>> getDetails4Worker(String aWorker) {
|
private List<Map<String, Object>> getDetails4Worker(String aWorker) {
|
||||||
|
@ -116,8 +135,7 @@ public class TestDocBuilderThreaded extends AbstractDataImportHandlerTestCase {
|
||||||
"</entity>" +
|
"</entity>" +
|
||||||
"</document>" +
|
"</document>" +
|
||||||
"</dataConfig>";
|
"</dataConfig>";
|
||||||
|
private final String twoEntitiesWithProcessor =
|
||||||
private final String twoEntitiesWithProcessor =
|
|
||||||
|
|
||||||
"<dataConfig> <dataSource type=\"MockDataSource\"/>\n" +
|
"<dataConfig> <dataSource type=\"MockDataSource\"/>\n" +
|
||||||
"<document>" +
|
"<document>" +
|
||||||
|
@ -138,7 +156,7 @@ public class TestDocBuilderThreaded extends AbstractDataImportHandlerTestCase {
|
||||||
"</entity>" +
|
"</entity>" +
|
||||||
"</document>" +
|
"</document>" +
|
||||||
"</dataConfig>";
|
"</dataConfig>";
|
||||||
|
|
||||||
private final String twoEntitiesWithEvaluatorProcessor =
|
private final String twoEntitiesWithEvaluatorProcessor =
|
||||||
|
|
||||||
"<dataConfig> <dataSource type=\"MockDataSource\"/>\n" +
|
"<dataConfig> <dataSource type=\"MockDataSource\"/>\n" +
|
||||||
|
@ -164,6 +182,89 @@ public class TestDocBuilderThreaded extends AbstractDataImportHandlerTestCase {
|
||||||
"</dataConfig>";
|
"</dataConfig>";
|
||||||
|
|
||||||
|
|
||||||
|
private final String twoThreadedEntitiesWithFailingProcessor =
|
||||||
|
|
||||||
|
"<dataConfig> <dataSource type=\"MockDataSource\"/>\n" +
|
||||||
|
"<document>" +
|
||||||
|
"<entity name=\"job\" processor=\"TestDocBuilderThreaded$DemoProcessor\" \n" +
|
||||||
|
" threads=\"1\" " +
|
||||||
|
" query=\"select * from y\"" +
|
||||||
|
" pk=\"id\" \n" +
|
||||||
|
" worker=\"id\" \n" +
|
||||||
|
" onError=\"continue\" " +
|
||||||
|
">" +
|
||||||
|
"<field column=\"id\" />\n" +
|
||||||
|
"<entity name=\"details\" processor=\"TestDocBuilderThreaded$FailingProcessor\" \n" +
|
||||||
|
"worker=\"${job.worker}\" \n" +
|
||||||
|
"query=\"${job.worker}\" \n" +
|
||||||
|
"transformer=\"TemplateTransformer\" " +
|
||||||
|
"onError=\"continue\" " +
|
||||||
|
"fail=\"yes\" " +
|
||||||
|
" >" +
|
||||||
|
"<field column=\"author_s\" />" +
|
||||||
|
"<field column=\"title_s\" />" +
|
||||||
|
" <field column=\"text_s\" />" +
|
||||||
|
" <field column=\"generated_id_s\" template=\"generated_${job.id}\" />" +
|
||||||
|
"</entity>" +
|
||||||
|
"</entity>" +
|
||||||
|
"</document>" +
|
||||||
|
"</dataConfig>";
|
||||||
|
|
||||||
|
private final String twoEntitiesWithFailingProcessor =
|
||||||
|
|
||||||
|
"<dataConfig> <dataSource type=\"MockDataSource\"/>\n" +
|
||||||
|
"<document>" +
|
||||||
|
"<entity name=\"job\" processor=\"TestDocBuilderThreaded$DemoProcessor\" \n" +
|
||||||
|
" query=\"select * from y\"" +
|
||||||
|
" pk=\"id\" \n" +
|
||||||
|
" worker=\"id\" \n" +
|
||||||
|
" onError=\"continue\" " +
|
||||||
|
">" +
|
||||||
|
"<field column=\"id\" />\n" +
|
||||||
|
"<entity name=\"details\" processor=\"TestDocBuilderThreaded$FailingProcessor\" \n" +
|
||||||
|
"worker=\"${job.worker}\" \n" +
|
||||||
|
"query=\"${job.worker}\" \n" +
|
||||||
|
"transformer=\"TemplateTransformer\" " +
|
||||||
|
"onError=\"continue\" " +
|
||||||
|
"fail=\"yes\" " +
|
||||||
|
" >" +
|
||||||
|
"<field column=\"author_s\" />" +
|
||||||
|
"<field column=\"title_s\" />" +
|
||||||
|
" <field column=\"text_s\" />" +
|
||||||
|
" <field column=\"generated_id_s\" template=\"generated_${job.id}\" />" +
|
||||||
|
"</entity>" +
|
||||||
|
"</entity>" +
|
||||||
|
"</document>" +
|
||||||
|
"</dataConfig>";
|
||||||
|
|
||||||
|
private final String twoThreadedEntitiesWithFailingTransformer =
|
||||||
|
|
||||||
|
"<dataConfig> <dataSource type=\"MockDataSource\"/>\n" +
|
||||||
|
"<document>" +
|
||||||
|
"<entity name=\"job\" processor=\"TestDocBuilderThreaded$DemoProcessor\" \n" +
|
||||||
|
" threads=\"1\" " +
|
||||||
|
" query=\"select * from y\"" +
|
||||||
|
" pk=\"id\" \n" +
|
||||||
|
" worker=\"id\" \n" +
|
||||||
|
" onError=\"continue\" " +
|
||||||
|
">" +
|
||||||
|
"<field column=\"id\" />\n" +
|
||||||
|
"<entity name=\"details\" \n" +
|
||||||
|
"worker=\"${job.worker}\" \n" +
|
||||||
|
"query=\"${job.worker}\" \n" +
|
||||||
|
"transformer=\"TestDocBuilderThreaded$FailingTransformer\" " +
|
||||||
|
"onError=\"continue\" " +
|
||||||
|
" >" +
|
||||||
|
"<field column=\"author_s\" />" +
|
||||||
|
"<field column=\"title_s\" />" +
|
||||||
|
" <field column=\"text_s\" />" +
|
||||||
|
" <field column=\"generated_id_s\" template=\"generated_${job.id}\" />" +
|
||||||
|
"</entity>" +
|
||||||
|
"</entity>" +
|
||||||
|
"</document>" +
|
||||||
|
"</dataConfig>";
|
||||||
|
|
||||||
|
|
||||||
public static class DemoProcessor extends SqlEntityProcessor {
|
public static class DemoProcessor extends SqlEntityProcessor {
|
||||||
|
|
||||||
public static int entitiesInitied = 0;
|
public static int entitiesInitied = 0;
|
||||||
|
@ -177,6 +278,23 @@ public class TestDocBuilderThreaded extends AbstractDataImportHandlerTestCase {
|
||||||
} else entitiesInitied++;
|
} else entitiesInitied++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
public static class FailingProcessor extends SqlEntityProcessor {
|
||||||
|
@Override
|
||||||
|
public void init(Context context) {
|
||||||
|
super.init(context);
|
||||||
|
String fail = context.getResolvedEntityAttribute("fail");
|
||||||
|
if (fail != null && fail.equalsIgnoreCase("yes")) {
|
||||||
|
throw new NullPointerException("I was told to");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class FailingTransformer extends Transformer {
|
||||||
|
@Override
|
||||||
|
public Object transformRow(Map<String, Object> row, Context context) {
|
||||||
|
throw new RuntimeException("Always fail");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public static class DemoEvaluator extends Evaluator {
|
public static class DemoEvaluator extends Evaluator {
|
||||||
public static int evaluated = 0;
|
public static int evaluated = 0;
|
||||||
|
@ -196,4 +314,5 @@ public class TestDocBuilderThreaded extends AbstractDataImportHandlerTestCase {
|
||||||
return result.toString();
|
return result.toString();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue