From 925733d1ef3ac6fbabc450804511c65a4c6424ac Mon Sep 17 00:00:00 2001 From: David Smiley Date: Fri, 5 Jan 2018 13:53:26 -0500 Subject: [PATCH] SOLR-11653: TimeRoutedAlias URP now auto-creates collections using new RoutedAliasCreateCollectionCmd --- solr/CHANGES.txt | 4 + .../java/org/apache/solr/cloud/Overseer.java | 4 + .../OverseerCollectionMessageHandler.java | 3 +- .../solr/cloud/OverseerTaskProcessor.java | 7 +- .../cloud/RoutedAliasCreateCollectionCmd.java | 182 ++++++++++++++ .../handler/admin/CollectionsHandler.java | 16 +- .../apache/solr/request/SolrRequestInfo.java | 12 +- .../TimeRoutedAliasUpdateProcessor.java | 228 ++++++++++++++++-- .../org/apache/solr/util/TimeZoneUtils.java | 18 ++ .../TimeRoutedAliasUpdateProcessorTest.java | 140 ++++++++--- .../solr/common/params/CollectionParams.java | 1 + 11 files changed, 539 insertions(+), 76 deletions(-) create mode 100644 solr/core/src/java/org/apache/solr/cloud/RoutedAliasCreateCollectionCmd.java diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 61551e0983f..221d6adb1c1 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -74,6 +74,10 @@ New Features * SOLR-11201: Implement autoscaling trigger for arbitrary metrics that creates events when a given metric breaches a threshold (shalin) +* SOLR-11653: TimeRoutedAlias URP now auto-creates new collections on the fly according to alias metadata + rules that sets the time interval for each collection. An internal Overseer command "ROUTEDALIAS_CREATECOLL" + was created to facilitate this. (David Smiley) + Bug Fixes ---------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java index d1bb13a12b3..3b9dd28f3b6 100644 --- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java +++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java @@ -39,6 +39,7 @@ import org.apache.solr.cloud.overseer.SliceMutator; import org.apache.solr.cloud.overseer.ZkStateWriter; import org.apache.solr.cloud.overseer.ZkWriteCommand; import org.apache.solr.common.SolrCloseable; +import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.SolrZkClient; import org.apache.solr.common.cloud.ZkNodeProps; @@ -267,6 +268,9 @@ public class Overseer implements SolrCloseable { private ClusterState processQueueItem(ZkNodeProps message, ClusterState clusterState, ZkStateWriter zkStateWriter, boolean enableBatching, ZkStateWriter.ZkWriteCallback callback) throws Exception { final String operation = message.getStr(QUEUE_OPERATION); + if (operation == null) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Message missing " + QUEUE_OPERATION + ":" + message); + } List zkWriteCommands = null; final Timer.Context timerContext = stats.time(operation); try { diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java index abfecab88fa..426c8796074 100644 --- a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java +++ b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java @@ -219,6 +219,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler, .put(DELETE, new DeleteCollectionCmd(this)) .put(CREATEALIAS, new CreateAliasCmd(this)) .put(DELETEALIAS, new DeleteAliasCmd(this)) + .put(ROUTEDALIAS_CREATECOLL, new RoutedAliasCreateCollectionCmd(this)) .put(OVERSEERSTATUS, new OverseerStatusCmd(this)) .put(DELETESHARD, new DeleteShardCmd(this)) .put(DELETEREPLICA, new DeleteReplicaCmd(this)) @@ -232,7 +233,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler, @Override @SuppressWarnings("unchecked") public SolrResponse processMessage(ZkNodeProps message, String operation) { - log.debug("OverseerCollectionMessageHandler.processMessage : "+ operation + " , "+ message.toString()); + log.debug("OverseerCollectionMessageHandler.processMessage : {} , {}", operation, message); NamedList results = new NamedList(); try { diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java index d014fc47715..86e356497a1 100644 --- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java +++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java @@ -257,7 +257,6 @@ public class OverseerTaskProcessor implements Runnable, Closeable { } if (runningZKTasks.contains(head.getId())) continue; final ZkNodeProps message = ZkNodeProps.load(head.getBytes()); - OverseerMessageHandler messageHandler = selector.selectOverseerMessageHandler(message); final String asyncId = message.getStr(ASYNC); if (hasLeftOverItems) { if (head.getId().equals(oldestItemInWorkQueue)) @@ -269,6 +268,12 @@ public class OverseerTaskProcessor implements Runnable, Closeable { } } String operation = message.getStr(Overseer.QUEUE_OPERATION); + if (operation == null) { + log.error("Msg does not have required " + Overseer.QUEUE_OPERATION + ": {}", message); + workQueue.remove(head); + continue; + } + OverseerMessageHandler messageHandler = selector.selectOverseerMessageHandler(message); OverseerMessageHandler.Lock lock = messageHandler.lockTask(message, taskBatch); if (lock == null) { log.debug("Exclusivity check failed for [{}]", message.toString()); diff --git a/solr/core/src/java/org/apache/solr/cloud/RoutedAliasCreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/RoutedAliasCreateCollectionCmd.java new file mode 100644 index 00000000000..607588c4fde --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cloud/RoutedAliasCreateCollectionCmd.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.cloud; + +import java.lang.invoke.MethodHandles; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.TimeZone; + +import org.apache.solr.common.SolrException; +import org.apache.solr.common.cloud.Aliases; +import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.ZkNodeProps; +import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.params.CollectionParams; +import org.apache.solr.common.params.CommonParams; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.util.NamedList; +import org.apache.solr.common.util.StrUtils; +import org.apache.solr.handler.admin.CollectionsHandler; +import org.apache.solr.request.LocalSolrQueryRequest; +import org.apache.solr.update.processor.TimeRoutedAliasUpdateProcessor; +import org.apache.solr.util.TimeZoneUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_CONF; +import static org.apache.solr.common.params.CommonParams.NAME; +import static org.apache.solr.update.processor.TimeRoutedAliasUpdateProcessor.ROUTER_FIELD_METADATA; +import static org.apache.solr.update.processor.TimeRoutedAliasUpdateProcessor.ROUTER_INTERVAL_METADATA; + +/** + * For "routed aliases", creates another collection and adds it to the alias. In some cases it will not + * add a new collection. + * If a collection is created, then collection creation info is returned. + * + * Note: this logic is within an Overseer because we want to leverage the mutual exclusion + * property afforded by the lock it obtains on the alias name. + * @since 7.3 + */ +public class RoutedAliasCreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + public static final String IF_MOST_RECENT_COLL_NAME = "ifMostRecentCollName"; + + public static final String COLL_METAPREFIX = "collection-create."; + + private final OverseerCollectionMessageHandler ocmh; + + public RoutedAliasCreateCollectionCmd(OverseerCollectionMessageHandler ocmh) { + this.ocmh = ocmh; + } + + /* TODO: + There are a few classes related to time routed alias processing. We need to share some logic better. + */ + + + @Override + public void call(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception { + //---- PARSE PRIMARY MESSAGE PARAMS + // important that we use NAME for the alias as that is what the Overseer will get a lock on before calling us + final String aliasName = message.getStr(NAME); + // the client believes this is the mostRecent collection name. We assert this if provided. + final String ifMostRecentCollName = message.getStr(IF_MOST_RECENT_COLL_NAME); // optional + + // TODO collection param (or intervalDateMath override?), useful for data capped collections + + //---- PARSE ALIAS INFO FROM ZK + final ZkStateReader.AliasesManager aliasesHolder = ocmh.zkStateReader.aliasesHolder; + final Aliases aliases = aliasesHolder.getAliases(); + final Map aliasMetadata = aliases.getCollectionAliasMetadata(aliasName); + if (aliasMetadata == null) { + throw newAliasMustExistException(aliasName); // if it did exist, we'd have a non-null map + } + + String routeField = aliasMetadata.get(ROUTER_FIELD_METADATA); + if (routeField == null) { + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, + "This command only works on time routed aliases. Expected alias metadata not found."); + } + String intervalDateMath = aliasMetadata.getOrDefault(ROUTER_INTERVAL_METADATA, "+1DAY"); + TimeZone intervalTimeZone = TimeZoneUtils.parseTimezone(aliasMetadata.get(CommonParams.TZ)); + + //TODO this is ugly; how can we organize the code related to this feature better? + final List> parsedCollections = + TimeRoutedAliasUpdateProcessor.parseCollections(aliasName, aliases, () -> newAliasMustExistException(aliasName)); + + //---- GET MOST RECENT COLL + final Map.Entry mostRecentEntry = parsedCollections.get(0); + final Instant mostRecentCollTimestamp = mostRecentEntry.getKey(); + final String mostRecentCollName = mostRecentEntry.getValue(); + if (ifMostRecentCollName != null) { + if (!mostRecentCollName.equals(ifMostRecentCollName)) { + // Possibly due to race conditions in URPs on multiple leaders calling us at the same time + String msg = IF_MOST_RECENT_COLL_NAME + " expected " + ifMostRecentCollName + " but it's " + mostRecentCollName; + if (parsedCollections.stream().map(Map.Entry::getValue).noneMatch(ifMostRecentCollName::equals)) { + msg += ". Furthermore this collection isn't in the list of collections referenced by the alias."; + } + log.info(msg); + results.add("message", msg); + return; + } + } else if (mostRecentCollTimestamp.isAfter(Instant.now())) { + final String msg = "Most recent collection is in the future, so we won't create another."; + log.info(msg); + results.add("message", msg); + return; + } + + //---- COMPUTE NEXT COLLECTION NAME + final Instant nextCollTimestamp = TimeRoutedAliasUpdateProcessor.computeNextCollTimestamp(mostRecentCollTimestamp, intervalDateMath, intervalTimeZone); + assert nextCollTimestamp.isAfter(mostRecentCollTimestamp); + final String createCollName = TimeRoutedAliasUpdateProcessor.formatCollectionNameFromInstant(aliasName, nextCollTimestamp); + + //---- CREATE THE COLLECTION + // Map alias metadata starting with a prefix to a create-collection API request + final ModifiableSolrParams createReqParams = new ModifiableSolrParams(); + for (Map.Entry e : aliasMetadata.entrySet()) { + if (e.getKey().startsWith(COLL_METAPREFIX)) { + createReqParams.set(e.getKey().substring(COLL_METAPREFIX.length()), e.getValue()); + } + } + if (createReqParams.get(COLL_CONF) == null) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, + "We require an explicit " + COLL_CONF ); + } + createReqParams.set(NAME, createCollName); + createReqParams.set("property." + TimeRoutedAliasUpdateProcessor.TIME_PARTITION_ALIAS_NAME_CORE_PROP, aliasName); + // a CollectionOperation reads params and produces a message (Map) that is supposed to be sent to the Overseer. + // Although we could create the Map without it, there are a fair amount of rules we don't want to reproduce. + final Map createMsgMap = CollectionsHandler.CollectionOperation.CREATE_OP.execute( + new LocalSolrQueryRequest(null, createReqParams), + null, + ocmh.overseer.getCoreContainer().getCollectionsHandler()); + createMsgMap.put(Overseer.QUEUE_OPERATION, "create"); + // Since we are running in the Overseer here, send the message directly to the Overseer CreateCollectionCmd + ocmh.commandMap.get(CollectionParams.CollectionAction.CREATE).call(clusterState, new ZkNodeProps(createMsgMap), results); + + CollectionsHandler.waitForActiveCollection(createCollName, null, ocmh.overseer.getCoreContainer(), new OverseerSolrResponse(results)); + + //TODO delete some of the oldest collection(s) ? + + //---- UPDATE THE ALIAS + aliasesHolder.applyModificationAndExportToZk(curAliases -> { + final List curTargetCollections = curAliases.getCollectionAliasListMap().get(aliasName); + if (curTargetCollections.contains(createCollName)) { + return curAliases; + } else { + List newTargetCollections = new ArrayList<>(curTargetCollections.size() + 1); + // prepend it on purpose (thus reverse sorted). Solr alias resolution defaults to the first collection in a list + newTargetCollections.add(createCollName); + newTargetCollections.addAll(curTargetCollections); + return curAliases.cloneWithCollectionAlias(aliasName, StrUtils.join(newTargetCollections, ',')); + } + }); + + } + + private SolrException newAliasMustExistException(String aliasName) { + return new SolrException(SolrException.ErrorCode.BAD_REQUEST, + "Alias " + aliasName + " does not exist."); + } + +} diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java index d339f27de3b..74d47647eef 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java @@ -260,16 +260,19 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission public static long DEFAULT_COLLECTION_OP_TIMEOUT = 180*1000; - void handleResponse(String operation, ZkNodeProps m, + //TODO rename to submitToOverseerRPC + public void handleResponse(String operation, ZkNodeProps m, SolrQueryResponse rsp) throws KeeperException, InterruptedException { handleResponse(operation, m, rsp, DEFAULT_COLLECTION_OP_TIMEOUT); } - private SolrResponse handleResponse(String operation, ZkNodeProps m, + //TODO rename to submitToOverseerRPC + public SolrResponse handleResponse(String operation, ZkNodeProps m, SolrQueryResponse rsp, long timeout) throws KeeperException, InterruptedException { - long time = System.nanoTime(); - - if (m.containsKey(ASYNC) && m.get(ASYNC) != null) { + if (!m.containsKey(QUEUE_OPERATION)) { + throw new SolrException(ErrorCode.BAD_REQUEST, "missing key " + QUEUE_OPERATION); + } + if (m.get(ASYNC) != null) { String asyncId = m.getStr(ASYNC); @@ -297,6 +300,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission return response; } + long time = System.nanoTime(); QueueEvent event = coreContainer.getZkController() .getOverseerCollectionQueue() .offer(Utils.toJSON(m), timeout); @@ -1031,7 +1035,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission } } - private static void waitForActiveCollection(String collectionName, ZkNodeProps message, CoreContainer cc, SolrResponse response) + public static void waitForActiveCollection(String collectionName, ZkNodeProps message, CoreContainer cc, SolrResponse response) throws KeeperException, InterruptedException { if (response.getResponse().get("exception") != null) { diff --git a/solr/core/src/java/org/apache/solr/request/SolrRequestInfo.java b/solr/core/src/java/org/apache/solr/request/SolrRequestInfo.java index f759c9174a8..f1a718dd5f6 100644 --- a/solr/core/src/java/org/apache/solr/request/SolrRequestInfo.java +++ b/solr/core/src/java/org/apache/solr/request/SolrRequestInfo.java @@ -101,17 +101,9 @@ public class SolrRequestInfo { } /** The TimeZone specified by the request, or null if none was specified */ - public TimeZone getClientTimeZone() { - + public TimeZone getClientTimeZone() { if (tz == null) { - String tzStr = req.getParams().get(CommonParams.TZ); - if (tzStr != null) { - tz = TimeZoneUtils.getTimeZone(tzStr); - if (null == tz) { - throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, - "Solr JVM does not support TZ: " + tzStr); - } - } + tz = TimeZoneUtils.parseTimezone(req.getParams().get(CommonParams.TZ)); } return tz; } diff --git a/solr/core/src/java/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessor.java index 91489125670..bc242ba7bfd 100644 --- a/solr/core/src/java/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessor.java +++ b/solr/core/src/java/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessor.java @@ -19,6 +19,7 @@ package org.apache.solr.update.processor; import java.io.IOException; import java.lang.invoke.MethodHandles; +import java.text.ParseException; import java.time.Instant; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; @@ -29,22 +30,34 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Date; +import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.TimeZone; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import java.util.stream.Collectors; +import org.apache.solr.cloud.Overseer; +import org.apache.solr.cloud.RoutedAliasCreateCollectionCmd; import org.apache.solr.cloud.ZkController; import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.Aliases; import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.cloud.ZkCoreNodeProps; +import org.apache.solr.common.cloud.ZkNodeProps; +import org.apache.solr.common.params.CollectionParams; +import org.apache.solr.common.params.CommonParams; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.SolrParams; import org.apache.solr.common.params.UpdateParams; import org.apache.solr.core.CoreContainer; import org.apache.solr.core.SolrCore; +import org.apache.solr.handler.admin.CollectionsHandler; import org.apache.solr.request.SolrQueryRequest; import org.apache.solr.response.SolrQueryResponse; import org.apache.solr.update.AddUpdateCommand; @@ -52,14 +65,18 @@ import org.apache.solr.update.CommitUpdateCommand; import org.apache.solr.update.DeleteUpdateCommand; import org.apache.solr.update.SolrCmdDistributor; import org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase; +import org.apache.solr.util.DateMathParser; +import org.apache.solr.util.TimeZoneUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.solr.handler.admin.CollectionsHandler.DEFAULT_COLLECTION_OP_TIMEOUT; import static org.apache.solr.update.processor.DistributedUpdateProcessor.DISTRIB_FROM; import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM; /** - * Distributes update requests to rolling series of collections partitioned by a timestamp field. + * Distributes update requests to a rolling series of collections partitioned by a timestamp field. Issues + * requests to create new collections on-demand. * * Depends on this core having a special core property that points to the alias name that this collection is a part of. * And further requires certain metadata on the Alias. @@ -69,16 +86,15 @@ import static org.apache.solr.update.processor.DistributingUpdateProcessorFactor public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor { //TODO do we make this more generic to others who want to partition collections using something else? - // TODO auto add new collection partitions when cross a timestamp boundary. That needs to be coordinated to avoid - // race conditions, remembering that even the lead collection might have multiple instances of this URP - // (multiple shards or perhaps just multiple streams thus instances of this URP) - public static final String ALIAS_DISTRIB_UPDATE_PARAM = "alias." + DISTRIB_UPDATE_PARAM; // param public static final String TIME_PARTITION_ALIAS_NAME_CORE_PROP = "timePartitionAliasName"; // core prop - public static final String ROUTER_FIELD_METADATA = "router.field"; // alias metadata + // alias metadata: + public static final String ROUTER_FIELD_METADATA = "router.field"; + public static final String ROUTER_MAX_FUTURE_TIME_METADATA = "router.maxFutureMs"; + public static final String ROUTER_INTERVAL_METADATA = "router.interval"; // This format must be compatible with collection name limitations - private static final DateTimeFormatter DATE_TIME_FORMATTER = new DateTimeFormatterBuilder() + public static final DateTimeFormatter DATE_TIME_FORMATTER = new DateTimeFormatterBuilder() .append(DateTimeFormatter.ISO_LOCAL_DATE).appendPattern("[_HH[_mm[_ss]]]") //brackets mean optional .parseDefaulting(ChronoField.HOUR_OF_DAY, 0) .parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0) @@ -87,18 +103,26 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + // used to limit unnecessary concurrent collection creation requests + private static ConcurrentHashMap aliasToSemaphoreMap = new ConcurrentHashMap<>(4); + private final String thisCollection; private final String aliasName; private final String routeField; + private final long maxFutureMs; + private final String intervalDateMath; + private final TimeZone intervalTimeZone; - private final SolrCmdDistributor cmdDistrib; private final ZkController zkController; + private final SolrCmdDistributor cmdDistrib; + private final CollectionsHandler collHandler; private final SolrParams outParamsToLeader; private List> parsedCollectionsDesc; // k=timestamp (start), v=collection. Sorted descending private Aliases parsedCollectionsAliases; // a cached reference to the source of what we parse into parsedCollectionsDesc public static UpdateRequestProcessor wrap(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) { + //TODO get from "Collection property" final String timePartitionAliasName = req.getCore().getCoreDescriptor() .getCoreProperty(TIME_PARTITION_ALIAS_NAME_CORE_PROP, null); final DistribPhase shardDistribPhase = @@ -126,12 +150,21 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor { CoreContainer cc = core.getCoreContainer(); zkController = cc.getZkController(); cmdDistrib = new SolrCmdDistributor(cc.getUpdateShardHandler()); + collHandler = cc.getCollectionsHandler(); final Map aliasMetadata = zkController.getZkStateReader().getAliases().getCollectionAliasMetadata(aliasName); if (aliasMetadata == null) { throw newAliasMustExistException(); // if it did exist, we'd have a non-null map } routeField = aliasMetadata.get(ROUTER_FIELD_METADATA); + intervalDateMath = aliasMetadata.getOrDefault(ROUTER_INTERVAL_METADATA, "+1DAY"); + String futureTimeStr = aliasMetadata.get(ROUTER_MAX_FUTURE_TIME_METADATA); + if (futureTimeStr != null) { + maxFutureMs = Long.parseLong(futureTimeStr); + } else { + maxFutureMs = TimeUnit.MINUTES.toMillis(10); + } + intervalTimeZone = TimeZoneUtils.parseTimezone(aliasMetadata.get(CommonParams.TZ)); ModifiableSolrParams outParams = new ModifiableSolrParams(req.getParams()); // Don't distribute these params; they will be distributed from the local processCommit separately. @@ -153,11 +186,59 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor { @Override public void processAdd(AddUpdateCommand cmd) throws IOException { final Object routeValue = cmd.getSolrInputDocument().getFieldValue(routeField); - final String targetCollection = findTargetCollectionGivenRouteKey(routeValue); - if (targetCollection == null) { - throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, - "Doc " + cmd.getPrintableId() + " couldn't be routed with " + routeField + "=" + routeValue); - } + final Instant routeTimestamp = parseRouteKey(routeValue); + + updateParsedCollectionAliases(); + String targetCollection; + do { + targetCollection = findTargetCollectionGivenTimestamp(routeTimestamp); + + if (targetCollection == null) { + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, + "Doc " + cmd.getPrintableId() + " couldn't be routed with " + routeField + "=" + routeTimestamp); + } + + // Note: the following rule is tempting but not necessary and is not compatible with + // only using this URP when the alias distrib phase is NONE; otherwise a doc may be routed to from a non-recent + // collection to the most recent only to then go there directly instead of realizing a new collection is needed. + // // If it's going to some other collection (not "this") then break to just send it there + // if (!thisCollection.equals(targetCollection)) { + // break; + // } + // Also tempting but not compatible: check that we're the leader, if not then break + + // If the doc goes to the most recent collection then do some checks below, otherwise break the loop. + final Instant mostRecentCollTimestamp = parsedCollectionsDesc.get(0).getKey(); + final String mostRecentCollName = parsedCollectionsDesc.get(0).getValue(); + if (!mostRecentCollName.equals(targetCollection)) { + break; + } + + // Check the doc isn't too far in the future + final Instant maxFutureTime = Instant.now().plusMillis(maxFutureMs); + if (routeTimestamp.isAfter(maxFutureTime)) { + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, + "The document's time routed key of " + routeValue + " is too far in the future given " + + ROUTER_MAX_FUTURE_TIME_METADATA + "=" + maxFutureMs); + } + + // Create a new collection? + final Instant nextCollTimestamp = computeNextCollTimestamp(mostRecentCollTimestamp, intervalDateMath, intervalTimeZone); + if (routeTimestamp.isBefore(nextCollTimestamp)) { + break; // thus we don't need another collection + } + + createCollectionAfter(mostRecentCollName); // *should* throw if fails for some reason but... + final boolean updated = updateParsedCollectionAliases(); + if (!updated) { // thus we didn't make progress... + // this is not expected, even in known failure cases, but we check just in case + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, + "We need to create a new time routed collection but for unknown reasons were unable to do so."); + } + // then retry the loop ... + } while(true); + assert targetCollection != null; + if (thisCollection.equals(targetCollection)) { // pass on through; we've reached the right collection super.processAdd(cmd); @@ -168,7 +249,23 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor { } } - protected String findTargetCollectionGivenRouteKey(Object routeKey) { + /** Computes the timestamp of the next collection given the timestamp of the one before. */ + public static Instant computeNextCollTimestamp(Instant fromTimestamp, String intervalDateMath, TimeZone intervalTimeZone) { + //TODO overload DateMathParser.parseMath to take tz and "now" + final DateMathParser dateMathParser = new DateMathParser(intervalTimeZone); + dateMathParser.setNow(Date.from(fromTimestamp)); + final Instant nextCollTimestamp; + try { + nextCollTimestamp = dateMathParser.parseMath(intervalDateMath).toInstant(); + } catch (ParseException e) { + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, + "Invalid Date Math String:'" + intervalDateMath +'\'', e); + } + assert nextCollTimestamp.isAfter(fromTimestamp); + return nextCollTimestamp; + } + + private Instant parseRouteKey(Object routeKey) { final Instant docTimestamp; if (routeKey instanceof Instant) { docTimestamp = (Instant) routeKey; @@ -179,15 +276,30 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor { } else { throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unexpected type of routeKey: " + routeKey); } + return docTimestamp; + } + + /** + * Ensure {@link #parsedCollectionsAliases} is up to date. If it was modified, return true. + * Note that this will return true if some other alias was modified or if metadata was modified. These + * are spurious and the caller should be written to be tolerant of no material changes. + */ + private boolean updateParsedCollectionAliases() { final Aliases aliases = zkController.getZkStateReader().getAliases(); // note: might be different from last request if (this.parsedCollectionsAliases != aliases) { if (this.parsedCollectionsAliases != null) { - log.info("Observing possibly updated alias {}", aliasName); + log.debug("Observing possibly updated alias: {}", aliasName); } - this.parsedCollectionsDesc = doParseCollections(aliases); + this.parsedCollectionsDesc = parseCollections(aliasName, aliases, this::newAliasMustExistException); this.parsedCollectionsAliases = aliases; + return true; } - // iterates in reverse chronological order + return false; + } + + /** Given the route key, finds the collection. Returns null if too old to go in last one. */ + private String findTargetCollectionGivenTimestamp(Instant docTimestamp) { + // Lookup targetCollection given route key. Iterates in reverse chronological order. // We're O(N) here but N should be small, the loop is fast, and usually looking for 1st. for (Map.Entry entry : parsedCollectionsDesc) { Instant colStartTime = entry.getKey(); @@ -195,16 +307,77 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor { return entry.getValue(); //found it } } - return null; + return null; //not found } - /** Parses the timestamp from the collection list and returns them in reverse sorted order (newest 1st) */ - private List> doParseCollections(Aliases aliases) { + private void createCollectionAfter(String mostRecentCollName) { + // Invoke ROUTEDALIAS_CREATECOLL (in the Overseer, locked by alias name). It will create the collection + // and update the alias contingent on the most recent collection name being the same as + // what we think so here, otherwise it will return (without error). + // To avoid needless concurrent communication with the Overseer from this JVM, we + // maintain a Semaphore from an alias name keyed ConcurrentHashMap. + // Alternatively a Lock or CountDownLatch could have been used but they didn't seem + // to make it any easier. + + final Semaphore semaphore = aliasToSemaphoreMap.computeIfAbsent(aliasName, n -> new Semaphore(1)); + if (semaphore.tryAcquire()) { + try { + final String operation = CollectionParams.CollectionAction.ROUTEDALIAS_CREATECOLL.toLower(); + Map msg = new HashMap<>(); + msg.put(Overseer.QUEUE_OPERATION, operation); + msg.put(CollectionParams.NAME, aliasName); + msg.put(RoutedAliasCreateCollectionCmd.IF_MOST_RECENT_COLL_NAME, mostRecentCollName); + SolrQueryResponse rsp = new SolrQueryResponse(); + try { + this.collHandler.handleResponse( + operation, + new ZkNodeProps(msg), + rsp); + if (rsp.getException() != null) { + throw rsp.getException(); + } // otherwise don't care about the response. It's possible no collection was created because + // of a race and that's okay... we'll ultimately retry any way. + + // Ensure our view of the aliases has updated. If we didn't do this, our zkStateReader might + // not yet know about the new alias (thus won't see the newly added collection to it), and we might think + // we failed. + zkController.getZkStateReader().aliasesHolder.update(); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e); + } + } finally { + semaphore.release(); // to signal we're done to anyone waiting on it + } + + } else { + // Failed to acquire permit because another URP instance on this JVM is creating a collection. + // So wait till it's available + log.debug("Collection creation is already in progress so we'll wait then try again."); + try { + if (semaphore.tryAcquire(DEFAULT_COLLECTION_OP_TIMEOUT, TimeUnit.MILLISECONDS)) { + semaphore.release(); // we don't actually want a permit so give it back + // return to continue... + } else { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, + "Waited too long for another update thread to be done with collection creation."); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, + "Interrupted waiting on collection creation.", e); // if we were interrupted, give up. + } + } + } + + /** Parses the timestamp from the collection list and returns them in reverse sorted order (most recent 1st) */ + public static List> parseCollections(String aliasName, Aliases aliases, Supplier aliasNotExist) { final List collections = aliases.getCollectionAliasListMap().get(aliasName); if (collections == null) { - throw newAliasMustExistException(); + throw aliasNotExist.get(); } - // note: I considered TreeMap but didn't like the log(N) just to grab the head when we use it later + // note: I considered TreeMap but didn't like the log(N) just to grab the most recent when we use it later List> result = new ArrayList<>(collections.size()); for (String collection : collections) { Instant colStartTime = parseInstantFromCollectionName(aliasName, collection); @@ -225,6 +398,17 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor { return DATE_TIME_FORMATTER.parse(dateTimePart, Instant::from); } + public static String formatCollectionNameFromInstant(String aliasName, Instant timestamp) { + String nextCollName = TimeRoutedAliasUpdateProcessor.DATE_TIME_FORMATTER.format(timestamp); + for (int i = 0; i < 3; i++) { // chop off seconds, minutes, hours + if (nextCollName.endsWith("_00")) { + nextCollName = nextCollName.substring(0, nextCollName.length()-3); + } + } + assert TimeRoutedAliasUpdateProcessor.DATE_TIME_FORMATTER.parse(nextCollName, Instant::from).equals(timestamp); + return aliasName + "_" + nextCollName; + } + @Override public void processDelete(DeleteUpdateCommand cmd) throws IOException { final List nodes = lookupShardLeadersOfCollections(); diff --git a/solr/core/src/java/org/apache/solr/util/TimeZoneUtils.java b/solr/core/src/java/org/apache/solr/util/TimeZoneUtils.java index 9d11f81f03e..0600a83170f 100644 --- a/solr/core/src/java/org/apache/solr/util/TimeZoneUtils.java +++ b/solr/core/src/java/org/apache/solr/util/TimeZoneUtils.java @@ -25,6 +25,8 @@ import java.util.Arrays; import java.util.regex.Pattern; import java.util.regex.Matcher; +import org.apache.solr.common.SolrException; + /** * Simple utilities for working with TimeZones * @see java.util.TimeZone @@ -82,4 +84,20 @@ public final class TimeZoneUtils { private static Pattern CUSTOM_ID_REGEX = Pattern.compile("GMT(?:\\+|\\-)(\\d{1,2})(?::?(\\d{2}))?"); + /** + * Parse the specified timezone ID. If null input then return UTC. If we can't resolve it then + * throw an exception. + */ + public static TimeZone parseTimezone(String tzStr) { + if (tzStr != null) { + TimeZone tz = getTimeZone(tzStr); + if (null == tz) { + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, + "Solr JVM does not support TZ: " + tzStr); + } + return tz; + } else { + return DateMathParser.UTC; //TODO move to TimeZoneUtils + } + } } diff --git a/solr/core/src/test/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessorTest.java b/solr/core/src/test/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessorTest.java index f7f200f58ae..db4b877657c 100644 --- a/solr/core/src/test/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessorTest.java +++ b/solr/core/src/test/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessorTest.java @@ -19,16 +19,20 @@ package org.apache.solr.update.processor; import java.io.IOException; import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.Date; import java.util.List; -import java.util.function.UnaryOperator; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import org.apache.lucene.util.IOUtils; import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.client.solrj.SolrRequest; import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.impl.CloudSolrClient; import org.apache.solr.client.solrj.request.CollectionAdminRequest; import org.apache.solr.client.solrj.request.ConfigSetAdminRequest; import org.apache.solr.client.solrj.request.V2Request; @@ -39,12 +43,14 @@ import org.apache.solr.cloud.SolrCloudTestCase; import org.apache.solr.common.SolrDocumentList; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrInputDocument; -import org.apache.solr.common.cloud.Aliases; import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.util.ExecutorUtil; import org.apache.solr.common.util.NamedList; import org.apache.solr.request.SolrQueryRequest; import org.apache.solr.response.SolrQueryResponse; +import org.apache.solr.util.DefaultSolrThreadFactory; import org.junit.AfterClass; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -52,7 +58,7 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase { static final String configName = "timeConfig"; static final String alias = "myalias"; - static final String timeField = "timestamp"; + static final String timeField = "timestamp_dt"; static final String intField = "integer_i"; static SolrClient solrClient; @@ -71,6 +77,14 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase { IOUtils.close(solrClient); } + //TODO this is necessary when -Dtests.iters but why? Some other tests aren't affected + @Before + public void doBefore() throws Exception { + for (String col : CollectionAdminRequest.listCollections(solrClient)) { + CollectionAdminRequest.deleteCollection(col).process(solrClient); + } + } + @Test public void test() throws Exception { // First create a config using REST API. To do this, we create a collection with the name of the eventual config. @@ -91,18 +105,21 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase { " 'fieldName':'" + intField + "'" + " }," + "}").build())); + // only sometimes test with "tolerant" URP + final String urpNames = "inc" + (random().nextBoolean() ? ",tolerant" : ""); checkNoError(solrClient.request(new V2Request.Builder("/collections/" + configName + "/config/params") .withMethod(SolrRequest.METHOD.POST) .withPayload("{" + " 'set' : {" + - " '_UPDATE' : {'processor':'inc,tolerant'}" + + " '_UPDATE' : {'processor':'" + urpNames + "'}" + " }" + "}").build())); CollectionAdminRequest.deleteCollection(configName).process(solrClient); // start with one collection and an alias for it final String col23rd = alias + "_2017-10-23"; - CollectionAdminRequest.createCollection(col23rd, configName, 1, 1) + CollectionAdminRequest.createCollection(col23rd, configName, 2, 2) + .setMaxShardsPerNode(2) .withProperty(TimeRoutedAliasUpdateProcessor.TIME_PARTITION_ALIAS_NAME_CORE_PROP, alias) .process(solrClient); @@ -112,30 +129,29 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase { CollectionAdminRequest.createAlias(alias, col23rd).process(solrClient); //TODO use SOLR-11617 client API to set alias metadata final ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader(); - UnaryOperator op = a -> a.cloneWithCollectionAliasMetadata(alias, TimeRoutedAliasUpdateProcessor.ROUTER_FIELD_METADATA, timeField); - zkStateReader.aliasesHolder.applyModificationAndExportToZk(op); + zkStateReader.aliasesHolder.applyModificationAndExportToZk(a -> + a.cloneWithCollectionAliasMetadata(alias, TimeRoutedAliasUpdateProcessor.ROUTER_FIELD_METADATA, timeField) + .cloneWithCollectionAliasMetadata(alias, "collection-create.collection.configName", configName) + .cloneWithCollectionAliasMetadata(alias, "collection-create.numShards", "1") + .cloneWithCollectionAliasMetadata(alias, "collection-create.replicationFactor", "1") + .cloneWithCollectionAliasMetadata(alias, "router.interval", "+1DAY")); // now we index a document - solrClient.add(alias, newDoc(Instant.parse("2017-10-23T00:00:00Z"))); + assertUpdateResponse(solrClient.add(alias, newDoc(Instant.parse("2017-10-23T00:00:00Z")))); solrClient.commit(alias); //assertDocRoutedToCol(lastDocId, col23rd); - assertInvariants(); + assertInvariants(col23rd); - // a document that is too old (throws exception... if we have a TolerantUpdateProcessor then we see it there) - try { - final UpdateResponse resp = solrClient.add(alias, newDoc(Instant.parse("2017-10-01T00:00:00Z"))); - final Object errors = resp.getResponseHeader().get("errors"); - assertTrue(errors != null && errors.toString().contains("couldn't be routed")); - } catch (SolrException e) { - assertTrue(e.getMessage().contains("couldn't be routed")); - } - numDocsDeletedOrFailed++; + // a document that is too old + testFailedDocument(Instant.parse("2017-10-01T00:00:00Z"), "couldn't be routed"); + + // a document which is too far into the future + testFailedDocument(Instant.now().plus(30, ChronoUnit.MINUTES), "too far in the future"); // add another collection, add to alias (soonest comes first) final String col24th = alias + "_2017-10-24"; - CollectionAdminRequest.createCollection(col24th, configName, 2, 2) // more shards and replicas now - .setMaxShardsPerNode(2) + CollectionAdminRequest.createCollection(col24th, configName, 1, 1) // more shards and replicas now .withProperty("timePartitionAliasName", alias) .process(solrClient); CollectionAdminRequest.createAlias(alias, col24th + "," + col23rd).process(solrClient); @@ -146,7 +162,7 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase { newDoc(Instant.parse("2017-10-24T01:00:00Z")), newDoc(Instant.parse("2017-10-24T02:00:00Z")) ); - assertInvariants(); + assertInvariants(col24th, col23rd); // assert that the IncrementURP has updated all '0' to '1' final SolrDocumentList checkIncResults = solrClient.query(alias, params("q", "NOT " + intField + ":1")).getResults(); @@ -154,16 +170,45 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase { //delete a random document id; ensure we don't find it int idToDelete = 1 + random().nextInt(lastDocId); - if (idToDelete == 2) { // #2 didn't make it - idToDelete++; + if (idToDelete == 2 || idToDelete == 3) { // these didn't make it + idToDelete = 4; } - solrClient.deleteById(alias, Integer.toString(idToDelete)); - solrClient.commit(alias); + assertUpdateResponse(solrClient.deleteById(alias, Integer.toString(idToDelete))); + assertUpdateResponse(solrClient.commit(alias)); numDocsDeletedOrFailed++; - assertInvariants(); + assertInvariants(col24th, col23rd); + + // delete the Oct23rd (save memory)... + // make sure we track that we are effectively deleting docs there + numDocsDeletedOrFailed += solrClient.query(col23rd, params("q", "*:*", "rows", "0")).getResults().getNumFound(); + // remove from alias + CollectionAdminRequest.createAlias(alias, col24th).process(solrClient); + // delete the collection + CollectionAdminRequest.deleteCollection(col23rd).process(solrClient); + + // now we're going to add documents that will trigger more collections to be created + // for 25th & 26th + addDocsAndCommit( + newDoc(Instant.parse("2017-10-24T03:00:00Z")), + newDoc(Instant.parse("2017-10-25T04:00:00Z")), + newDoc(Instant.parse("2017-10-26T05:00:00Z")) + ); + assertInvariants(alias + "_2017-10-26", alias + "_2017-10-25", col24th); } - private void checkNoError(NamedList response) { + private void testFailedDocument(Instant timestamp, String errorMsg) throws SolrServerException, IOException { + try { + final UpdateResponse resp = solrClient.add(alias, newDoc(timestamp)); + // if we have a TolerantUpdateProcessor then we see it there) + final Object errors = resp.getResponseHeader().get("errors"); // Tolerant URP + assertTrue(errors != null && errors.toString().contains(errorMsg)); + } catch (SolrException e) { + assertTrue(e.getMessage().contains(errorMsg)); + } + numDocsDeletedOrFailed++; + } + + private void checkNoError(NamedList response) { //TODO rename Object errors = response.get("errorMessages"); assertNull("" + errors, errors); } @@ -171,7 +216,8 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase { /** Adds these documents and commits, returning when they are committed. * We randomly go about this in different ways. */ private void addDocsAndCommit(SolrInputDocument... solrInputDocuments) throws Exception { - // we assume these are not old docs! + // we assume all docs will be added (none too old/new to cause exception) + Collections.shuffle(Arrays.asList(solrInputDocuments), random()); // this is a list of the collections & the alias name. Use to pick randomly where to send. // (it doesn't matter where we send docs since the alias is honored at the URP level) @@ -182,15 +228,27 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase { int commitWithin = random().nextBoolean() ? -1 : 500; // if -1, we commit explicitly instead int numDocsBefore = queryNumDocs(); if (random().nextBoolean()) { - // send in separate requests - for (SolrInputDocument solrInputDocument : solrInputDocuments) { - String col = collections.get(random().nextInt(collections.size())); - solrClient.add(col, solrInputDocument, commitWithin); + // Send in separate threads. Choose random collection & solrClient + try (CloudSolrClient solrClient = getCloudSolrClient(cluster)) { + ExecutorService exec = ExecutorUtil.newMDCAwareFixedThreadPool(1 + random().nextInt(2), + new DefaultSolrThreadFactory(getTestName())); + List> futures = new ArrayList<>(solrInputDocuments.length); + for (SolrInputDocument solrInputDocument : solrInputDocuments) { + String col = collections.get(random().nextInt(collections.size())); + futures.add(exec.submit(() -> solrClient.add(col, solrInputDocument, commitWithin))); + } + for (Future future : futures) { + assertUpdateResponse(future.get()); + } + // at this point there shouldn't be any tasks running + assertEquals(0, exec.shutdownNow().size()); } } else { // send in a batch. String col = collections.get(random().nextInt(collections.size())); - solrClient.add(col, Arrays.asList(solrInputDocuments), commitWithin); + try (CloudSolrClient solrClient = getCloudSolrClient(cluster)) { + assertUpdateResponse(solrClient.add(col, Arrays.asList(solrInputDocuments), commitWithin)); + } } String col = collections.get(random().nextInt(collections.size())); if (commitWithin == -1) { @@ -210,21 +268,30 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase { } } + private void assertUpdateResponse(UpdateResponse rsp) { + // use of TolerantUpdateProcessor can cause non-thrown "errors" that we need to check for + List errors = (List) rsp.getResponseHeader().get("errors"); + assertTrue("Expected no errors: " + errors,errors == null || errors.isEmpty()); + } + private int queryNumDocs() throws SolrServerException, IOException { return (int) solrClient.query(alias, params("q", "*:*", "rows", "0")).getResults().getNumFound(); } - private void assertInvariants() throws IOException, SolrServerException { + private void assertInvariants(String... expectedColls) throws IOException, SolrServerException { final int expectNumFound = lastDocId - numDocsDeletedOrFailed; //lastDocId is effectively # generated docs final List cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias); assert !cols.isEmpty(); + assertArrayEquals("expected reverse sorted", + cols.stream().sorted(Collections.reverseOrder()).toArray(), + cols.toArray()); + int totalNumFound = 0; Instant colEndInstant = null; // exclusive end - for (String col : cols) { + for (String col : cols) { // ASSUMPTION: reverse sorted order final Instant colStartInstant = TimeRoutedAliasUpdateProcessor.parseInstantFromCollectionName(alias, col); - //TODO do this in parallel threads final QueryResponse colStatsResp = solrClient.query(col, params( "q", "*:*", "rows", "0", @@ -243,6 +310,7 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase { colEndInstant = colStartInstant; // next older segment will max out at our current start time } assertEquals(expectNumFound, totalNumFound); + assertArrayEquals(expectedColls, cols.toArray()); } private SolrInputDocument newDoc(Instant timestamp) { diff --git a/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java b/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java index 77dd454c0a0..9d5fc36bb67 100644 --- a/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java +++ b/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java @@ -78,6 +78,7 @@ public interface CollectionParams { CREATEALIAS(true, LockLevel.COLLECTION), DELETEALIAS(true, LockLevel.COLLECTION), LISTALIASES(false, LockLevel.NONE), + ROUTEDALIAS_CREATECOLL(true, LockLevel.COLLECTION), SPLITSHARD(true, LockLevel.SHARD), DELETESHARD(true, LockLevel.SHARD), CREATESHARD(true, LockLevel.COLLECTION),