From 0f1da2bc14ef1bb79e21558247ef3c72e802924e Mon Sep 17 00:00:00 2001 From: Gus Heck Date: Fri, 11 Jan 2019 15:00:35 -0500 Subject: [PATCH] SOLR-13051 improve TRA update processor test - remove need to Thread.sleep() - better async mechanism linked to SolrCore lifecycle - add some additional tests to be a bit more thorough --- .../java/org/apache/solr/core/SolrCore.java | 25 ++ .../TimeRoutedAliasUpdateProcessor.java | 35 +- .../TimeRoutedAliasUpdateProcessorTest.java | 377 ++++++++++++------ 3 files changed, 300 insertions(+), 137 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/core/SolrCore.java b/solr/core/src/java/org/apache/solr/core/SolrCore.java index 95342c34ca3..142ccab0af6 100644 --- a/solr/core/src/java/org/apache/solr/core/SolrCore.java +++ b/solr/core/src/java/org/apache/solr/core/SolrCore.java @@ -260,6 +260,7 @@ public final class SolrCore implements SolrInfoBean, SolrMetricProducer, Closeab static int boolean_query_max_clause_count = Integer.MIN_VALUE; + private ExecutorService coreAsyncTaskExecutor = ExecutorUtil.newMDCAwareCachedThreadPool("Core Async Task"); /** * The SolrResourceLoader used to load all resources for this core. @@ -1532,6 +1533,8 @@ public final class SolrCore implements SolrInfoBean, SolrMetricProducer, Closeab } log.info("{} CLOSING SolrCore {}", logid, this); + ExecutorUtil.shutdownAndAwaitTermination(coreAsyncTaskExecutor); + // stop reporting metrics try { coreMetricManager.close(); @@ -3166,4 +3169,26 @@ public final class SolrCore implements SolrInfoBean, SolrMetricProducer, Closeab }); return blobRef; } + + /** + * Run an arbitrary task in it's own thread. This is an expert option and is + * a method you should use with great care. It would be bad to run something that never stopped + * or run something that took a very long time. 
Typically this is intended for actions that take + * a few seconds, and therefore would be bad to wait for within a request, but would not pose + * a significant hindrance to server shut down times. It is not intended for long running tasks + * and if you are using a Runnable with a loop in it, you are almost certainly doing it wrong. + *

+ * WARNING: Solr will not be able to shut down gracefully until this task completes! + *

+ * A significant upside of using this method vs creating your own ExecutorService is that your code + * does not have to properly shutdown executors which typically is risky from a unit testing + * perspective since the test framework will complain if you don't carefully ensure the executor + * shuts down before the end of the test. Also the threads running this task are sure to have + * a proper MDC for logging. + * + * @param r the task to run + */ + public void runAsync(Runnable r) { + coreAsyncTaskExecutor.submit(r); + } } diff --git a/solr/core/src/java/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessor.java index cc1ddb893b9..c28ac44bb83 100644 --- a/solr/core/src/java/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessor.java +++ b/solr/core/src/java/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessor.java @@ -25,7 +25,6 @@ import java.util.Collections; import java.util.Date; import java.util.List; import java.util.Map; -import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; import org.apache.solr.cloud.ZkController; @@ -52,12 +51,10 @@ import org.apache.solr.update.DeleteUpdateCommand; import org.apache.solr.update.SolrCmdDistributor; import org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase; import org.apache.solr.util.DateMathParser; -import org.apache.solr.util.DefaultSolrThreadFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static org.apache.commons.lang3.StringUtils.isNotBlank; -import static org.apache.solr.common.util.ExecutorUtil.newMDCAwareSingleThreadExecutor; import static org.apache.solr.update.processor.DistributedUpdateProcessor.DISTRIB_FROM; import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM; import static 
org.apache.solr.update.processor.TimeRoutedAliasUpdateProcessor.CreationType.ASYNC_PREEMPTIVE; @@ -97,8 +94,10 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor { private List> parsedCollectionsDesc; // k=timestamp (start), v=collection. Sorted descending private Aliases parsedCollectionsAliases; // a cached reference to the source of what we parse into parsedCollectionsDesc - // This will be updated out in async creation threads see preemptiveAsync(Runnable r) for details - private volatile ExecutorService preemptiveCreationExecutor; + // This class is created once per request and the overseer methods prevent duplicate create requests + // from creating extra copies. All we need to track here is that we don't spam preemptive creates to + // the overseer multiple times from *this* request. + private volatile boolean preemptiveCreateOnceAlready = false; public static UpdateRequestProcessor wrap(SolrQueryRequest req, UpdateRequestProcessor next) { //TODO get from "Collection property" @@ -215,7 +214,8 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor { // This next line blocks until all collections required by the current document have been created return createAllRequiredCollections(docTimestamp, cmd.getPrintableId(), candidateCollectionDesc); case ASYNC_PREEMPTIVE: - if (preemptiveCreationExecutor == null) { + if (!preemptiveCreateOnceAlready) { + log.info("EXECUTING preemptive creation for {}", timeRoutedAlias.getAliasName()); // It's important not to add code between here and the prior call to findCandidateGivenTimestamp() // in processAdd() that invokes updateParsedCollectionAliases(). Doing so would update parsedCollectionsDesc // and create a race condition. 
We are relying on the fact that get(0) is returning the head of the parsed @@ -242,18 +242,8 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor { } private void preemptiveAsync(Runnable r) { - // Note: creating an executor and throwing it away is slightly expensive, but this is only likely to happen - // once per hour/day/week (depending on time slice size for the TRA). If the executor were retained, it - // would need to be shut down in a close hook to avoid test failures due to thread leaks in tests which is slightly - // more complicated from a code maintenance and readability stand point. An executor must used instead of a - // thread to ensure we pick up the proper MDC logging stuff from ExecutorUtil. - DefaultSolrThreadFactory threadFactory = new DefaultSolrThreadFactory("TRA-preemptive-creation"); - preemptiveCreationExecutor = newMDCAwareSingleThreadExecutor(threadFactory); - preemptiveCreationExecutor.execute(() -> { - r.run(); - preemptiveCreationExecutor.shutdown(); - preemptiveCreationExecutor = null; - }); + preemptiveCreateOnceAlready = true; + req.getCore().runAsync(r); } /** @@ -479,9 +469,11 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor { return targetCollectionDesc.getValue(); // we don't need another collection case ASYNC_PREEMPTIVE: // can happen when preemptive interval is longer than one time slice - String mostRecentCollName = this.parsedCollectionsDesc.get(0).getValue(); - preemptiveAsync(() -> createNextCollection(mostRecentCollName)); - return targetCollectionDesc.getValue(); + if (!preemptiveCreateOnceAlready) { + String mostRecentCollName = this.parsedCollectionsDesc.get(0).getValue(); + preemptiveAsync(() -> createNextCollection(mostRecentCollName)); + return targetCollectionDesc.getValue(); + } case SYNCHRONOUS: createNextCollection(targetCollectionDesc.getValue()); // *should* throw if fails for some reason but... 
if (!updateParsedCollectionAliases()) { // thus we didn't make progress... @@ -495,7 +487,6 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor { break; default: throw unknownCreateType(); - } } while (true); } diff --git a/solr/core/src/test/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessorTest.java b/solr/core/src/test/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessorTest.java index e592654a9f8..7eeff5a87b3 100644 --- a/solr/core/src/test/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessorTest.java +++ b/solr/core/src/test/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessorTest.java @@ -29,14 +29,16 @@ import java.util.Collection; import java.util.Collections; import java.util.Date; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.stream.Collectors; import org.apache.lucene.util.IOUtils; -import org.apache.lucene.util.LuceneTestCase.AwaitsFix; +import org.apache.lucene.util.LuceneTestCase; import org.apache.solr.client.solrj.SolrRequest; import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.client.solrj.embedded.JettySolrRunner; @@ -54,14 +56,17 @@ import org.apache.solr.cloud.api.collections.TimeRoutedAlias; import org.apache.solr.common.SolrDocumentList; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.cloud.Aliases; import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.Slice; +import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.SolrParams; import 
org.apache.solr.common.util.ExecutorUtil; import org.apache.solr.common.util.NamedList; +import org.apache.solr.core.CoreContainer; import org.apache.solr.core.CoreDescriptor; import org.apache.solr.request.SolrQueryRequest; import org.apache.solr.response.SolrQueryResponse; @@ -75,13 +80,12 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -// This feature has a leak -@AwaitsFix(bugUrl="https://issues.apache.org/jira/browse/SOLR-12801") +@LuceneTestCase.BadApple(bugUrl = "https://issues.apache.org/jira/browse/SOLR-13059") public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - private static final String configName = "timeConfig"; private static final String alias = "myalias"; + private static final String alias2 = "myalias2"; private static final String timeField = "timestamp_dt"; private static final String intField = "integer_i"; @@ -125,7 +129,7 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase { .setMaxShardsPerNode(2) .withProperty(TimeRoutedAlias.ROUTED_ALIAS_NAME_CORE_PROP, alias) .process(solrClient); - + cluster.waitForActiveCollection(col23rd, 2, 4); List retrievedConfigSetNames = new ConfigSetAdminRequest.List().process(solrClient).getConfigSets(); @@ -228,6 +232,8 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase { CollectionAdminRequest.createCollection(configName, configName, 1, 1).process(solrClient); + // TODO: fix SOLR-13059, a where this wait isn't working ~0.3% of the time. + waitCol(1,configName); // manipulate the config... checkNoError(solrClient.request(new V2Request.Builder("/collections/" + configName + "/config") .withMethod(SolrRequest.METHOD.POST) @@ -289,9 +295,9 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase { assertUpdateResponse(solrClient.commit(alias)); // wait for all the collections to exist... 
- waitCol("2017-10-23", numShards); - waitCol("2017-10-24", numShards); - waitCol("2017-10-25", numShards); + waitColAndAlias("2017-10-23", numShards, alias); + waitColAndAlias("2017-10-24", numShards, alias); + waitColAndAlias("2017-10-25", numShards, alias); // at this point we now have 3 collections with 4 shards each, and 3 replicas per shard for a total of // 36 total replicas, 1/3 of which are leaders. We will add 3 docs and each has a 33% chance of hitting a @@ -347,75 +353,108 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase { .setMaxShardsPerNode(numReplicas)).setPreemptiveCreateWindow("3HOUR") .process(solrClient); - // cause some collections to be created - assertUpdateResponse(solrClient.add(alias, - sdoc("id","1","timestamp_dt", "2017-10-25T00:00:00Z") - )); - assertUpdateResponse(solrClient.commit(alias)); + // needed to verify that preemptive creation in one alias doesn't inhibit preemptive creation in another + CollectionAdminRequest.createTimeRoutedAlias(alias2, "2017-10-23T00:00:00Z", "+1DAY", timeField, + CollectionAdminRequest.createCollection("_unused_", configName, numShards, numReplicas) + .setMaxShardsPerNode(numReplicas)).setPreemptiveCreateWindow("3HOUR") + .process(solrClient); - // wait for all the collections to exist... - waitCol("2017-10-23", numShards); // This one should have already existed from the alias creation - waitCol("2017-10-24", numShards); // Create 1 - waitCol("2017-10-25", numShards); // Create 2nd synchronously (ensure this is not broken) - - // normal update, nothing special, no collection creation required. 
- List cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias); - assertEquals(3,cols.size()); - - assertNumDocs("2017-10-23", 0); - assertNumDocs("2017-10-24", 0); - assertNumDocs("2017-10-25", 1); - - // cause some collections to be created + addOneDocSynchCreation(numShards, alias); + addOneDocSynchCreation(numShards, alias2); + List cols; ModifiableSolrParams params = params(); - // TIME SENSITIVE SECTION BEGINS + // Using threads to ensure that two TRA's are simultaneously preemptively creating and don't + // interfere with each other + ExecutorService executorService = ExecutorUtil.newMDCAwareCachedThreadPool("TimeRoutedAliasProcessorTestx-testPreemptiveCreation"); - // In this section we intentionally rely on timing of a race condition but the gap in collection creation time vs - // requesting the list of aliases and adding a single doc should be very large (1-2 seconds vs a few ms so we - // should always win the race) This is necessary because we are testing that we can guard against specific race - // conditions that happen while a collection is being created. 
To test this without timing sensitivity we would - // need a means to pass a semaphore to the server that it can use to delay collection creation + Exception[] threadExceptions = new Exception[2]; + boolean[] threadStarted = new boolean[2]; + boolean[] threadFinished = new boolean[2]; + try { + CountDownLatch starter = new CountDownLatch(1); + executorService.submit(() -> { + threadStarted[0] = true; + try { + starter.await(); + concurrentUpdates(params, alias); + } catch (Exception e) { + threadExceptions[0] = e; + } + threadFinished[0] = true; + }); + + executorService.submit(() -> { + threadStarted[1] = true; + try { + starter.await(); + concurrentUpdates(params, alias2); + } catch (Exception e) { + threadExceptions[1] = e; + } + threadFinished[1] = true; + }); + starter.countDown(); + } finally { + ExecutorUtil.shutdownAndAwaitTermination(executorService); + } + + // threads are known to be terminated by now, check for exceptions + for (Exception threadException : threadExceptions) { + if (threadException != null) { + Thread.sleep(5000); // avoid spurious fails due to TRA thread not done yet + //noinspection ThrowFromFinallyBlock + throw threadException; + } + } + + // just for confidence that there's nothing dodgy about how the threads executed. + assertTrue(threadStarted[0]); + assertTrue(threadStarted[1]); + assertTrue(threadFinished[0]); + assertTrue(threadFinished[1]); + + // if one of these times out then the test has failed due to interference between aliases + waitColAndAlias("2017-10-26", numShards, alias); + waitColAndAlias("2017-10-26", numShards, alias2); + + // after this we can ignore alias2 + checkPreemptiveCase1(alias); + checkPreemptiveCase1(alias2); + + // Some designs contemplated with close hooks were not properly restricted to the core and would have + // failed after other cores with other TRAs were stopped. Make sure that we don't fall into that trap in + // the future. 
The basic problem with a close hook solution is that one either winds up putting the + // executor on the TRAUP where it's duplicated/initiated for every request, or putting it at the class level + // in which case the hook will remove it for all TRA's which can pass a single TRA test nicely but is not safe + // where multiple TRA's might come and go. // - // This section must NOT gain any Thread.sleep() statements, nor should it gain any long running operations + // Start and stop some cores that have TRA's... 2x2 used to ensure every jetty gets at least one - assertUpdateResponse(add(alias, Arrays.asList( - sdoc("id", "2", "timestamp_dt", "2017-10-24T00:00:00Z"), - sdoc("id", "3", "timestamp_dt", "2017-10-25T00:00:00Z"), - sdoc("id", "4", "timestamp_dt", "2017-10-23T00:00:00Z"), - sdoc("id", "5", "timestamp_dt", "2017-10-25T23:00:00Z")), // should cause preemptive creation - params)); - assertUpdateResponse(solrClient.commit(alias)); + CollectionAdminRequest.createTimeRoutedAlias("foo", "2017-10-23T00:00:00Z", "+1DAY", timeField, + CollectionAdminRequest.createCollection("_unused_", configName, 2, 2) + .setMaxShardsPerNode(numReplicas)).setPreemptiveCreateWindow("3HOUR") + .process(solrClient); - cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias); - assertEquals(3, cols.size()); - assertTrue("Preemptive creation appears to not be asynchronous anymore",!cols.contains("myalias_2017-10-26")); - assertNumDocs("2017-10-23", 1); - assertNumDocs("2017-10-24", 1); - assertNumDocs("2017-10-25", 3); + waitColAndAlias("2017-10-23",2, "foo"); + waitCoreCount("foo_2017-10-23", 1); // prove this works, for confidence in deletion checking below. 
+ assertUpdateResponse(solrClient.add("foo", + sdoc("id","1","timestamp_dt", "2017-10-23T00:00:00Z") // no extra collections should be created + )); + assertUpdateResponse(solrClient.commit("foo")); - // Here we quickly add another doc in a separate request, before the collection creation has completed. - // This has the potential to incorrectly cause preemptive collection creation to run twice and create a - // second collection. TimeRoutedAliasUpdateProcessor is meant to guard against this race condition. - assertUpdateResponse(add(alias, Collections.singletonList( - sdoc("id", "6", "timestamp_dt", "2017-10-25T23:01:00Z")), // might cause duplicate preemptive creation - params)); - assertUpdateResponse(solrClient.commit(alias)); + List foo = solrClient.getClusterStateProvider().resolveAlias("foo"); - // TIME SENSITIVE SECTION ENDS + CollectionAdminRequest.deleteAlias("foo").process(solrClient); - waitCol("2017-10-26", numShards); + for (String colName : foo) { + CollectionAdminRequest.deleteCollection(colName).process(solrClient); + waitCoreCount(colName, 0); + } - cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias); - assertTrue("Preemptive creation happened twice and created a collection " + - "further in the future than the configured time slice!",!cols.contains("myalias_2017-10-27")); - - assertEquals(4, cols.size()); - assertNumDocs("2017-10-23", 1); - assertNumDocs("2017-10-24", 1); - assertNumDocs("2017-10-25", 4); - assertNumDocs("2017-10-26", 0); + // if the design for terminating our executor is correct create/delete above will not cause failures below + // continue testing... // now test with pre-create window longer than time slice, and forcing multiple creations. 
CollectionAdminRequest.setAliasProperty(alias) @@ -425,31 +464,31 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase { sdoc("id", "7", "timestamp_dt", "2017-10-25T23:01:00Z")), // should cause preemptive creation of 10-27 now params)); assertUpdateResponse(solrClient.commit(alias)); - waitCol("2017-10-27", numShards); + waitColAndAlias("2017-10-27", numShards, alias); cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias); assertEquals(5,cols.size()); // only one created in async case - assertNumDocs("2017-10-23", 1); - assertNumDocs("2017-10-24", 1); - assertNumDocs("2017-10-25", 5); - assertNumDocs("2017-10-26", 0); - assertNumDocs("2017-10-27", 0); + assertNumDocs("2017-10-23", 1, alias); + assertNumDocs("2017-10-24", 1, alias); + assertNumDocs("2017-10-25", 5, alias); + assertNumDocs("2017-10-26", 0, alias); + assertNumDocs("2017-10-27", 0, alias); assertUpdateResponse(add(alias, Collections.singletonList( sdoc("id", "8", "timestamp_dt", "2017-10-25T23:01:00Z")), // should cause preemptive creation of 10-28 now params)); assertUpdateResponse(solrClient.commit(alias)); - waitCol("2017-10-27", numShards); - waitCol("2017-10-28", numShards); + waitColAndAlias("2017-10-27", numShards, alias); + waitColAndAlias("2017-10-28", numShards, alias); cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias); assertEquals(6,cols.size()); // Subsequent documents continue to create up to limit - assertNumDocs("2017-10-23", 1); - assertNumDocs("2017-10-24", 1); - assertNumDocs("2017-10-25", 6); - assertNumDocs("2017-10-26", 0); - assertNumDocs("2017-10-27", 0); - assertNumDocs("2017-10-28", 0); + assertNumDocs("2017-10-23", 1, alias); + assertNumDocs("2017-10-24", 1, alias); + assertNumDocs("2017-10-25", 6, alias); + assertNumDocs("2017-10-26", 0, alias); + assertNumDocs("2017-10-27", 0, alias); + assertNumDocs("2017-10-28", 0, alias); QueryResponse resp; resp = 
solrClient.query(alias, params( @@ -472,17 +511,17 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase { sdoc("id", "12", "timestamp_dt", "2017-10-28T23:03:00Z")), // should be ignored due to in progress creation params)); assertUpdateResponse(solrClient.commit(alias)); - waitCol("2017-10-29", numShards); + waitColAndAlias("2017-10-29", numShards, alias); cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias); assertEquals(7,cols.size()); - assertNumDocs("2017-10-23", 1); - assertNumDocs("2017-10-24", 1); - assertNumDocs("2017-10-25", 6); - assertNumDocs("2017-10-26", 0); - assertNumDocs("2017-10-27", 1); - assertNumDocs("2017-10-28", 3); // should get through even though preemptive creation ignored it. - assertNumDocs("2017-10-29", 0); + assertNumDocs("2017-10-23", 1, alias); + assertNumDocs("2017-10-24", 1, alias); + assertNumDocs("2017-10-25", 6, alias); + assertNumDocs("2017-10-26", 0, alias); + assertNumDocs("2017-10-27", 1, alias); + assertNumDocs("2017-10-28", 3, alias); // should get through even though preemptive creation ignored it. + assertNumDocs("2017-10-29", 0, alias); resp = solrClient.query(alias, params( "q", "*:*", @@ -494,20 +533,20 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase { sdoc("id", "13", "timestamp_dt", "2017-10-30T23:03:00Z")), // lucky? params)); assertUpdateResponse(solrClient.commit(alias)); - waitCol("2017-10-30", numShards); - waitCol("2017-10-31", numShards); // spooky! async case arising in middle of sync creation!! + waitColAndAlias("2017-10-30", numShards, alias); + waitColAndAlias("2017-10-31", numShards, alias); // spooky! async case arising in middle of sync creation!! 
cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias); assertEquals(9,cols.size()); - assertNumDocs("2017-10-23", 1); - assertNumDocs("2017-10-24", 1); - assertNumDocs("2017-10-25", 6); - assertNumDocs("2017-10-26", 0); - assertNumDocs("2017-10-27", 1); - assertNumDocs("2017-10-28", 3); // should get through even though preemptive creation ignored it. - assertNumDocs("2017-10-29", 0); - assertNumDocs("2017-10-30", 1); - assertNumDocs("2017-10-31", 0); + assertNumDocs("2017-10-23", 1, alias); + assertNumDocs("2017-10-24", 1, alias); + assertNumDocs("2017-10-25", 6, alias); + assertNumDocs("2017-10-26", 0, alias); + assertNumDocs("2017-10-27", 1, alias); + assertNumDocs("2017-10-28", 3, alias); // should get through even though preemptive creation ignored it. + assertNumDocs("2017-10-29", 0, alias); + assertNumDocs("2017-10-30", 1, alias); + assertNumDocs("2017-10-31", 0, alias); resp = solrClient.query(alias, params( "q", "*:*", @@ -517,17 +556,17 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase { assertUpdateResponse(add(alias, Collections.singletonList( sdoc("id", "14", "timestamp_dt", "2017-10-31T23:01:00Z")), // should cause preemptive creation 11-01 params)); - waitCol("2017-11-01", numShards); + waitColAndAlias("2017-11-01", numShards, alias); assertUpdateResponse(add(alias, Collections.singletonList( sdoc("id", "15", "timestamp_dt", "2017-10-31T23:01:00Z")), // should cause preemptive creation 11-02 params)); - waitCol("2017-11-02", numShards); + waitColAndAlias("2017-11-02", numShards, alias); assertUpdateResponse(add(alias, Collections.singletonList( sdoc("id", "16", "timestamp_dt", "2017-10-31T23:01:00Z")), // should cause preemptive creation 11-03 params)); - waitCol("2017-11-03", numShards); + waitColAndAlias("2017-11-03", numShards, alias); assertUpdateResponse(add(alias, Collections.singletonList( sdoc("id", "17", "timestamp_dt", "2017-10-31T23:01:00Z")), // should NOT cause 
preemptive creation 11-04 @@ -539,13 +578,106 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase { assertUpdateResponse(add(alias, Collections.singletonList( sdoc("id", "18", "timestamp_dt", "2017-11-01T23:01:00Z")), // should cause preemptive creation 11-04 params)); - waitCol("2017-11-04",numShards); - - Thread.sleep(2000); // allow the executor used in preemptive creation time to shut down. + waitColAndAlias("2017-11-04",numShards, alias); } - private void assertNumDocs(final String datePart, int expected) throws SolrServerException, IOException { + // used to verify a core has been deleted (count = 0) + private void waitCoreCount(String collection, int count) { + long start = System.nanoTime(); + CoreContainer coreContainer = cluster.getRandomJetty(random()).getCoreContainer(); + int coreFooCount; + do { + coreFooCount = 0; + List coreDescriptors = coreContainer.getCoreDescriptors(); + for (CoreDescriptor coreDescriptor : coreDescriptors) { + String collectionName = coreDescriptor.getCollectionName(); + if (collection.equals(collectionName)) { + coreFooCount ++; + } + } + if (NANOSECONDS.toSeconds(System.nanoTime() - start) > 10) { + fail("took over 10 seconds after collection creation to update aliases"); + } else { + try { + Thread.sleep(500); + } catch (InterruptedException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + } while(coreFooCount != count); + } + + private void concurrentUpdates(ModifiableSolrParams params, String alias) throws SolrServerException, IOException { + // In this method we intentionally rely on timing of a race condition but the gap in collection creation time vs + // requesting the list of aliases and adding a single doc should be very large (1-2 seconds vs a few ms so we + // should always win the race) This is necessary because we are testing that we can guard against specific race + // conditions that happen while a collection is being created. 
To test this without timing sensitivity we would + // need a means to pass a semaphore to the server that it can use to delay collection creation + // + // This method must NOT gain any Thread.sleep() statements, nor should it gain any long running operations + assertUpdateResponse(add(alias, Arrays.asList( + sdoc("id", "2", "timestamp_dt", "2017-10-24T00:00:00Z"), + sdoc("id", "3", "timestamp_dt", "2017-10-25T00:00:00Z"), + sdoc("id", "4", "timestamp_dt", "2017-10-23T00:00:00Z"), + sdoc("id", "5", "timestamp_dt", "2017-10-25T23:00:00Z")), // should cause preemptive creation + params)); + assertUpdateResponse(solrClient.commit(alias)); + + List colsT1; + colsT1 = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias); + assertEquals(3, colsT1.size()); + assertTrue("Preemptive creation appears to not be asynchronous anymore", !colsT1.contains("myalias_2017-10-26")); + assertNumDocs("2017-10-23", 1, alias); + assertNumDocs("2017-10-24", 1, alias); + assertNumDocs("2017-10-25", 3, alias); + + // Here we quickly add another doc in a separate request, before the collection creation has completed. + // This has the potential to incorrectly cause preemptive collection creation to run twice and create a + // second collection. TimeRoutedAliasUpdateProcessor is meant to guard against this race condition. 
+ assertUpdateResponse(add(alias, Collections.singletonList( + sdoc("id", "6", "timestamp_dt", "2017-10-25T23:01:00Z")), // might cause duplicate preemptive creation + params)); + assertUpdateResponse(solrClient.commit(alias)); + } + + private void checkPreemptiveCase1(String alias) throws SolrServerException, IOException { + List cols; + cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias); + assertTrue("Preemptive creation happened twice and created a collection " + + "further in the future than the configured time slice!",!cols.contains("myalias_2017-10-27")); + + assertEquals(4, cols.size()); + assertNumDocs("2017-10-23", 1, alias); + assertNumDocs("2017-10-24", 1, alias); + assertNumDocs("2017-10-25", 4, alias); + assertNumDocs("2017-10-26", 0, alias); + } + + private void addOneDocSynchCreation(int numShards, String alias) throws SolrServerException, IOException, InterruptedException { + // cause some collections to be created + assertUpdateResponse(solrClient.add(alias, + sdoc("id","1","timestamp_dt", "2017-10-25T00:00:00Z") + )); + assertUpdateResponse(solrClient.commit(alias)); + + // wait for all the collections to exist... + waitColAndAlias("2017-10-23", numShards, alias); // This one should have already existed from the alias creation + waitColAndAlias("2017-10-24", numShards, alias); // Create 1 + waitColAndAlias("2017-10-25", numShards, alias); // Create 2nd synchronously (ensure this is not broken) + + // normal update, nothing special, no collection creation required. 
+ List cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias); + assertEquals(3,cols.size()); + + assertNumDocs("2017-10-23", 0, alias); + assertNumDocs("2017-10-24", 0, alias); + assertNumDocs("2017-10-25", 1, alias); + } + + private void assertNumDocs(final String datePart, int expected, String alias) throws SolrServerException, IOException { QueryResponse resp = solrClient.query(alias + "_" + datePart, params( "q", "*:*", "rows", "10")); @@ -572,9 +704,33 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase { return leaders; } - private void waitCol(final String datePart, int slices) throws InterruptedException { + private void waitColAndAlias(final String datePart, int slices, String alias) throws InterruptedException { // collection to exist String collection = alias + "_" + datePart; + waitCol(slices, collection); + // and alias to be aware of collection + long start = System.nanoTime(); // mumble mumble precommit mumble mumble... + while (!haveCollection(alias, collection)) { + if (NANOSECONDS.toSeconds(System.nanoTime() - start) > 10) { + fail("took over 10 seconds after collection creation to update aliases"); + } else { + Thread.sleep(500); + } + } + } + + private boolean haveCollection(String alias, String collection) { + // separated into separate lines to make it easier to track down an NPE that occurred once + // 3000 runs if it shows up again... 
+ CloudSolrClient solrClient = cluster.getSolrClient(); + ZkStateReader zkStateReader = solrClient.getZkStateReader(); + Aliases aliases = zkStateReader.getAliases(); + Map> collectionAliasListMap = aliases.getCollectionAliasListMap(); + List strings = collectionAliasListMap.get(alias); + return strings.contains(collection); + } + + private void waitCol(int slices, String collection) { waitForState("waiting for collections to be created", collection, (liveNodes, collectionState) -> { if (collectionState == null) { @@ -585,15 +741,6 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase { int size = activeSlices.size(); return size == slices; }); - // and alias to be aware of collection - long start = System.nanoTime(); // mumble mumble precommit mumble mumble... - while (!cluster.getSolrClient().getZkStateReader().getAliases().getCollectionAliasListMap().get(alias).contains(collection)) { - if (NANOSECONDS.toSeconds(System.nanoTime() - start) > 10) { - fail("took over 10 seconds after collection creation to update aliases"); - } else { - Thread.sleep(500); - } - } } private void testFailedDocument(Instant timestamp, String errorMsg) throws SolrServerException, IOException {