mirror of https://github.com/apache/lucene.git
SOLR-12801 Revert breaking commit a3ec5b5fdf
and improve test
to be more complete. Also, add comments to make it easier for others to understand TimeRoutedAliasUpdateProcessorTest
This commit is contained in:
parent
c951578fca
commit
cf4d749410
|
@ -41,7 +41,6 @@ import org.apache.solr.common.cloud.ZkCoreNodeProps;
|
||||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||||
import org.apache.solr.common.params.SolrParams;
|
import org.apache.solr.common.params.SolrParams;
|
||||||
import org.apache.solr.common.params.UpdateParams;
|
import org.apache.solr.common.params.UpdateParams;
|
||||||
import org.apache.solr.common.util.ExecutorUtil;
|
|
||||||
import org.apache.solr.core.CoreContainer;
|
import org.apache.solr.core.CoreContainer;
|
||||||
import org.apache.solr.core.SolrCore;
|
import org.apache.solr.core.SolrCore;
|
||||||
import org.apache.solr.handler.admin.CollectionsHandler;
|
import org.apache.solr.handler.admin.CollectionsHandler;
|
||||||
|
@ -97,9 +96,9 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
|
||||||
// never be updated by any async creation thread.
|
// never be updated by any async creation thread.
|
||||||
private List<Map.Entry<Instant, String>> parsedCollectionsDesc; // k=timestamp (start), v=collection. Sorted descending
|
private List<Map.Entry<Instant, String>> parsedCollectionsDesc; // k=timestamp (start), v=collection. Sorted descending
|
||||||
private Aliases parsedCollectionsAliases; // a cached reference to the source of what we parse into parsedCollectionsDesc
|
private Aliases parsedCollectionsAliases; // a cached reference to the source of what we parse into parsedCollectionsDesc
|
||||||
private volatile boolean executorRunning = false;
|
|
||||||
|
|
||||||
private ExecutorService preemptiveCreationWaitExecutor = newMDCAwareSingleThreadExecutor(new DefaultSolrThreadFactory("TRA-preemptive-creation-wait"));
|
// This will be updated out in async creation threads see preemptiveAsync(Runnable r) for details
|
||||||
|
private volatile ExecutorService preemptiveCreationExecutor;
|
||||||
|
|
||||||
public static UpdateRequestProcessor wrap(SolrQueryRequest req, UpdateRequestProcessor next) {
|
public static UpdateRequestProcessor wrap(SolrQueryRequest req, UpdateRequestProcessor next) {
|
||||||
//TODO get from "Collection property"
|
//TODO get from "Collection property"
|
||||||
|
@ -216,7 +215,7 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
|
||||||
// This next line blocks until all collections required by the current document have been created
|
// This next line blocks until all collections required by the current document have been created
|
||||||
return createAllRequiredCollections(docTimestamp, cmd.getPrintableId(), candidateCollectionDesc);
|
return createAllRequiredCollections(docTimestamp, cmd.getPrintableId(), candidateCollectionDesc);
|
||||||
case ASYNC_PREEMPTIVE:
|
case ASYNC_PREEMPTIVE:
|
||||||
if (!executorRunning) {
|
if (preemptiveCreationExecutor == null) {
|
||||||
// It's important not to add code between here and the prior call to findCandidateGivenTimestamp()
|
// It's important not to add code between here and the prior call to findCandidateGivenTimestamp()
|
||||||
// in processAdd() that invokes updateParsedCollectionAliases(). Doing so would update parsedCollectionsDesc
|
// in processAdd() that invokes updateParsedCollectionAliases(). Doing so would update parsedCollectionsDesc
|
||||||
// and create a race condition. We are relying on the fact that get(0) is returning the head of the parsed
|
// and create a race condition. We are relying on the fact that get(0) is returning the head of the parsed
|
||||||
|
@ -248,17 +247,13 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
|
||||||
// would need to be shut down in a close hook to avoid test failures due to thread leaks in tests which is slightly
|
// would need to be shut down in a close hook to avoid test failures due to thread leaks in tests which is slightly
|
||||||
// more complicated from a code maintenance and readability stand point. An executor must used instead of a
|
// more complicated from a code maintenance and readability stand point. An executor must used instead of a
|
||||||
// thread to ensure we pick up the proper MDC logging stuff from ExecutorUtil.
|
// thread to ensure we pick up the proper MDC logging stuff from ExecutorUtil.
|
||||||
executorRunning = true;
|
|
||||||
DefaultSolrThreadFactory threadFactory = new DefaultSolrThreadFactory("TRA-preemptive-creation");
|
DefaultSolrThreadFactory threadFactory = new DefaultSolrThreadFactory("TRA-preemptive-creation");
|
||||||
ExecutorService preemptiveCreationExecutor = newMDCAwareSingleThreadExecutor(threadFactory);
|
preemptiveCreationExecutor = newMDCAwareSingleThreadExecutor(threadFactory);
|
||||||
|
|
||||||
preemptiveCreationExecutor.execute(() -> {
|
preemptiveCreationExecutor.execute(() -> {
|
||||||
r.run();
|
r.run();
|
||||||
preemptiveCreationExecutor.shutdown();
|
preemptiveCreationExecutor.shutdown();
|
||||||
executorRunning = false;
|
preemptiveCreationExecutor = null;
|
||||||
});
|
});
|
||||||
|
|
||||||
preemptiveCreationWaitExecutor.submit(() -> ExecutorUtil.awaitTermination(preemptiveCreationExecutor));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -418,11 +413,7 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
|
||||||
try {
|
try {
|
||||||
cmdDistrib.close();
|
cmdDistrib.close();
|
||||||
} finally {
|
} finally {
|
||||||
try {
|
super.doClose();
|
||||||
super.doClose();
|
|
||||||
} finally {
|
|
||||||
ExecutorUtil.shutdownAndAwaitTermination(preemptiveCreationWaitExecutor);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -366,6 +366,17 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
|
||||||
// cause some collections to be created
|
// cause some collections to be created
|
||||||
|
|
||||||
ModifiableSolrParams params = params();
|
ModifiableSolrParams params = params();
|
||||||
|
|
||||||
|
// TIME SENSITIVE SECTION BEGINS
|
||||||
|
|
||||||
|
// In this section we intentionally rely on timing of a race condition but the gap in collection creation time vs
|
||||||
|
// requesting the list of aliases and adding a single doc should be very large (1-2 seconds vs a few ms so we
|
||||||
|
// should always win the race) This is necessary because we are testing that we can guard against specific race
|
||||||
|
// conditions that happen while a collection is being created. To test this without timing sensitivity we would
|
||||||
|
// need a means to pass a semaphore to the server that it can use to delay collection creation
|
||||||
|
//
|
||||||
|
// This section must NOT gain any Thread.sleep() statements, nor should it gain any long running operations
|
||||||
|
|
||||||
assertUpdateResponse(add(alias, Arrays.asList(
|
assertUpdateResponse(add(alias, Arrays.asList(
|
||||||
sdoc("id", "2", "timestamp_dt", "2017-10-24T00:00:00Z"),
|
sdoc("id", "2", "timestamp_dt", "2017-10-24T00:00:00Z"),
|
||||||
sdoc("id", "3", "timestamp_dt", "2017-10-25T00:00:00Z"),
|
sdoc("id", "3", "timestamp_dt", "2017-10-25T00:00:00Z"),
|
||||||
|
@ -375,19 +386,29 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
|
||||||
assertUpdateResponse(solrClient.commit(alias));
|
assertUpdateResponse(solrClient.commit(alias));
|
||||||
|
|
||||||
cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
|
cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
|
||||||
assertEquals(4, cols.size());
|
assertEquals(3, cols.size());
|
||||||
|
assertTrue("Preemptive creation appears to not be asynchronous anymore",!cols.contains("myalias_2017-10-26"));
|
||||||
assertNumDocs("2017-10-23", 1);
|
assertNumDocs("2017-10-23", 1);
|
||||||
assertNumDocs("2017-10-24", 1);
|
assertNumDocs("2017-10-24", 1);
|
||||||
assertNumDocs("2017-10-25", 3);
|
assertNumDocs("2017-10-25", 3);
|
||||||
|
|
||||||
|
// Here we quickly add another doc in a separate request, before the collection creation has completed.
|
||||||
|
// This has the potential to incorrectly cause preemptive collection creation to run twice and create a
|
||||||
|
// second collection. TimeRoutedAliasUpdateProcessor is meant to guard against this race condition.
|
||||||
assertUpdateResponse(add(alias, Collections.singletonList(
|
assertUpdateResponse(add(alias, Collections.singletonList(
|
||||||
sdoc("id", "6", "timestamp_dt", "2017-10-25T23:01:00Z")), // might cause duplicate preemptive creation
|
sdoc("id", "6", "timestamp_dt", "2017-10-25T23:01:00Z")), // might cause duplicate preemptive creation
|
||||||
params));
|
params));
|
||||||
assertUpdateResponse(solrClient.commit(alias));
|
assertUpdateResponse(solrClient.commit(alias));
|
||||||
|
|
||||||
|
// TIME SENSITIVE SECTION ENDS
|
||||||
|
|
||||||
waitCol("2017-10-26", numShards);
|
waitCol("2017-10-26", numShards);
|
||||||
|
|
||||||
cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
|
cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
|
||||||
assertEquals(5, cols.size());
|
assertTrue("Preemptive creation happened twice and created a collection " +
|
||||||
|
"further in the future than the configured time slice!",!cols.contains("myalias_2017-10-27"));
|
||||||
|
|
||||||
|
assertEquals(4, cols.size());
|
||||||
assertNumDocs("2017-10-23", 1);
|
assertNumDocs("2017-10-23", 1);
|
||||||
assertNumDocs("2017-10-24", 1);
|
assertNumDocs("2017-10-24", 1);
|
||||||
assertNumDocs("2017-10-25", 4);
|
assertNumDocs("2017-10-25", 4);
|
||||||
|
@ -398,13 +419,13 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
|
||||||
.addProperty(TimeRoutedAlias.ROUTER_PREEMPTIVE_CREATE_MATH, "3DAY").process(solrClient);
|
.addProperty(TimeRoutedAlias.ROUTER_PREEMPTIVE_CREATE_MATH, "3DAY").process(solrClient);
|
||||||
|
|
||||||
assertUpdateResponse(add(alias, Collections.singletonList(
|
assertUpdateResponse(add(alias, Collections.singletonList(
|
||||||
sdoc("id", "7", "timestamp_dt", "2017-10-25T23:01:00Z")), // should cause preemptive creation now
|
sdoc("id", "7", "timestamp_dt", "2017-10-25T23:01:00Z")), // should cause preemptive creation of 10-27 now
|
||||||
params));
|
params));
|
||||||
assertUpdateResponse(solrClient.commit(alias));
|
assertUpdateResponse(solrClient.commit(alias));
|
||||||
waitCol("2017-10-27", numShards);
|
waitCol("2017-10-27", numShards);
|
||||||
|
|
||||||
cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
|
cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
|
||||||
assertEquals(6,cols.size()); // only one created in async case
|
assertEquals(5,cols.size()); // only one created in async case
|
||||||
assertNumDocs("2017-10-23", 1);
|
assertNumDocs("2017-10-23", 1);
|
||||||
assertNumDocs("2017-10-24", 1);
|
assertNumDocs("2017-10-24", 1);
|
||||||
assertNumDocs("2017-10-25", 5);
|
assertNumDocs("2017-10-25", 5);
|
||||||
|
@ -412,14 +433,14 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
|
||||||
assertNumDocs("2017-10-27", 0);
|
assertNumDocs("2017-10-27", 0);
|
||||||
|
|
||||||
assertUpdateResponse(add(alias, Collections.singletonList(
|
assertUpdateResponse(add(alias, Collections.singletonList(
|
||||||
sdoc("id", "8", "timestamp_dt", "2017-10-25T23:01:00Z")), // should cause preemptive creation now
|
sdoc("id", "8", "timestamp_dt", "2017-10-25T23:01:00Z")), // should cause preemptive creation of 10-28 now
|
||||||
params));
|
params));
|
||||||
assertUpdateResponse(solrClient.commit(alias));
|
assertUpdateResponse(solrClient.commit(alias));
|
||||||
waitCol("2017-10-27", numShards);
|
waitCol("2017-10-27", numShards);
|
||||||
waitCol("2017-10-28", numShards);
|
waitCol("2017-10-28", numShards);
|
||||||
|
|
||||||
cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
|
cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
|
||||||
assertEquals(7,cols.size()); // Subsequent documents continue to create up to limit
|
assertEquals(6,cols.size()); // Subsequent documents continue to create up to limit
|
||||||
assertNumDocs("2017-10-23", 1);
|
assertNumDocs("2017-10-23", 1);
|
||||||
assertNumDocs("2017-10-24", 1);
|
assertNumDocs("2017-10-24", 1);
|
||||||
assertNumDocs("2017-10-25", 6);
|
assertNumDocs("2017-10-25", 6);
|
||||||
|
@ -451,7 +472,7 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
|
||||||
waitCol("2017-10-29", numShards);
|
waitCol("2017-10-29", numShards);
|
||||||
|
|
||||||
cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
|
cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
|
||||||
assertEquals(8,cols.size());
|
assertEquals(7,cols.size());
|
||||||
assertNumDocs("2017-10-23", 1);
|
assertNumDocs("2017-10-23", 1);
|
||||||
assertNumDocs("2017-10-24", 1);
|
assertNumDocs("2017-10-24", 1);
|
||||||
assertNumDocs("2017-10-25", 6);
|
assertNumDocs("2017-10-25", 6);
|
||||||
|
@ -490,6 +511,35 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
|
||||||
"rows", "0"));
|
"rows", "0"));
|
||||||
assertEquals(13, resp.getResults().getNumFound());
|
assertEquals(13, resp.getResults().getNumFound());
|
||||||
|
|
||||||
|
assertUpdateResponse(add(alias, Collections.singletonList(
|
||||||
|
sdoc("id", "14", "timestamp_dt", "2017-10-31T23:01:00Z")), // should cause preemptive creation 11-01
|
||||||
|
params));
|
||||||
|
waitCol("2017-11-01", numShards);
|
||||||
|
|
||||||
|
assertUpdateResponse(add(alias, Collections.singletonList(
|
||||||
|
sdoc("id", "15", "timestamp_dt", "2017-10-31T23:01:00Z")), // should cause preemptive creation 11-02
|
||||||
|
params));
|
||||||
|
waitCol("2017-11-02", numShards);
|
||||||
|
|
||||||
|
assertUpdateResponse(add(alias, Collections.singletonList(
|
||||||
|
sdoc("id", "16", "timestamp_dt", "2017-10-31T23:01:00Z")), // should cause preemptive creation 11-03
|
||||||
|
params));
|
||||||
|
waitCol("2017-11-03", numShards);
|
||||||
|
|
||||||
|
assertUpdateResponse(add(alias, Collections.singletonList(
|
||||||
|
sdoc("id", "17", "timestamp_dt", "2017-10-31T23:01:00Z")), // should NOT cause preemptive creation 11-04
|
||||||
|
params));
|
||||||
|
|
||||||
|
cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
|
||||||
|
assertTrue("Preemptive creation beyond ROUTER_PREEMPTIVE_CREATE_MATH setting of 3DAY!",!cols.contains("myalias_2017-11-04"));
|
||||||
|
|
||||||
|
assertUpdateResponse(add(alias, Collections.singletonList(
|
||||||
|
sdoc("id", "18", "timestamp_dt", "2017-11-01T23:01:00Z")), // should NOT cause preemptive creation 11-04
|
||||||
|
params));
|
||||||
|
waitCol("2017-11-04",numShards);
|
||||||
|
|
||||||
|
Thread.sleep(2000); // allow the executor used in preemptive creation time to shut down.
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void assertNumDocs(final String datePart, int expected) throws SolrServerException, IOException {
|
private void assertNumDocs(final String datePart, int expected) throws SolrServerException, IOException {
|
||||||
|
|
Loading…
Reference in New Issue