mirror of https://github.com/apache/lucene.git
SOLR-13051 improve TRA update processor test
- remove need to Thread.sleep() - better async mechanism linked to SolrCore lifecycle - add some additional tests to be a bit more thorough
This commit is contained in:
parent
69cbe29e78
commit
0f1da2bc14
|
@ -260,6 +260,7 @@ public final class SolrCore implements SolrInfoBean, SolrMetricProducer, Closeab
|
|||
|
||||
static int boolean_query_max_clause_count = Integer.MIN_VALUE;
|
||||
|
||||
private ExecutorService coreAsyncTaskExecutor = ExecutorUtil.newMDCAwareCachedThreadPool("Core Async Task");
|
||||
|
||||
/**
|
||||
* The SolrResourceLoader used to load all resources for this core.
|
||||
|
@ -1532,6 +1533,8 @@ public final class SolrCore implements SolrInfoBean, SolrMetricProducer, Closeab
|
|||
}
|
||||
log.info("{} CLOSING SolrCore {}", logid, this);
|
||||
|
||||
ExecutorUtil.shutdownAndAwaitTermination(coreAsyncTaskExecutor);
|
||||
|
||||
// stop reporting metrics
|
||||
try {
|
||||
coreMetricManager.close();
|
||||
|
@ -3166,4 +3169,26 @@ public final class SolrCore implements SolrInfoBean, SolrMetricProducer, Closeab
|
|||
});
|
||||
return blobRef;
|
||||
}
|
||||
|
||||
/**
|
||||
* Run an arbitrary task in it's own thread. This is an expert option and is
|
||||
* a method you should use with great care. It would be bad to run something that never stopped
|
||||
* or run something that took a very long time. Typically this is intended for actions that take
|
||||
* a few seconds, and therefore would be bad to wait for within a request, but but would not pose
|
||||
* a significant hindrance to server shut down times. It is not intended for long running tasks
|
||||
* and if you are using a Runnable with a loop in it, you are almost certainly doing it wrong.
|
||||
* <p>
|
||||
* WARNING: Solr wil not be able to shut down gracefully until this task completes!
|
||||
* <p>
|
||||
* A significant upside of using this method vs creating your own ExecutorService is that your code
|
||||
* does not have to properly shutdown executors which typically is risky from a unit testing
|
||||
* perspective since the test framework will complain if you don't carefully ensure the executor
|
||||
* shuts down before the end of the test. Also the threads running this task are sure to have
|
||||
* a proper MDC for logging.
|
||||
*
|
||||
* @param r the task to run
|
||||
*/
|
||||
public void runAsync(Runnable r) {
|
||||
coreAsyncTaskExecutor.submit(r);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,7 +25,6 @@ import java.util.Collections;
|
|||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.solr.cloud.ZkController;
|
||||
|
@ -52,12 +51,10 @@ import org.apache.solr.update.DeleteUpdateCommand;
|
|||
import org.apache.solr.update.SolrCmdDistributor;
|
||||
import org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase;
|
||||
import org.apache.solr.util.DateMathParser;
|
||||
import org.apache.solr.util.DefaultSolrThreadFactory;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.commons.lang3.StringUtils.isNotBlank;
|
||||
import static org.apache.solr.common.util.ExecutorUtil.newMDCAwareSingleThreadExecutor;
|
||||
import static org.apache.solr.update.processor.DistributedUpdateProcessor.DISTRIB_FROM;
|
||||
import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
|
||||
import static org.apache.solr.update.processor.TimeRoutedAliasUpdateProcessor.CreationType.ASYNC_PREEMPTIVE;
|
||||
|
@ -97,8 +94,10 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
|
|||
private List<Map.Entry<Instant, String>> parsedCollectionsDesc; // k=timestamp (start), v=collection. Sorted descending
|
||||
private Aliases parsedCollectionsAliases; // a cached reference to the source of what we parse into parsedCollectionsDesc
|
||||
|
||||
// This will be updated out in async creation threads see preemptiveAsync(Runnable r) for details
|
||||
private volatile ExecutorService preemptiveCreationExecutor;
|
||||
// This class is created once per request and the overseer methods prevent duplicate create requests
|
||||
// from creating extra copies. All we need to track here is that we don't spam preemptive creates to
|
||||
// the overseer multiple times from *this* request.
|
||||
private volatile boolean preemptiveCreateOnceAlready = false;
|
||||
|
||||
public static UpdateRequestProcessor wrap(SolrQueryRequest req, UpdateRequestProcessor next) {
|
||||
//TODO get from "Collection property"
|
||||
|
@ -215,7 +214,8 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
|
|||
// This next line blocks until all collections required by the current document have been created
|
||||
return createAllRequiredCollections(docTimestamp, cmd.getPrintableId(), candidateCollectionDesc);
|
||||
case ASYNC_PREEMPTIVE:
|
||||
if (preemptiveCreationExecutor == null) {
|
||||
if (!preemptiveCreateOnceAlready) {
|
||||
log.info("EXECUTING preemptive creation for {}", timeRoutedAlias.getAliasName());
|
||||
// It's important not to add code between here and the prior call to findCandidateGivenTimestamp()
|
||||
// in processAdd() that invokes updateParsedCollectionAliases(). Doing so would update parsedCollectionsDesc
|
||||
// and create a race condition. We are relying on the fact that get(0) is returning the head of the parsed
|
||||
|
@ -242,18 +242,8 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
|
|||
}
|
||||
|
||||
private void preemptiveAsync(Runnable r) {
|
||||
// Note: creating an executor and throwing it away is slightly expensive, but this is only likely to happen
|
||||
// once per hour/day/week (depending on time slice size for the TRA). If the executor were retained, it
|
||||
// would need to be shut down in a close hook to avoid test failures due to thread leaks in tests which is slightly
|
||||
// more complicated from a code maintenance and readability stand point. An executor must used instead of a
|
||||
// thread to ensure we pick up the proper MDC logging stuff from ExecutorUtil.
|
||||
DefaultSolrThreadFactory threadFactory = new DefaultSolrThreadFactory("TRA-preemptive-creation");
|
||||
preemptiveCreationExecutor = newMDCAwareSingleThreadExecutor(threadFactory);
|
||||
preemptiveCreationExecutor.execute(() -> {
|
||||
r.run();
|
||||
preemptiveCreationExecutor.shutdown();
|
||||
preemptiveCreationExecutor = null;
|
||||
});
|
||||
preemptiveCreateOnceAlready = true;
|
||||
req.getCore().runAsync(r);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -479,9 +469,11 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
|
|||
return targetCollectionDesc.getValue(); // we don't need another collection
|
||||
case ASYNC_PREEMPTIVE:
|
||||
// can happen when preemptive interval is longer than one time slice
|
||||
if (!preemptiveCreateOnceAlready) {
|
||||
String mostRecentCollName = this.parsedCollectionsDesc.get(0).getValue();
|
||||
preemptiveAsync(() -> createNextCollection(mostRecentCollName));
|
||||
return targetCollectionDesc.getValue();
|
||||
}
|
||||
case SYNCHRONOUS:
|
||||
createNextCollection(targetCollectionDesc.getValue()); // *should* throw if fails for some reason but...
|
||||
if (!updateParsedCollectionAliases()) { // thus we didn't make progress...
|
||||
|
@ -495,7 +487,6 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
|
|||
break;
|
||||
default:
|
||||
throw unknownCreateType();
|
||||
|
||||
}
|
||||
} while (true);
|
||||
}
|
||||
|
|
|
@ -29,14 +29,16 @@ import java.util.Collection;
|
|||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.apache.lucene.util.LuceneTestCase.AwaitsFix;
|
||||
import org.apache.lucene.util.LuceneTestCase;
|
||||
import org.apache.solr.client.solrj.SolrRequest;
|
||||
import org.apache.solr.client.solrj.SolrServerException;
|
||||
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
||||
|
@ -54,14 +56,17 @@ import org.apache.solr.cloud.api.collections.TimeRoutedAlias;
|
|||
import org.apache.solr.common.SolrDocumentList;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.SolrInputDocument;
|
||||
import org.apache.solr.common.cloud.Aliases;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.Slice;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.apache.solr.common.params.SolrParams;
|
||||
import org.apache.solr.common.util.ExecutorUtil;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.core.CoreContainer;
|
||||
import org.apache.solr.core.CoreDescriptor;
|
||||
import org.apache.solr.request.SolrQueryRequest;
|
||||
import org.apache.solr.response.SolrQueryResponse;
|
||||
|
@ -75,13 +80,12 @@ import org.junit.Test;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
// This feature has a leak
|
||||
@AwaitsFix(bugUrl="https://issues.apache.org/jira/browse/SOLR-12801")
|
||||
@LuceneTestCase.BadApple(bugUrl = "https://issues.apache.org/jira/browse/SOLR-13059")
|
||||
public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
private static final String configName = "timeConfig";
|
||||
private static final String alias = "myalias";
|
||||
private static final String alias2 = "myalias2";
|
||||
private static final String timeField = "timestamp_dt";
|
||||
private static final String intField = "integer_i";
|
||||
|
||||
|
@ -228,6 +232,8 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
|
|||
|
||||
CollectionAdminRequest.createCollection(configName, configName, 1, 1).process(solrClient);
|
||||
|
||||
// TODO: fix SOLR-13059, a where this wait isn't working ~0.3% of the time.
|
||||
waitCol(1,configName);
|
||||
// manipulate the config...
|
||||
checkNoError(solrClient.request(new V2Request.Builder("/collections/" + configName + "/config")
|
||||
.withMethod(SolrRequest.METHOD.POST)
|
||||
|
@ -289,9 +295,9 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
|
|||
assertUpdateResponse(solrClient.commit(alias));
|
||||
|
||||
// wait for all the collections to exist...
|
||||
waitCol("2017-10-23", numShards);
|
||||
waitCol("2017-10-24", numShards);
|
||||
waitCol("2017-10-25", numShards);
|
||||
waitColAndAlias("2017-10-23", numShards, alias);
|
||||
waitColAndAlias("2017-10-24", numShards, alias);
|
||||
waitColAndAlias("2017-10-25", numShards, alias);
|
||||
|
||||
// at this point we now have 3 collections with 4 shards each, and 3 replicas per shard for a total of
|
||||
// 36 total replicas, 1/3 of which are leaders. We will add 3 docs and each has a 33% chance of hitting a
|
||||
|
@ -347,75 +353,108 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
|
|||
.setMaxShardsPerNode(numReplicas)).setPreemptiveCreateWindow("3HOUR")
|
||||
.process(solrClient);
|
||||
|
||||
// cause some collections to be created
|
||||
assertUpdateResponse(solrClient.add(alias,
|
||||
sdoc("id","1","timestamp_dt", "2017-10-25T00:00:00Z")
|
||||
));
|
||||
assertUpdateResponse(solrClient.commit(alias));
|
||||
// needed to verify that preemptive creation in one alias doesn't inhibit preemptive creation in another
|
||||
CollectionAdminRequest.createTimeRoutedAlias(alias2, "2017-10-23T00:00:00Z", "+1DAY", timeField,
|
||||
CollectionAdminRequest.createCollection("_unused_", configName, numShards, numReplicas)
|
||||
.setMaxShardsPerNode(numReplicas)).setPreemptiveCreateWindow("3HOUR")
|
||||
.process(solrClient);
|
||||
|
||||
// wait for all the collections to exist...
|
||||
waitCol("2017-10-23", numShards); // This one should have already existed from the alias creation
|
||||
waitCol("2017-10-24", numShards); // Create 1
|
||||
waitCol("2017-10-25", numShards); // Create 2nd synchronously (ensure this is not broken)
|
||||
|
||||
// normal update, nothing special, no collection creation required.
|
||||
List<String> cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
|
||||
assertEquals(3,cols.size());
|
||||
|
||||
assertNumDocs("2017-10-23", 0);
|
||||
assertNumDocs("2017-10-24", 0);
|
||||
assertNumDocs("2017-10-25", 1);
|
||||
|
||||
// cause some collections to be created
|
||||
addOneDocSynchCreation(numShards, alias);
|
||||
addOneDocSynchCreation(numShards, alias2);
|
||||
|
||||
List<String> cols;
|
||||
ModifiableSolrParams params = params();
|
||||
|
||||
// TIME SENSITIVE SECTION BEGINS
|
||||
// Using threads to ensure that two TRA's are simultaneously preemptively creating and don't
|
||||
// interfere with each other
|
||||
ExecutorService executorService = ExecutorUtil.newMDCAwareCachedThreadPool("TimeRoutedAliasProcessorTestx-testPreemptiveCreation");
|
||||
|
||||
// In this section we intentionally rely on timing of a race condition but the gap in collection creation time vs
|
||||
// requesting the list of aliases and adding a single doc should be very large (1-2 seconds vs a few ms so we
|
||||
// should always win the race) This is necessary because we are testing that we can guard against specific race
|
||||
// conditions that happen while a collection is being created. To test this without timing sensitivity we would
|
||||
// need a means to pass a semaphore to the server that it can use to delay collection creation
|
||||
Exception[] threadExceptions = new Exception[2];
|
||||
boolean[] threadStarted = new boolean[2];
|
||||
boolean[] threadFinished = new boolean[2];
|
||||
try {
|
||||
CountDownLatch starter = new CountDownLatch(1);
|
||||
executorService.submit(() -> {
|
||||
threadStarted[0] = true;
|
||||
try {
|
||||
starter.await();
|
||||
concurrentUpdates(params, alias);
|
||||
} catch (Exception e) {
|
||||
threadExceptions[0] = e;
|
||||
}
|
||||
threadFinished[0] = true;
|
||||
});
|
||||
|
||||
executorService.submit(() -> {
|
||||
threadStarted[1] = true;
|
||||
try {
|
||||
starter.await();
|
||||
concurrentUpdates(params, alias2);
|
||||
} catch (Exception e) {
|
||||
threadExceptions[1] = e;
|
||||
}
|
||||
threadFinished[1] = true;
|
||||
});
|
||||
starter.countDown();
|
||||
} finally {
|
||||
ExecutorUtil.shutdownAndAwaitTermination(executorService);
|
||||
}
|
||||
|
||||
// threads are known to be terminated by now, check for exceptions
|
||||
for (Exception threadException : threadExceptions) {
|
||||
if (threadException != null) {
|
||||
Thread.sleep(5000); // avoid spurious fails due to TRA thread not done yet
|
||||
//noinspection ThrowFromFinallyBlock
|
||||
throw threadException;
|
||||
}
|
||||
}
|
||||
|
||||
// just for confidence that there's nothing dodgy about how the threads executed.
|
||||
assertTrue(threadStarted[0]);
|
||||
assertTrue(threadStarted[1]);
|
||||
assertTrue(threadFinished[0]);
|
||||
assertTrue(threadFinished[1]);
|
||||
|
||||
// if one of these times out then the test has failed due to interference between aliases
|
||||
waitColAndAlias("2017-10-26", numShards, alias);
|
||||
waitColAndAlias("2017-10-26", numShards, alias2);
|
||||
|
||||
// after this we can ignore alias2
|
||||
checkPreemptiveCase1(alias);
|
||||
checkPreemptiveCase1(alias2);
|
||||
|
||||
// Some designs contemplated with close hooks were not properly restricted to the core and would have
|
||||
// failed after other cores with other TRAs were stopped. Make sure that we don't fall into that trap in
|
||||
// the future. The basic problem with a close hook solution is that one either winds up putting the
|
||||
// executor on the TRAUP where it's duplicated/initiated for every request, or putting it at the class level
|
||||
// in which case the hook will remove it for all TRA's which can pass a single TRA test nicely but is not safe
|
||||
// where multiple TRA's might come and go.
|
||||
//
|
||||
// This section must NOT gain any Thread.sleep() statements, nor should it gain any long running operations
|
||||
// Start and stop some cores that have TRA's... 2x2 used to ensure every jetty gets at least one
|
||||
|
||||
assertUpdateResponse(add(alias, Arrays.asList(
|
||||
sdoc("id", "2", "timestamp_dt", "2017-10-24T00:00:00Z"),
|
||||
sdoc("id", "3", "timestamp_dt", "2017-10-25T00:00:00Z"),
|
||||
sdoc("id", "4", "timestamp_dt", "2017-10-23T00:00:00Z"),
|
||||
sdoc("id", "5", "timestamp_dt", "2017-10-25T23:00:00Z")), // should cause preemptive creation
|
||||
params));
|
||||
assertUpdateResponse(solrClient.commit(alias));
|
||||
CollectionAdminRequest.createTimeRoutedAlias("foo", "2017-10-23T00:00:00Z", "+1DAY", timeField,
|
||||
CollectionAdminRequest.createCollection("_unused_", configName, 2, 2)
|
||||
.setMaxShardsPerNode(numReplicas)).setPreemptiveCreateWindow("3HOUR")
|
||||
.process(solrClient);
|
||||
|
||||
cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
|
||||
assertEquals(3, cols.size());
|
||||
assertTrue("Preemptive creation appears to not be asynchronous anymore",!cols.contains("myalias_2017-10-26"));
|
||||
assertNumDocs("2017-10-23", 1);
|
||||
assertNumDocs("2017-10-24", 1);
|
||||
assertNumDocs("2017-10-25", 3);
|
||||
waitColAndAlias("2017-10-23",2, "foo");
|
||||
waitCoreCount("foo_2017-10-23", 1); // prove this works, for confidence in deletion checking below.
|
||||
assertUpdateResponse(solrClient.add("foo",
|
||||
sdoc("id","1","timestamp_dt", "2017-10-23T00:00:00Z") // no extra collections should be created
|
||||
));
|
||||
assertUpdateResponse(solrClient.commit("foo"));
|
||||
|
||||
// Here we quickly add another doc in a separate request, before the collection creation has completed.
|
||||
// This has the potential to incorrectly cause preemptive collection creation to run twice and create a
|
||||
// second collection. TimeRoutedAliasUpdateProcessor is meant to guard against this race condition.
|
||||
assertUpdateResponse(add(alias, Collections.singletonList(
|
||||
sdoc("id", "6", "timestamp_dt", "2017-10-25T23:01:00Z")), // might cause duplicate preemptive creation
|
||||
params));
|
||||
assertUpdateResponse(solrClient.commit(alias));
|
||||
List<String> foo = solrClient.getClusterStateProvider().resolveAlias("foo");
|
||||
|
||||
// TIME SENSITIVE SECTION ENDS
|
||||
CollectionAdminRequest.deleteAlias("foo").process(solrClient);
|
||||
|
||||
waitCol("2017-10-26", numShards);
|
||||
for (String colName : foo) {
|
||||
CollectionAdminRequest.deleteCollection(colName).process(solrClient);
|
||||
waitCoreCount(colName, 0);
|
||||
}
|
||||
|
||||
cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
|
||||
assertTrue("Preemptive creation happened twice and created a collection " +
|
||||
"further in the future than the configured time slice!",!cols.contains("myalias_2017-10-27"));
|
||||
|
||||
assertEquals(4, cols.size());
|
||||
assertNumDocs("2017-10-23", 1);
|
||||
assertNumDocs("2017-10-24", 1);
|
||||
assertNumDocs("2017-10-25", 4);
|
||||
assertNumDocs("2017-10-26", 0);
|
||||
// if the design for terminating our executor is correct create/delete above will not cause failures below
|
||||
// continue testing...
|
||||
|
||||
// now test with pre-create window longer than time slice, and forcing multiple creations.
|
||||
CollectionAdminRequest.setAliasProperty(alias)
|
||||
|
@ -425,31 +464,31 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
|
|||
sdoc("id", "7", "timestamp_dt", "2017-10-25T23:01:00Z")), // should cause preemptive creation of 10-27 now
|
||||
params));
|
||||
assertUpdateResponse(solrClient.commit(alias));
|
||||
waitCol("2017-10-27", numShards);
|
||||
waitColAndAlias("2017-10-27", numShards, alias);
|
||||
|
||||
cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
|
||||
assertEquals(5,cols.size()); // only one created in async case
|
||||
assertNumDocs("2017-10-23", 1);
|
||||
assertNumDocs("2017-10-24", 1);
|
||||
assertNumDocs("2017-10-25", 5);
|
||||
assertNumDocs("2017-10-26", 0);
|
||||
assertNumDocs("2017-10-27", 0);
|
||||
assertNumDocs("2017-10-23", 1, alias);
|
||||
assertNumDocs("2017-10-24", 1, alias);
|
||||
assertNumDocs("2017-10-25", 5, alias);
|
||||
assertNumDocs("2017-10-26", 0, alias);
|
||||
assertNumDocs("2017-10-27", 0, alias);
|
||||
|
||||
assertUpdateResponse(add(alias, Collections.singletonList(
|
||||
sdoc("id", "8", "timestamp_dt", "2017-10-25T23:01:00Z")), // should cause preemptive creation of 10-28 now
|
||||
params));
|
||||
assertUpdateResponse(solrClient.commit(alias));
|
||||
waitCol("2017-10-27", numShards);
|
||||
waitCol("2017-10-28", numShards);
|
||||
waitColAndAlias("2017-10-27", numShards, alias);
|
||||
waitColAndAlias("2017-10-28", numShards, alias);
|
||||
|
||||
cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
|
||||
assertEquals(6,cols.size()); // Subsequent documents continue to create up to limit
|
||||
assertNumDocs("2017-10-23", 1);
|
||||
assertNumDocs("2017-10-24", 1);
|
||||
assertNumDocs("2017-10-25", 6);
|
||||
assertNumDocs("2017-10-26", 0);
|
||||
assertNumDocs("2017-10-27", 0);
|
||||
assertNumDocs("2017-10-28", 0);
|
||||
assertNumDocs("2017-10-23", 1, alias);
|
||||
assertNumDocs("2017-10-24", 1, alias);
|
||||
assertNumDocs("2017-10-25", 6, alias);
|
||||
assertNumDocs("2017-10-26", 0, alias);
|
||||
assertNumDocs("2017-10-27", 0, alias);
|
||||
assertNumDocs("2017-10-28", 0, alias);
|
||||
|
||||
QueryResponse resp;
|
||||
resp = solrClient.query(alias, params(
|
||||
|
@ -472,17 +511,17 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
|
|||
sdoc("id", "12", "timestamp_dt", "2017-10-28T23:03:00Z")), // should be ignored due to in progress creation
|
||||
params));
|
||||
assertUpdateResponse(solrClient.commit(alias));
|
||||
waitCol("2017-10-29", numShards);
|
||||
waitColAndAlias("2017-10-29", numShards, alias);
|
||||
|
||||
cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
|
||||
assertEquals(7,cols.size());
|
||||
assertNumDocs("2017-10-23", 1);
|
||||
assertNumDocs("2017-10-24", 1);
|
||||
assertNumDocs("2017-10-25", 6);
|
||||
assertNumDocs("2017-10-26", 0);
|
||||
assertNumDocs("2017-10-27", 1);
|
||||
assertNumDocs("2017-10-28", 3); // should get through even though preemptive creation ignored it.
|
||||
assertNumDocs("2017-10-29", 0);
|
||||
assertNumDocs("2017-10-23", 1, alias);
|
||||
assertNumDocs("2017-10-24", 1, alias);
|
||||
assertNumDocs("2017-10-25", 6, alias);
|
||||
assertNumDocs("2017-10-26", 0, alias);
|
||||
assertNumDocs("2017-10-27", 1, alias);
|
||||
assertNumDocs("2017-10-28", 3, alias); // should get through even though preemptive creation ignored it.
|
||||
assertNumDocs("2017-10-29", 0, alias);
|
||||
|
||||
resp = solrClient.query(alias, params(
|
||||
"q", "*:*",
|
||||
|
@ -494,20 +533,20 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
|
|||
sdoc("id", "13", "timestamp_dt", "2017-10-30T23:03:00Z")), // lucky?
|
||||
params));
|
||||
assertUpdateResponse(solrClient.commit(alias));
|
||||
waitCol("2017-10-30", numShards);
|
||||
waitCol("2017-10-31", numShards); // spooky! async case arising in middle of sync creation!!
|
||||
waitColAndAlias("2017-10-30", numShards, alias);
|
||||
waitColAndAlias("2017-10-31", numShards, alias); // spooky! async case arising in middle of sync creation!!
|
||||
|
||||
cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
|
||||
assertEquals(9,cols.size());
|
||||
assertNumDocs("2017-10-23", 1);
|
||||
assertNumDocs("2017-10-24", 1);
|
||||
assertNumDocs("2017-10-25", 6);
|
||||
assertNumDocs("2017-10-26", 0);
|
||||
assertNumDocs("2017-10-27", 1);
|
||||
assertNumDocs("2017-10-28", 3); // should get through even though preemptive creation ignored it.
|
||||
assertNumDocs("2017-10-29", 0);
|
||||
assertNumDocs("2017-10-30", 1);
|
||||
assertNumDocs("2017-10-31", 0);
|
||||
assertNumDocs("2017-10-23", 1, alias);
|
||||
assertNumDocs("2017-10-24", 1, alias);
|
||||
assertNumDocs("2017-10-25", 6, alias);
|
||||
assertNumDocs("2017-10-26", 0, alias);
|
||||
assertNumDocs("2017-10-27", 1, alias);
|
||||
assertNumDocs("2017-10-28", 3, alias); // should get through even though preemptive creation ignored it.
|
||||
assertNumDocs("2017-10-29", 0, alias);
|
||||
assertNumDocs("2017-10-30", 1, alias);
|
||||
assertNumDocs("2017-10-31", 0, alias);
|
||||
|
||||
resp = solrClient.query(alias, params(
|
||||
"q", "*:*",
|
||||
|
@ -517,17 +556,17 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
|
|||
assertUpdateResponse(add(alias, Collections.singletonList(
|
||||
sdoc("id", "14", "timestamp_dt", "2017-10-31T23:01:00Z")), // should cause preemptive creation 11-01
|
||||
params));
|
||||
waitCol("2017-11-01", numShards);
|
||||
waitColAndAlias("2017-11-01", numShards, alias);
|
||||
|
||||
assertUpdateResponse(add(alias, Collections.singletonList(
|
||||
sdoc("id", "15", "timestamp_dt", "2017-10-31T23:01:00Z")), // should cause preemptive creation 11-02
|
||||
params));
|
||||
waitCol("2017-11-02", numShards);
|
||||
waitColAndAlias("2017-11-02", numShards, alias);
|
||||
|
||||
assertUpdateResponse(add(alias, Collections.singletonList(
|
||||
sdoc("id", "16", "timestamp_dt", "2017-10-31T23:01:00Z")), // should cause preemptive creation 11-03
|
||||
params));
|
||||
waitCol("2017-11-03", numShards);
|
||||
waitColAndAlias("2017-11-03", numShards, alias);
|
||||
|
||||
assertUpdateResponse(add(alias, Collections.singletonList(
|
||||
sdoc("id", "17", "timestamp_dt", "2017-10-31T23:01:00Z")), // should NOT cause preemptive creation 11-04
|
||||
|
@ -539,13 +578,106 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
|
|||
assertUpdateResponse(add(alias, Collections.singletonList(
|
||||
sdoc("id", "18", "timestamp_dt", "2017-11-01T23:01:00Z")), // should cause preemptive creation 11-04
|
||||
params));
|
||||
waitCol("2017-11-04",numShards);
|
||||
|
||||
Thread.sleep(2000); // allow the executor used in preemptive creation time to shut down.
|
||||
waitColAndAlias("2017-11-04",numShards, alias);
|
||||
|
||||
}
|
||||
|
||||
private void assertNumDocs(final String datePart, int expected) throws SolrServerException, IOException {
|
||||
// used to verify a core has been deleted (count = 0)
|
||||
private void waitCoreCount(String collection, int count) {
|
||||
long start = System.nanoTime();
|
||||
CoreContainer coreContainer = cluster.getRandomJetty(random()).getCoreContainer();
|
||||
int coreFooCount;
|
||||
do {
|
||||
coreFooCount = 0;
|
||||
List<CoreDescriptor> coreDescriptors = coreContainer.getCoreDescriptors();
|
||||
for (CoreDescriptor coreDescriptor : coreDescriptors) {
|
||||
String collectionName = coreDescriptor.getCollectionName();
|
||||
if (collection.equals(collectionName)) {
|
||||
coreFooCount ++;
|
||||
}
|
||||
}
|
||||
if (NANOSECONDS.toSeconds(System.nanoTime() - start) > 10) {
|
||||
fail("took over 10 seconds after collection creation to update aliases");
|
||||
} else {
|
||||
try {
|
||||
Thread.sleep(500);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
fail(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
} while(coreFooCount != count);
|
||||
}
|
||||
|
||||
private void concurrentUpdates(ModifiableSolrParams params, String alias) throws SolrServerException, IOException {
|
||||
// In this method we intentionally rely on timing of a race condition but the gap in collection creation time vs
|
||||
// requesting the list of aliases and adding a single doc should be very large (1-2 seconds vs a few ms so we
|
||||
// should always win the race) This is necessary because we are testing that we can guard against specific race
|
||||
// conditions that happen while a collection is being created. To test this without timing sensitivity we would
|
||||
// need a means to pass a semaphore to the server that it can use to delay collection creation
|
||||
//
|
||||
// This method must NOT gain any Thread.sleep() statements, nor should it gain any long running operations
|
||||
assertUpdateResponse(add(alias, Arrays.asList(
|
||||
sdoc("id", "2", "timestamp_dt", "2017-10-24T00:00:00Z"),
|
||||
sdoc("id", "3", "timestamp_dt", "2017-10-25T00:00:00Z"),
|
||||
sdoc("id", "4", "timestamp_dt", "2017-10-23T00:00:00Z"),
|
||||
sdoc("id", "5", "timestamp_dt", "2017-10-25T23:00:00Z")), // should cause preemptive creation
|
||||
params));
|
||||
assertUpdateResponse(solrClient.commit(alias));
|
||||
|
||||
List<String> colsT1;
|
||||
colsT1 = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
|
||||
assertEquals(3, colsT1.size());
|
||||
assertTrue("Preemptive creation appears to not be asynchronous anymore", !colsT1.contains("myalias_2017-10-26"));
|
||||
assertNumDocs("2017-10-23", 1, alias);
|
||||
assertNumDocs("2017-10-24", 1, alias);
|
||||
assertNumDocs("2017-10-25", 3, alias);
|
||||
|
||||
// Here we quickly add another doc in a separate request, before the collection creation has completed.
|
||||
// This has the potential to incorrectly cause preemptive collection creation to run twice and create a
|
||||
// second collection. TimeRoutedAliasUpdateProcessor is meant to guard against this race condition.
|
||||
assertUpdateResponse(add(alias, Collections.singletonList(
|
||||
sdoc("id", "6", "timestamp_dt", "2017-10-25T23:01:00Z")), // might cause duplicate preemptive creation
|
||||
params));
|
||||
assertUpdateResponse(solrClient.commit(alias));
|
||||
}
|
||||
|
||||
private void checkPreemptiveCase1(String alias) throws SolrServerException, IOException {
|
||||
List<String> cols;
|
||||
cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
|
||||
assertTrue("Preemptive creation happened twice and created a collection " +
|
||||
"further in the future than the configured time slice!",!cols.contains("myalias_2017-10-27"));
|
||||
|
||||
assertEquals(4, cols.size());
|
||||
assertNumDocs("2017-10-23", 1, alias);
|
||||
assertNumDocs("2017-10-24", 1, alias);
|
||||
assertNumDocs("2017-10-25", 4, alias);
|
||||
assertNumDocs("2017-10-26", 0, alias);
|
||||
}
|
||||
|
||||
private void addOneDocSynchCreation(int numShards, String alias) throws SolrServerException, IOException, InterruptedException {
|
||||
// cause some collections to be created
|
||||
assertUpdateResponse(solrClient.add(alias,
|
||||
sdoc("id","1","timestamp_dt", "2017-10-25T00:00:00Z")
|
||||
));
|
||||
assertUpdateResponse(solrClient.commit(alias));
|
||||
|
||||
// wait for all the collections to exist...
|
||||
waitColAndAlias("2017-10-23", numShards, alias); // This one should have already existed from the alias creation
|
||||
waitColAndAlias("2017-10-24", numShards, alias); // Create 1
|
||||
waitColAndAlias("2017-10-25", numShards, alias); // Create 2nd synchronously (ensure this is not broken)
|
||||
|
||||
// normal update, nothing special, no collection creation required.
|
||||
List<String> cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
|
||||
assertEquals(3,cols.size());
|
||||
|
||||
assertNumDocs("2017-10-23", 0, alias);
|
||||
assertNumDocs("2017-10-24", 0, alias);
|
||||
assertNumDocs("2017-10-25", 1, alias);
|
||||
}
|
||||
|
||||
private void assertNumDocs(final String datePart, int expected, String alias) throws SolrServerException, IOException {
|
||||
QueryResponse resp = solrClient.query(alias + "_" + datePart, params(
|
||||
"q", "*:*",
|
||||
"rows", "10"));
|
||||
|
@ -572,9 +704,33 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
|
|||
return leaders;
|
||||
}
|
||||
|
||||
private void waitCol(final String datePart, int slices) throws InterruptedException {
|
||||
private void waitColAndAlias(final String datePart, int slices, String alias) throws InterruptedException {
|
||||
// collection to exist
|
||||
String collection = alias + "_" + datePart;
|
||||
waitCol(slices, collection);
|
||||
// and alias to be aware of collection
|
||||
long start = System.nanoTime(); // mumble mumble precommit mumble mumble...
|
||||
while (!haveCollection(alias, collection)) {
|
||||
if (NANOSECONDS.toSeconds(System.nanoTime() - start) > 10) {
|
||||
fail("took over 10 seconds after collection creation to update aliases");
|
||||
} else {
|
||||
Thread.sleep(500);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private boolean haveCollection(String alias, String collection) {
|
||||
// separated into separate lines to make it easier to track down an NPE that occurred once
|
||||
// 3000 runs if it shows up again...
|
||||
CloudSolrClient solrClient = cluster.getSolrClient();
|
||||
ZkStateReader zkStateReader = solrClient.getZkStateReader();
|
||||
Aliases aliases = zkStateReader.getAliases();
|
||||
Map<String, List<String>> collectionAliasListMap = aliases.getCollectionAliasListMap();
|
||||
List<String> strings = collectionAliasListMap.get(alias);
|
||||
return strings.contains(collection);
|
||||
}
|
||||
|
||||
private void waitCol(int slices, String collection) {
|
||||
waitForState("waiting for collections to be created", collection,
|
||||
(liveNodes, collectionState) -> {
|
||||
if (collectionState == null) {
|
||||
|
@ -585,15 +741,6 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
|
|||
int size = activeSlices.size();
|
||||
return size == slices;
|
||||
});
|
||||
// and alias to be aware of collection
|
||||
long start = System.nanoTime(); // mumble mumble precommit mumble mumble...
|
||||
while (!cluster.getSolrClient().getZkStateReader().getAliases().getCollectionAliasListMap().get(alias).contains(collection)) {
|
||||
if (NANOSECONDS.toSeconds(System.nanoTime() - start) > 10) {
|
||||
fail("took over 10 seconds after collection creation to update aliases");
|
||||
} else {
|
||||
Thread.sleep(500);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void testFailedDocument(Instant timestamp, String errorMsg) throws SolrServerException, IOException {
|
||||
|
|
Loading…
Reference in New Issue