mirror of https://github.com/apache/lucene.git
SOLR-2947: fix multi-threaded DIH bug (introduced w/SOLR-2382)
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1245014 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
c2eb461acc
commit
37bb87e70d
|
@ -297,20 +297,32 @@ public class DocBuilder {
|
||||||
addStatusMessage("Rolledback");
|
addStatusMessage("Rolledback");
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
private void doFullDump() {
|
private void doFullDump() {
|
||||||
addStatusMessage("Full Dump Started");
|
addStatusMessage("Full Dump Started");
|
||||||
if (dataImporter.getConfig().isMultiThreaded && !verboseDebug) {
|
if (dataImporter.getConfig().isMultiThreaded && !verboseDebug) {
|
||||||
|
EntityRunner entityRunner = null;
|
||||||
try {
|
try {
|
||||||
LOG.info("running multithreaded full-import");
|
LOG.info("running multithreaded full-import");
|
||||||
new EntityRunner(root,null).run(null,Context.FULL_DUMP,null);
|
entityRunner = new EntityRunner(root, null);
|
||||||
|
entityRunner.run(null, Context.FULL_DUMP, null);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new RuntimeException("Error in multi-threaded import", e);
|
throw new RuntimeException("Error in multi-threaded import", e);
|
||||||
|
} finally {
|
||||||
|
if (entityRunner != null) {
|
||||||
|
List<EntityRunner> closure = new ArrayList<EntityRunner>();
|
||||||
|
closure.add(entityRunner);
|
||||||
|
for (int i = 0; i < closure.size(); i++) {
|
||||||
|
assert(!closure.get(i).entityProcessorWrapper.isEmpty());
|
||||||
|
closure.addAll(closure.get(i).entityProcessorWrapper.iterator().next().children.values());
|
||||||
|
}
|
||||||
|
for (EntityRunner er : closure) {
|
||||||
|
er.entityProcessor.destroy();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
buildDocument(getVariableResolver(), null, null, root, true, null);
|
buildDocument(getVariableResolver(), null, null, root, true, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
|
@ -470,7 +482,6 @@ public class DocBuilder {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
entityProcessor.destroy();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1,32 +0,0 @@
|
||||||
<dataConfig>
|
|
||||||
<dataSource type="MockDataSource" />
|
|
||||||
<document>
|
|
||||||
<entity
|
|
||||||
name="PARENT"
|
|
||||||
processor="SqlEntityProcessor"
|
|
||||||
cacheName="PARENT"
|
|
||||||
cachePk="id"
|
|
||||||
query="SELECT * FROM PARENT"
|
|
||||||
>
|
|
||||||
<entity
|
|
||||||
name="CHILD_1"
|
|
||||||
processor="SqlEntityProcessor"
|
|
||||||
cacheImpl="SortedMapBackedCache"
|
|
||||||
cacheName="CHILD"
|
|
||||||
cachePk="id"
|
|
||||||
cacheLookup="PARENT.id"
|
|
||||||
fieldNames="id, child1a_mult_s, child1b_s"
|
|
||||||
fieldTypes="BIGDECIMAL, STRING, STRING"
|
|
||||||
query="SELECT * FROM CHILD_1"
|
|
||||||
/>
|
|
||||||
<entity
|
|
||||||
name="CHILD_2"
|
|
||||||
processor="SqlEntityProcessor"
|
|
||||||
cacheImpl="SortedMapBackedCache"
|
|
||||||
cachePk="id"
|
|
||||||
cacheLookup="PARENT.id"
|
|
||||||
query="SELECT * FROM CHILD_2"
|
|
||||||
/>
|
|
||||||
</entity>
|
|
||||||
</document>
|
|
||||||
</dataConfig>
|
|
|
@ -0,0 +1,21 @@
|
||||||
|
package org.apache.solr.handler.dataimport;
|
||||||
|
|
||||||
|
import static org.hamcrest.CoreMatchers.nullValue;
|
||||||
|
|
||||||
|
import java.util.IdentityHashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.junit.Assert;
|
||||||
|
|
||||||
|
public class DestroyCountCache extends SortedMapBackedCache {
|
||||||
|
static Map<DIHCache,DIHCache> destroyed = new IdentityHashMap<DIHCache,DIHCache>();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void destroy() {
|
||||||
|
super.destroy();
|
||||||
|
Assert.assertThat(destroyed.put(this, this), nullValue());
|
||||||
|
}
|
||||||
|
|
||||||
|
public DestroyCountCache() {}
|
||||||
|
|
||||||
|
}
|
|
@ -4,16 +4,47 @@ import java.math.BigDecimal;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
import static org.hamcrest.CoreMatchers.*;
|
||||||
|
import org.junit.Before;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Ignore;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
public class TestEphemeralCache extends AbstractDataImportHandlerTestCase {
|
public class TestEphemeralCache extends AbstractDataImportHandlerTestCase {
|
||||||
|
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void beforeClass() throws Exception {
|
public static void beforeClass() throws Exception {
|
||||||
initCore("dataimport-solrconfig.xml", "dataimport-schema.xml");
|
initCore("dataimport-solrconfig.xml", "dataimport-schema.xml");
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testEphemeralCache() throws Exception {
|
@Before
|
||||||
|
public void reset() {
|
||||||
|
DestroyCountCache.destroyed.clear();
|
||||||
|
setupMockData();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSingleThreaded() throws Exception {
|
||||||
|
assertFullImport(getDataConfigDotXml(0));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testWithThreadedParamEqualOne() throws Exception {
|
||||||
|
assertFullImport(getDataConfigDotXml(1));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Ignore("TODO: fix included in SOLR-3011")
|
||||||
|
@Test
|
||||||
|
public void testMultiThreaded() throws Exception {
|
||||||
|
// Try between 2 and 6 threads
|
||||||
|
int numThreads = random.nextInt(4) + 2;
|
||||||
|
System.out.println("TRYING " + numThreads);
|
||||||
|
assertFullImport(getDataConfigDotXml(numThreads));
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
private void setupMockData() {
|
||||||
List parentRows = new ArrayList();
|
List parentRows = new ArrayList();
|
||||||
parentRows.add(createMap("id", new BigDecimal("1"), "parent_s", "one"));
|
parentRows.add(createMap("id", new BigDecimal("1"), "parent_s", "one"));
|
||||||
parentRows.add(createMap("id", new BigDecimal("2"), "parent_s", "two"));
|
parentRows.add(createMap("id", new BigDecimal("2"), "parent_s", "two"));
|
||||||
|
@ -49,7 +80,47 @@ public class TestEphemeralCache extends AbstractDataImportHandlerTestCase {
|
||||||
MockDataSource.setIterator("SELECT * FROM CHILD_1", child1Rows.iterator());
|
MockDataSource.setIterator("SELECT * FROM CHILD_1", child1Rows.iterator());
|
||||||
MockDataSource.setIterator("SELECT * FROM CHILD_2", child2Rows.iterator());
|
MockDataSource.setIterator("SELECT * FROM CHILD_2", child2Rows.iterator());
|
||||||
|
|
||||||
runFullImport(loadDataConfig("dataimport-cache-ephemeral.xml"));
|
}
|
||||||
|
private String getDataConfigDotXml(int numThreads) {
|
||||||
|
return
|
||||||
|
"<dataConfig>" +
|
||||||
|
" <dataSource type=\"MockDataSource\" />" +
|
||||||
|
" <document>" +
|
||||||
|
" <entity " +
|
||||||
|
" name=\"PARENT\"" +
|
||||||
|
" processor=\"SqlEntityProcessor\"" +
|
||||||
|
" cacheImpl=\"org.apache.solr.handler.dataimport.DestroyCountCache\"" +
|
||||||
|
" cacheName=\"PARENT\"" +
|
||||||
|
" query=\"SELECT * FROM PARENT\" " +
|
||||||
|
(numThreads==0 ? "" : "threads=\"" + numThreads + "\" ") +
|
||||||
|
" >" +
|
||||||
|
" <entity" +
|
||||||
|
" name=\"CHILD_1\"" +
|
||||||
|
" processor=\"SqlEntityProcessor\"" +
|
||||||
|
" cacheImpl=\"org.apache.solr.handler.dataimport.DestroyCountCache\"" +
|
||||||
|
" cacheName=\"CHILD\"" +
|
||||||
|
" cachePk=\"id\"" +
|
||||||
|
" cacheLookup=\"PARENT.id\"" +
|
||||||
|
" fieldNames=\"id, child1a_mult_s, child1b_s\"" +
|
||||||
|
" fieldTypes=\"BIGDECIMAL, STRING, STRING\"" +
|
||||||
|
" query=\"SELECT * FROM CHILD_1\" " +
|
||||||
|
" />" +
|
||||||
|
" <entity" +
|
||||||
|
" name=\"CHILD_2\"" +
|
||||||
|
" processor=\"SqlEntityProcessor\"" +
|
||||||
|
" cacheImpl=\"org.apache.solr.handler.dataimport.DestroyCountCache\"" +
|
||||||
|
" cachePk=\"id\"" +
|
||||||
|
" cacheLookup=\"PARENT.id\"" +
|
||||||
|
" query=\"SELECT * FROM CHILD_2\" " +
|
||||||
|
" />" +
|
||||||
|
" </entity>" +
|
||||||
|
" </document>" +
|
||||||
|
"</dataConfig>"
|
||||||
|
;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertFullImport(String dataConfig) throws Exception {
|
||||||
|
runFullImport(dataConfig);
|
||||||
|
|
||||||
assertQ(req("*:*"), "//*[@numFound='5']");
|
assertQ(req("*:*"), "//*[@numFound='5']");
|
||||||
assertQ(req("id:1"), "//*[@numFound='1']");
|
assertQ(req("id:1"), "//*[@numFound='1']");
|
||||||
|
@ -63,6 +134,7 @@ public class TestEphemeralCache extends AbstractDataImportHandlerTestCase {
|
||||||
assertQ(req("child1a_mult_s:uno"), "//*[@numFound='1']");
|
assertQ(req("child1a_mult_s:uno"), "//*[@numFound='1']");
|
||||||
assertQ(req("child1a_mult_s:(uno OR one)"), "//*[@numFound='1']");
|
assertQ(req("child1a_mult_s:(uno OR one)"), "//*[@numFound='1']");
|
||||||
|
|
||||||
|
assertThat(DestroyCountCache.destroyed.size(), is(3));
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue