SOLR-12801: Wait for executor to finish shutdown.

This commit is contained in:
markrmiller 2018-11-30 23:58:28 -06:00 committed by markrmiller
parent d8f482f5fb
commit a3ec5b5fdf
2 changed files with 20 additions and 11 deletions

View File

@ -41,6 +41,7 @@ import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.admin.CollectionsHandler;
@ -96,9 +97,9 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
// never be updated by any async creation thread.
private List<Map.Entry<Instant, String>> parsedCollectionsDesc; // k=timestamp (start), v=collection. Sorted descending
private Aliases parsedCollectionsAliases; // a cached reference to the source of what we parse into parsedCollectionsDesc
private volatile boolean executorRunning = false;
// This will be updated out in async creation threads see preemptiveAsync(Runnable r) for details
private volatile ExecutorService preemptiveCreationExecutor;
private ExecutorService preemptiveCreationWaitExecutor = newMDCAwareSingleThreadExecutor(new DefaultSolrThreadFactory("TRA-preemptive-creation-wait"));
public static UpdateRequestProcessor wrap(SolrQueryRequest req, UpdateRequestProcessor next) {
//TODO get from "Collection property"
@ -215,7 +216,7 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
// This next line blocks until all collections required by the current document have been created
return createAllRequiredCollections(docTimestamp, cmd.getPrintableId(), candidateCollectionDesc);
case ASYNC_PREEMPTIVE:
if (preemptiveCreationExecutor == null) {
if (!executorRunning) {
// It's important not to add code between here and the prior call to findCandidateGivenTimestamp()
// in processAdd() that invokes updateParsedCollectionAliases(). Doing so would update parsedCollectionsDesc
// and create a race condition. We are relying on the fact that get(0) is returning the head of the parsed
@ -247,13 +248,17 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
// would need to be shut down in a close hook to avoid test failures due to thread leaks in tests which is slightly
// more complicated from a code maintenance and readability stand point. An executor must used instead of a
// thread to ensure we pick up the proper MDC logging stuff from ExecutorUtil.
executorRunning = true;
DefaultSolrThreadFactory threadFactory = new DefaultSolrThreadFactory("TRA-preemptive-creation");
preemptiveCreationExecutor = newMDCAwareSingleThreadExecutor(threadFactory);
ExecutorService preemptiveCreationExecutor = newMDCAwareSingleThreadExecutor(threadFactory);
preemptiveCreationExecutor.execute(() -> {
r.run();
preemptiveCreationExecutor.shutdown();
preemptiveCreationExecutor = null;
executorRunning = false;
});
preemptiveCreationWaitExecutor.submit(() -> ExecutorUtil.awaitTermination(preemptiveCreationExecutor));
}
/**
@ -413,7 +418,11 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
try {
cmdDistrib.close();
} finally {
super.doClose();
try {
super.doClose();
} finally {
ExecutorUtil.shutdownAndAwaitTermination(preemptiveCreationWaitExecutor);
}
}
}

View File

@ -375,7 +375,7 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
assertUpdateResponse(solrClient.commit(alias));
cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
assertEquals(3,cols.size());
assertEquals(4, cols.size());
assertNumDocs("2017-10-23", 1);
assertNumDocs("2017-10-24", 1);
assertNumDocs("2017-10-25", 3);
@ -387,7 +387,7 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
waitCol("2017-10-26", numShards);
cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
assertEquals(4,cols.size());
assertEquals(5, cols.size());
assertNumDocs("2017-10-23", 1);
assertNumDocs("2017-10-24", 1);
assertNumDocs("2017-10-25", 4);
@ -404,7 +404,7 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
waitCol("2017-10-27", numShards);
cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
assertEquals(5,cols.size()); // only one created in async case
assertEquals(6,cols.size()); // only one created in async case
assertNumDocs("2017-10-23", 1);
assertNumDocs("2017-10-24", 1);
assertNumDocs("2017-10-25", 5);
@ -419,7 +419,7 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
waitCol("2017-10-28", numShards);
cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
assertEquals(6,cols.size()); // Subsequent documents continue to create up to limit
assertEquals(7,cols.size()); // Subsequent documents continue to create up to limit
assertNumDocs("2017-10-23", 1);
assertNumDocs("2017-10-24", 1);
assertNumDocs("2017-10-25", 6);
@ -451,7 +451,7 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
waitCol("2017-10-29", numShards);
cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
assertEquals(7,cols.size());
assertEquals(8,cols.size());
assertNumDocs("2017-10-23", 1);
assertNumDocs("2017-10-24", 1);
assertNumDocs("2017-10-25", 6);