SOLR-11653: TimeRoutedAlias URP now auto-creates collections using new RoutedAliasCreateCollectionCmd

This commit is contained in:
David Smiley 2018-01-05 13:53:26 -05:00
parent 3980aea18d
commit 925733d1ef
11 changed files with 539 additions and 76 deletions

View File

@ -74,6 +74,10 @@ New Features
* SOLR-11201: Implement autoscaling trigger for arbitrary metrics that creates events when
a given metric breaches a threshold (shalin)
* SOLR-11653: TimeRoutedAlias URP now auto-creates new collections on the fly according to alias metadata
rules that set the time interval for each collection. An internal Overseer command "ROUTEDALIAS_CREATECOLL"
was created to facilitate this. (David Smiley)
Bug Fixes
----------------------

View File

@ -39,6 +39,7 @@ import org.apache.solr.cloud.overseer.SliceMutator;
import org.apache.solr.cloud.overseer.ZkStateWriter;
import org.apache.solr.cloud.overseer.ZkWriteCommand;
import org.apache.solr.common.SolrCloseable;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps;
@ -267,6 +268,9 @@ public class Overseer implements SolrCloseable {
private ClusterState processQueueItem(ZkNodeProps message, ClusterState clusterState, ZkStateWriter zkStateWriter, boolean enableBatching, ZkStateWriter.ZkWriteCallback callback) throws Exception {
final String operation = message.getStr(QUEUE_OPERATION);
if (operation == null) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Message missing " + QUEUE_OPERATION + ":" + message);
}
List<ZkWriteCommand> zkWriteCommands = null;
final Timer.Context timerContext = stats.time(operation);
try {

View File

@ -219,6 +219,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
.put(DELETE, new DeleteCollectionCmd(this))
.put(CREATEALIAS, new CreateAliasCmd(this))
.put(DELETEALIAS, new DeleteAliasCmd(this))
.put(ROUTEDALIAS_CREATECOLL, new RoutedAliasCreateCollectionCmd(this))
.put(OVERSEERSTATUS, new OverseerStatusCmd(this))
.put(DELETESHARD, new DeleteShardCmd(this))
.put(DELETEREPLICA, new DeleteReplicaCmd(this))
@ -232,7 +233,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
@Override
@SuppressWarnings("unchecked")
public SolrResponse processMessage(ZkNodeProps message, String operation) {
log.debug("OverseerCollectionMessageHandler.processMessage : "+ operation + " , "+ message.toString());
log.debug("OverseerCollectionMessageHandler.processMessage : {} , {}", operation, message);
NamedList results = new NamedList();
try {

View File

@ -257,7 +257,6 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
}
if (runningZKTasks.contains(head.getId())) continue;
final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
OverseerMessageHandler messageHandler = selector.selectOverseerMessageHandler(message);
final String asyncId = message.getStr(ASYNC);
if (hasLeftOverItems) {
if (head.getId().equals(oldestItemInWorkQueue))
@ -269,6 +268,12 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
}
}
String operation = message.getStr(Overseer.QUEUE_OPERATION);
if (operation == null) {
log.error("Msg does not have required " + Overseer.QUEUE_OPERATION + ": {}", message);
workQueue.remove(head);
continue;
}
OverseerMessageHandler messageHandler = selector.selectOverseerMessageHandler(message);
OverseerMessageHandler.Lock lock = messageHandler.lockTask(message, taskBatch);
if (lock == null) {
log.debug("Exclusivity check failed for [{}]", message.toString());

View File

@ -0,0 +1,182 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import java.lang.invoke.MethodHandles;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.Aliases;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.handler.admin.CollectionsHandler;
import org.apache.solr.request.LocalSolrQueryRequest;
import org.apache.solr.update.processor.TimeRoutedAliasUpdateProcessor;
import org.apache.solr.util.TimeZoneUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_CONF;
import static org.apache.solr.common.params.CommonParams.NAME;
import static org.apache.solr.update.processor.TimeRoutedAliasUpdateProcessor.ROUTER_FIELD_METADATA;
import static org.apache.solr.update.processor.TimeRoutedAliasUpdateProcessor.ROUTER_INTERVAL_METADATA;
/**
 * Overseer command for "routed aliases" that creates the collection following the most recent one and then
 * adds it to the alias. In some situations it returns without adding a collection.
 * When a collection is created, the collection creation info is added to the results.
 *
 * Note: this logic lives in the Overseer so that it can rely on the mutual exclusion provided by the lock
 * the Overseer obtains on the alias name.
 *
 * @since 7.3
 */
public class RoutedAliasCreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd {

  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  /** Optional message param: the collection name the caller believes to be the alias's most recent one. */
  public static final String IF_MOST_RECENT_COLL_NAME = "ifMostRecentCollName";

  /** Alias metadata keys starting with this prefix become (prefix removed) create-collection request params. */
  public static final String COLL_METAPREFIX = "collection-create.";

  private final OverseerCollectionMessageHandler ocmh;

  public RoutedAliasCreateCollectionCmd(OverseerCollectionMessageHandler ocmh) {
    this.ocmh = ocmh;
  }

  /* TODO:
    There are a few classes related to time routed alias processing.  We need to share some logic better.
  */

  /**
   * Creates the next collection for the time routed alias named by {@code NAME} in the message and prepends
   * it to the alias.
   *
   * Returns early, adding only a "message" entry to {@code results}, when {@link #IF_MOST_RECENT_COLL_NAME}
   * was supplied but differs from the alias's most recent collection, or when it was not supplied and the
   * most recent collection's timestamp is after the current time.
   *
   * @throws SolrException if the alias doesn't exist, lacks the time-routing metadata, or its metadata
   *                       doesn't yield a {@code COLL_CONF} create param
   */
  @Override
  public void call(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
    //---- MESSAGE PARAMS
    // The alias travels under NAME because that is the key the Overseer locks on before calling us
    final String aliasName = message.getStr(NAME);
    // Optional: what the client thinks is the most recent collection; checked below when present
    final String ifMostRecentCollName = message.getStr(IF_MOST_RECENT_COLL_NAME);
    // TODO collection param (or intervalDateMath override?), useful for data capped collections

    //---- ALIAS INFO FROM ZK
    final ZkStateReader.AliasesManager aliasesHolder = ocmh.zkStateReader.aliasesHolder;
    final Aliases aliases = aliasesHolder.getAliases();
    final Map<String, String> aliasMetadata = aliases.getCollectionAliasMetadata(aliasName);
    if (aliasMetadata == null) {
      // an existing alias always has a non-null metadata map
      throw newAliasMustExistException(aliasName);
    }
    if (aliasMetadata.get(ROUTER_FIELD_METADATA) == null) {
      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
          "This command only works on time routed aliases.  Expected alias metadata not found.");
    }
    final String intervalDateMath = aliasMetadata.getOrDefault(ROUTER_INTERVAL_METADATA, "+1DAY");
    final TimeZone intervalTimeZone = TimeZoneUtils.parseTimezone(aliasMetadata.get(CommonParams.TZ));

    //TODO this is ugly; how can we organize the code related to this feature better?
    final List<Map.Entry<Instant, String>> collectionsDesc = TimeRoutedAliasUpdateProcessor.parseCollections(
        aliasName, aliases, () -> newAliasMustExistException(aliasName));

    //---- MOST RECENT COLLECTION (the parsed list is sorted most recent first)
    final Map.Entry<Instant, String> newest = collectionsDesc.get(0);
    final Instant newestTimestamp = newest.getKey();
    final String newestName = newest.getValue();

    if (ifMostRecentCollName == null) {
      if (newestTimestamp.isAfter(Instant.now())) {
        reportNoOp(results, "Most recent collection is in the future, so we won't create another.");
        return;
      }
    } else if (!newestName.equals(ifMostRecentCollName)) {
      // Possibly due to race conditions in URPs on multiple leaders calling us at the same time
      String msg = IF_MOST_RECENT_COLL_NAME + " expected " + ifMostRecentCollName + " but it's " + newestName;
      boolean referencedByAlias = false;
      for (Map.Entry<Instant, String> entry : collectionsDesc) {
        if (ifMostRecentCollName.equals(entry.getValue())) {
          referencedByAlias = true;
          break;
        }
      }
      if (!referencedByAlias) {
        msg += ". Furthermore this collection isn't in the list of collections referenced by the alias.";
      }
      reportNoOp(results, msg);
      return;
    }

    //---- NEXT COLLECTION NAME
    final Instant nextCollTimestamp =
        TimeRoutedAliasUpdateProcessor.computeNextCollTimestamp(newestTimestamp, intervalDateMath, intervalTimeZone);
    assert nextCollTimestamp.isAfter(newestTimestamp);
    final String createCollName =
        TimeRoutedAliasUpdateProcessor.formatCollectionNameFromInstant(aliasName, nextCollTimestamp);

    //---- CREATE THE COLLECTION
    final ModifiableSolrParams createReqParams = buildCreateRequestParams(aliasMetadata, aliasName, createCollName);

    // A CollectionOperation turns request params into the message (Map) normally sent to the Overseer.
    // We reuse it rather than building the Map by hand so that we don't duplicate its many rules.
    final Map<String, Object> createMsgMap = CollectionsHandler.CollectionOperation.CREATE_OP.execute(
        new LocalSolrQueryRequest(null, createReqParams),
        null,
        ocmh.overseer.getCoreContainer().getCollectionsHandler());
    createMsgMap.put(Overseer.QUEUE_OPERATION, "create");

    // We already run inside the Overseer, so hand the message straight to the CreateCollectionCmd
    final ZkNodeProps createMsg = new ZkNodeProps(createMsgMap);
    ocmh.commandMap.get(CollectionParams.CollectionAction.CREATE).call(clusterState, createMsg, results);

    CollectionsHandler.waitForActiveCollection(
        createCollName, null, ocmh.overseer.getCoreContainer(), new OverseerSolrResponse(results));

    //TODO delete some of the oldest collection(s) ?

    //---- UPDATE THE ALIAS
    prependToAlias(aliasesHolder, aliasName, createCollName);
  }

  /** Logs {@code msg} at info level and records it in {@code results} under "message". */
  private static void reportNoOp(NamedList results, String msg) {
    log.info(msg);
    results.add("message", msg);
  }

  /**
   * Maps alias metadata starting with {@link #COLL_METAPREFIX} to create-collection API request params, then
   * sets the collection name and the core property that points back at the alias.
   */
  private static ModifiableSolrParams buildCreateRequestParams(Map<String, String> aliasMetadata,
                                                              String aliasName, String createCollName) {
    final ModifiableSolrParams params = new ModifiableSolrParams();
    aliasMetadata.forEach((key, value) -> {
      if (key.startsWith(COLL_METAPREFIX)) {
        params.set(key.substring(COLL_METAPREFIX.length()), value);
      }
    });
    if (params.get(COLL_CONF) == null) {
      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
          "We require an explicit " + COLL_CONF );
    }
    params.set(NAME, createCollName);
    params.set("property." + TimeRoutedAliasUpdateProcessor.TIME_PARTITION_ALIAS_NAME_CORE_PROP, aliasName);
    return params;
  }

  /** Puts {@code createCollName} at the head of the alias's collection list unless it is already listed. */
  private static void prependToAlias(ZkStateReader.AliasesManager aliasesHolder,
                                     String aliasName, String createCollName) throws Exception {
    aliasesHolder.applyModificationAndExportToZk(current -> {
      final List<String> existing = current.getCollectionAliasListMap().get(aliasName);
      if (existing.contains(createCollName)) {
        return current; // already there; leave the aliases untouched
      }
      // prepend it on purpose (thus reverse sorted). Solr alias resolution defaults to the first collection in a list
      final List<String> updated = new ArrayList<>(existing.size() + 1);
      updated.add(createCollName);
      updated.addAll(existing);
      return current.cloneWithCollectionAlias(aliasName, StrUtils.join(updated, ','));
    });
  }

  private SolrException newAliasMustExistException(String aliasName) {
    return new SolrException(SolrException.ErrorCode.BAD_REQUEST,
        "Alias " + aliasName + " does not exist.");
  }

}

View File

@ -260,16 +260,19 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
public static long DEFAULT_COLLECTION_OP_TIMEOUT = 180*1000;
void handleResponse(String operation, ZkNodeProps m,
//TODO rename to submitToOverseerRPC
public void handleResponse(String operation, ZkNodeProps m,
SolrQueryResponse rsp) throws KeeperException, InterruptedException {
handleResponse(operation, m, rsp, DEFAULT_COLLECTION_OP_TIMEOUT);
}
private SolrResponse handleResponse(String operation, ZkNodeProps m,
//TODO rename to submitToOverseerRPC
public SolrResponse handleResponse(String operation, ZkNodeProps m,
SolrQueryResponse rsp, long timeout) throws KeeperException, InterruptedException {
long time = System.nanoTime();
if (m.containsKey(ASYNC) && m.get(ASYNC) != null) {
if (!m.containsKey(QUEUE_OPERATION)) {
throw new SolrException(ErrorCode.BAD_REQUEST, "missing key " + QUEUE_OPERATION);
}
if (m.get(ASYNC) != null) {
String asyncId = m.getStr(ASYNC);
@ -297,6 +300,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
return response;
}
long time = System.nanoTime();
QueueEvent event = coreContainer.getZkController()
.getOverseerCollectionQueue()
.offer(Utils.toJSON(m), timeout);
@ -1031,7 +1035,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
}
}
private static void waitForActiveCollection(String collectionName, ZkNodeProps message, CoreContainer cc, SolrResponse response)
public static void waitForActiveCollection(String collectionName, ZkNodeProps message, CoreContainer cc, SolrResponse response)
throws KeeperException, InterruptedException {
if (response.getResponse().get("exception") != null) {

View File

@ -101,17 +101,9 @@ public class SolrRequestInfo {
}
/** The TimeZone specified by the request, or null if none was specified */
public TimeZone getClientTimeZone() {
public TimeZone getClientTimeZone() {
if (tz == null) {
String tzStr = req.getParams().get(CommonParams.TZ);
if (tzStr != null) {
tz = TimeZoneUtils.getTimeZone(tzStr);
if (null == tz) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"Solr JVM does not support TZ: " + tzStr);
}
}
tz = TimeZoneUtils.parseTimezone(req.getParams().get(CommonParams.TZ));
}
return tz;
}

View File

@ -19,6 +19,7 @@ package org.apache.solr.update.processor;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.text.ParseException;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
@ -29,22 +30,34 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TimeZone;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.RoutedAliasCreateCollectionCmd;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.Aliases;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.admin.CollectionsHandler;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.update.AddUpdateCommand;
@ -52,14 +65,18 @@ import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.DeleteUpdateCommand;
import org.apache.solr.update.SolrCmdDistributor;
import org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase;
import org.apache.solr.util.DateMathParser;
import org.apache.solr.util.TimeZoneUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.handler.admin.CollectionsHandler.DEFAULT_COLLECTION_OP_TIMEOUT;
import static org.apache.solr.update.processor.DistributedUpdateProcessor.DISTRIB_FROM;
import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
/**
* Distributes update requests to rolling series of collections partitioned by a timestamp field.
* Distributes update requests to a rolling series of collections partitioned by a timestamp field. Issues
* requests to create new collections on-demand.
*
* Depends on this core having a special core property that points to the alias name that this collection is a part of.
* And further requires certain metadata on the Alias.
@ -69,16 +86,15 @@ import static org.apache.solr.update.processor.DistributingUpdateProcessorFactor
public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
//TODO do we make this more generic to others who want to partition collections using something else?
// TODO auto add new collection partitions when cross a timestamp boundary. That needs to be coordinated to avoid
// race conditions, remembering that even the lead collection might have multiple instances of this URP
// (multiple shards or perhaps just multiple streams thus instances of this URP)
public static final String ALIAS_DISTRIB_UPDATE_PARAM = "alias." + DISTRIB_UPDATE_PARAM; // param
public static final String TIME_PARTITION_ALIAS_NAME_CORE_PROP = "timePartitionAliasName"; // core prop
public static final String ROUTER_FIELD_METADATA = "router.field"; // alias metadata
// alias metadata:
public static final String ROUTER_FIELD_METADATA = "router.field";
public static final String ROUTER_MAX_FUTURE_TIME_METADATA = "router.maxFutureMs";
public static final String ROUTER_INTERVAL_METADATA = "router.interval";
// This format must be compatible with collection name limitations
private static final DateTimeFormatter DATE_TIME_FORMATTER = new DateTimeFormatterBuilder()
public static final DateTimeFormatter DATE_TIME_FORMATTER = new DateTimeFormatterBuilder()
.append(DateTimeFormatter.ISO_LOCAL_DATE).appendPattern("[_HH[_mm[_ss]]]") //brackets mean optional
.parseDefaulting(ChronoField.HOUR_OF_DAY, 0)
.parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0)
@ -87,18 +103,26 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
// used to limit unnecessary concurrent collection creation requests
private static ConcurrentHashMap<String, Semaphore> aliasToSemaphoreMap = new ConcurrentHashMap<>(4);
private final String thisCollection;
private final String aliasName;
private final String routeField;
private final long maxFutureMs;
private final String intervalDateMath;
private final TimeZone intervalTimeZone;
private final SolrCmdDistributor cmdDistrib;
private final ZkController zkController;
private final SolrCmdDistributor cmdDistrib;
private final CollectionsHandler collHandler;
private final SolrParams outParamsToLeader;
private List<Map.Entry<Instant, String>> parsedCollectionsDesc; // k=timestamp (start), v=collection. Sorted descending
private Aliases parsedCollectionsAliases; // a cached reference to the source of what we parse into parsedCollectionsDesc
public static UpdateRequestProcessor wrap(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
//TODO get from "Collection property"
final String timePartitionAliasName = req.getCore().getCoreDescriptor()
.getCoreProperty(TIME_PARTITION_ALIAS_NAME_CORE_PROP, null);
final DistribPhase shardDistribPhase =
@ -126,12 +150,21 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
CoreContainer cc = core.getCoreContainer();
zkController = cc.getZkController();
cmdDistrib = new SolrCmdDistributor(cc.getUpdateShardHandler());
collHandler = cc.getCollectionsHandler();
final Map<String, String> aliasMetadata = zkController.getZkStateReader().getAliases().getCollectionAliasMetadata(aliasName);
if (aliasMetadata == null) {
throw newAliasMustExistException(); // if it did exist, we'd have a non-null map
}
routeField = aliasMetadata.get(ROUTER_FIELD_METADATA);
intervalDateMath = aliasMetadata.getOrDefault(ROUTER_INTERVAL_METADATA, "+1DAY");
String futureTimeStr = aliasMetadata.get(ROUTER_MAX_FUTURE_TIME_METADATA);
if (futureTimeStr != null) {
maxFutureMs = Long.parseLong(futureTimeStr);
} else {
maxFutureMs = TimeUnit.MINUTES.toMillis(10);
}
intervalTimeZone = TimeZoneUtils.parseTimezone(aliasMetadata.get(CommonParams.TZ));
ModifiableSolrParams outParams = new ModifiableSolrParams(req.getParams());
// Don't distribute these params; they will be distributed from the local processCommit separately.
@ -153,11 +186,59 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
@Override
public void processAdd(AddUpdateCommand cmd) throws IOException {
final Object routeValue = cmd.getSolrInputDocument().getFieldValue(routeField);
final String targetCollection = findTargetCollectionGivenRouteKey(routeValue);
if (targetCollection == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"Doc " + cmd.getPrintableId() + " couldn't be routed with " + routeField + "=" + routeValue);
}
final Instant routeTimestamp = parseRouteKey(routeValue);
updateParsedCollectionAliases();
String targetCollection;
do {
targetCollection = findTargetCollectionGivenTimestamp(routeTimestamp);
if (targetCollection == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"Doc " + cmd.getPrintableId() + " couldn't be routed with " + routeField + "=" + routeTimestamp);
}
// Note: the following rule is tempting but not necessary and is not compatible with
// only using this URP when the alias distrib phase is NONE; otherwise a doc may be routed to from a non-recent
// collection to the most recent only to then go there directly instead of realizing a new collection is needed.
// // If it's going to some other collection (not "this") then break to just send it there
// if (!thisCollection.equals(targetCollection)) {
// break;
// }
// Also tempting but not compatible: check that we're the leader, if not then break
// If the doc goes to the most recent collection then do some checks below, otherwise break the loop.
final Instant mostRecentCollTimestamp = parsedCollectionsDesc.get(0).getKey();
final String mostRecentCollName = parsedCollectionsDesc.get(0).getValue();
if (!mostRecentCollName.equals(targetCollection)) {
break;
}
// Check the doc isn't too far in the future
final Instant maxFutureTime = Instant.now().plusMillis(maxFutureMs);
if (routeTimestamp.isAfter(maxFutureTime)) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"The document's time routed key of " + routeValue + " is too far in the future given " +
ROUTER_MAX_FUTURE_TIME_METADATA + "=" + maxFutureMs);
}
// Create a new collection?
final Instant nextCollTimestamp = computeNextCollTimestamp(mostRecentCollTimestamp, intervalDateMath, intervalTimeZone);
if (routeTimestamp.isBefore(nextCollTimestamp)) {
break; // thus we don't need another collection
}
createCollectionAfter(mostRecentCollName); // *should* throw if fails for some reason but...
final boolean updated = updateParsedCollectionAliases();
if (!updated) { // thus we didn't make progress...
// this is not expected, even in known failure cases, but we check just in case
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"We need to create a new time routed collection but for unknown reasons were unable to do so.");
}
// then retry the loop ...
} while(true);
assert targetCollection != null;
if (thisCollection.equals(targetCollection)) {
// pass on through; we've reached the right collection
super.processAdd(cmd);
@ -168,7 +249,23 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
}
}
protected String findTargetCollectionGivenRouteKey(Object routeKey) {
/**
 * Computes the timestamp of the next collection given the timestamp of the one before.
 *
 * @param fromTimestamp start timestamp of the preceding collection; used as "now" when evaluating the date math
 * @param intervalDateMath date math expression applied to {@code fromTimestamp}, e.g. {@code +1DAY}
 * @param intervalTimeZone time zone the date math parser is constructed with
 * @return the start timestamp of the next collection; always after {@code fromTimestamp}
 * @throws SolrException (BAD_REQUEST) if the date math can't be parsed, or if it doesn't produce a time
 *                       after {@code fromTimestamp}
 */
public static Instant computeNextCollTimestamp(Instant fromTimestamp, String intervalDateMath, TimeZone intervalTimeZone) {
  //TODO overload DateMathParser.parseMath to take tz and "now"
  final DateMathParser dateMathParser = new DateMathParser(intervalTimeZone);
  dateMathParser.setNow(Date.from(fromTimestamp));
  final Instant nextCollTimestamp;
  try {
    nextCollTimestamp = dateMathParser.parseMath(intervalDateMath).toInstant();
  } catch (ParseException e) {
    throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
        "Invalid Date Math String:'" + intervalDateMath +'\'', e);
  }
  // The interval is configuration (alias metadata), so it is validated with a real check instead of an
  // assert: assertions are skipped unless the JVM runs with -ea, in which case a non-advancing interval
  // would be returned silently.
  if (!nextCollTimestamp.isAfter(fromTimestamp)) {
    throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
        "Date math '" + intervalDateMath + "' must produce a time after " + fromTimestamp
            + " but produced " + nextCollTimestamp);
  }
  return nextCollTimestamp;
}
private Instant parseRouteKey(Object routeKey) {
final Instant docTimestamp;
if (routeKey instanceof Instant) {
docTimestamp = (Instant) routeKey;
@ -179,15 +276,30 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
} else {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unexpected type of routeKey: " + routeKey);
}
return docTimestamp;
}
/**
* Ensure {@link #parsedCollectionsAliases} is up to date. If it was modified, return true.
* Note that this will return true if some other alias was modified or if metadata was modified. These
* are spurious and the caller should be written to be tolerant of no material changes.
*/
private boolean updateParsedCollectionAliases() {
final Aliases aliases = zkController.getZkStateReader().getAliases(); // note: might be different from last request
if (this.parsedCollectionsAliases != aliases) {
if (this.parsedCollectionsAliases != null) {
log.info("Observing possibly updated alias {}", aliasName);
log.debug("Observing possibly updated alias: {}", aliasName);
}
this.parsedCollectionsDesc = doParseCollections(aliases);
this.parsedCollectionsDesc = parseCollections(aliasName, aliases, this::newAliasMustExistException);
this.parsedCollectionsAliases = aliases;
return true;
}
// iterates in reverse chronological order
return false;
}
/** Given the route key, finds the collection. Returns null if too old to go in last one. */
private String findTargetCollectionGivenTimestamp(Instant docTimestamp) {
// Lookup targetCollection given route key. Iterates in reverse chronological order.
// We're O(N) here but N should be small, the loop is fast, and usually looking for 1st.
for (Map.Entry<Instant, String> entry : parsedCollectionsDesc) {
Instant colStartTime = entry.getKey();
@ -195,16 +307,77 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
return entry.getValue(); //found it
}
}
return null;
return null; //not found
}
/** Parses the timestamp from the collection list and returns them in reverse sorted order (newest 1st) */
private List<Map.Entry<Instant,String>> doParseCollections(Aliases aliases) {
/**
 * Asks the Overseer (ROUTEDALIAS_CREATECOLL, locked by alias name) to create the collection that follows
 * {@code mostRecentCollName} and to update the alias. The Overseer only does so if the most recent
 * collection is still the one we name here; otherwise it returns without error.
 *
 * To avoid needless concurrent communication with the Overseer from this JVM, a Semaphore per alias name is
 * kept in a ConcurrentHashMap: the caller that gets the permit sends the request, any other caller just
 * waits for the permit to come back and then returns so that it can re-check the aliases.
 * (A Lock or CountDownLatch could have been used instead but didn't seem to make it any easier.)
 */
private void createCollectionAfter(String mostRecentCollName) {
  final Semaphore permit = aliasToSemaphoreMap.computeIfAbsent(aliasName, n -> new Semaphore(1));

  if (!permit.tryAcquire()) {
    // Another URP instance on this JVM is already creating a collection, so wait until it's done.
    log.debug("Collection creation is already in progress so we'll wait then try again.");
    final boolean acquired;
    try {
      acquired = permit.tryAcquire(DEFAULT_COLLECTION_OP_TIMEOUT, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
          "Interrupted waiting on collection creation.", e); // if we were interrupted, give up.
    }
    if (!acquired) {
      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
          "Waited too long for another update thread to be done with collection creation.");
    }
    permit.release(); // we don't actually want a permit so give it back
    return; // the caller continues and retries
  }

  // We hold the permit: submit the request to the Overseer ourselves.
  try {
    final String operation = CollectionParams.CollectionAction.ROUTEDALIAS_CREATECOLL.toLower();
    final Map<String, Object> msg = new HashMap<>();
    msg.put(Overseer.QUEUE_OPERATION, operation);
    msg.put(CollectionParams.NAME, aliasName);
    msg.put(RoutedAliasCreateCollectionCmd.IF_MOST_RECENT_COLL_NAME, mostRecentCollName);
    final SolrQueryResponse rsp = new SolrQueryResponse();

    this.collHandler.handleResponse(operation, new ZkNodeProps(msg), rsp);
    final Exception failure = rsp.getException();
    if (failure != null) {
      throw failure;
    }
    // Beyond that we don't care about the response. It's possible no collection was created because
    // of a race and that's okay... we'll ultimately retry any way.

    // Refresh our view of the aliases. Without this, our zkStateReader might not yet know about the
    // updated alias (thus won't see the collection newly added to it), and we might think we failed.
    zkController.getZkStateReader().aliasesHolder.update();
  } catch (RuntimeException e) {
    throw e;
  } catch (Exception e) {
    throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
  } finally {
    permit.release(); // to signal we're done to anyone waiting on it
  }
}
/** Parses the timestamp from the collection list and returns them in reverse sorted order (most recent 1st) */
public static List<Map.Entry<Instant,String>> parseCollections(String aliasName, Aliases aliases, Supplier<SolrException> aliasNotExist) {
final List<String> collections = aliases.getCollectionAliasListMap().get(aliasName);
if (collections == null) {
throw newAliasMustExistException();
throw aliasNotExist.get();
}
// note: I considered TreeMap but didn't like the log(N) just to grab the head when we use it later
// note: I considered TreeMap but didn't like the log(N) just to grab the most recent when we use it later
List<Map.Entry<Instant,String>> result = new ArrayList<>(collections.size());
for (String collection : collections) {
Instant colStartTime = parseInstantFromCollectionName(aliasName, collection);
@ -225,6 +398,17 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
return DATE_TIME_FORMATTER.parse(dateTimePart, Instant::from);
}
/**
 * Builds the collection name for {@code timestamp}: the alias name, an underscore, and the timestamp
 * formatted with {@code DATE_TIME_FORMATTER}, after removing up to three trailing "_00" parts
 * (seconds, then minutes, then hours).
 */
public static String formatCollectionNameFromInstant(String aliasName, Instant timestamp) {
  final String zeroPart = "_00";
  String datePart = TimeRoutedAliasUpdateProcessor.DATE_TIME_FORMATTER.format(timestamp);
  // chop off seconds, minutes, hours -- at most three parts, stopping at the first non-zero one
  int chopsLeft = 3;
  while (chopsLeft > 0 && datePart.endsWith(zeroPart)) {
    datePart = datePart.substring(0, datePart.length() - zeroPart.length());
    chopsLeft--;
  }
  // the shortened form must still parse back to the very same instant
  assert TimeRoutedAliasUpdateProcessor.DATE_TIME_FORMATTER.parse(datePart, Instant::from).equals(timestamp);
  return aliasName + "_" + datePart;
}
@Override
public void processDelete(DeleteUpdateCommand cmd) throws IOException {
final List<SolrCmdDistributor.Node> nodes = lookupShardLeadersOfCollections();

View File

@ -25,6 +25,8 @@ import java.util.Arrays;
import java.util.regex.Pattern;
import java.util.regex.Matcher;
import org.apache.solr.common.SolrException;
/**
* Simple utilities for working with TimeZones
* @see java.util.TimeZone
@ -82,4 +84,20 @@ public final class TimeZoneUtils {
private static Pattern CUSTOM_ID_REGEX = Pattern.compile("GMT(?:\\+|\\-)(\\d{1,2})(?::?(\\d{2}))?");
/**
 * Resolves a timezone ID to a {@link TimeZone}.
 *
 * @param tzStr the timezone ID to resolve; may be null
 * @return {@code DateMathParser.UTC} when {@code tzStr} is null, otherwise the zone
 *         returned by {@code getTimeZone(tzStr)}
 * @throws SolrException with {@code BAD_REQUEST} when {@code getTimeZone(tzStr)} yields null
 */
public static TimeZone parseTimezone(String tzStr) {
  if (tzStr == null) {
    return DateMathParser.UTC; //TODO move to TimeZoneUtils
  }
  final TimeZone resolved = getTimeZone(tzStr);
  if (resolved == null) {
    throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
        "Solr JVM does not support TZ: " + tzStr);
  }
  return resolved;
}
}

View File

@ -19,16 +19,20 @@ package org.apache.solr.update.processor;
import java.io.IOException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.function.UnaryOperator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.lucene.util.IOUtils;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.ConfigSetAdminRequest;
import org.apache.solr.client.solrj.request.V2Request;
@ -39,12 +43,14 @@ import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.Aliases;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@ -52,7 +58,7 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
static final String configName = "timeConfig";
static final String alias = "myalias";
static final String timeField = "timestamp";
static final String timeField = "timestamp_dt";
static final String intField = "integer_i";
static SolrClient solrClient;
@ -71,6 +77,14 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
IOUtils.close(solrClient);
}
//TODO this is necessary when -Dtests.iters but why? Some other tests aren't affected
@Before
public void doBefore() throws Exception {
  // Delete every collection that listCollections reports, so each run starts without leftovers.
  final List<String> existingCollections = CollectionAdminRequest.listCollections(solrClient);
  for (String collectionName : existingCollections) {
    CollectionAdminRequest.deleteCollection(collectionName).process(solrClient);
  }
}
@Test
public void test() throws Exception {
// First create a config using REST API. To do this, we create a collection with the name of the eventual config.
@ -91,18 +105,21 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
" 'fieldName':'" + intField + "'" +
" }," +
"}").build()));
// only sometimes test with "tolerant" URP
final String urpNames = "inc" + (random().nextBoolean() ? ",tolerant" : "");
checkNoError(solrClient.request(new V2Request.Builder("/collections/" + configName + "/config/params")
.withMethod(SolrRequest.METHOD.POST)
.withPayload("{" +
" 'set' : {" +
" '_UPDATE' : {'processor':'inc,tolerant'}" +
" '_UPDATE' : {'processor':'" + urpNames + "'}" +
" }" +
"}").build()));
CollectionAdminRequest.deleteCollection(configName).process(solrClient);
// start with one collection and an alias for it
final String col23rd = alias + "_2017-10-23";
CollectionAdminRequest.createCollection(col23rd, configName, 1, 1)
CollectionAdminRequest.createCollection(col23rd, configName, 2, 2)
.setMaxShardsPerNode(2)
.withProperty(TimeRoutedAliasUpdateProcessor.TIME_PARTITION_ALIAS_NAME_CORE_PROP, alias)
.process(solrClient);
@ -112,30 +129,29 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
CollectionAdminRequest.createAlias(alias, col23rd).process(solrClient);
//TODO use SOLR-11617 client API to set alias metadata
final ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader();
UnaryOperator<Aliases> op = a -> a.cloneWithCollectionAliasMetadata(alias, TimeRoutedAliasUpdateProcessor.ROUTER_FIELD_METADATA, timeField);
zkStateReader.aliasesHolder.applyModificationAndExportToZk(op);
zkStateReader.aliasesHolder.applyModificationAndExportToZk(a ->
a.cloneWithCollectionAliasMetadata(alias, TimeRoutedAliasUpdateProcessor.ROUTER_FIELD_METADATA, timeField)
.cloneWithCollectionAliasMetadata(alias, "collection-create.collection.configName", configName)
.cloneWithCollectionAliasMetadata(alias, "collection-create.numShards", "1")
.cloneWithCollectionAliasMetadata(alias, "collection-create.replicationFactor", "1")
.cloneWithCollectionAliasMetadata(alias, "router.interval", "+1DAY"));
// now we index a document
solrClient.add(alias, newDoc(Instant.parse("2017-10-23T00:00:00Z")));
assertUpdateResponse(solrClient.add(alias, newDoc(Instant.parse("2017-10-23T00:00:00Z"))));
solrClient.commit(alias);
//assertDocRoutedToCol(lastDocId, col23rd);
assertInvariants();
assertInvariants(col23rd);
// a document that is too old (throws exception... if we have a TolerantUpdateProcessor then we see it there)
try {
final UpdateResponse resp = solrClient.add(alias, newDoc(Instant.parse("2017-10-01T00:00:00Z")));
final Object errors = resp.getResponseHeader().get("errors");
assertTrue(errors != null && errors.toString().contains("couldn't be routed"));
} catch (SolrException e) {
assertTrue(e.getMessage().contains("couldn't be routed"));
}
numDocsDeletedOrFailed++;
// a document that is too old
testFailedDocument(Instant.parse("2017-10-01T00:00:00Z"), "couldn't be routed");
// a document which is too far into the future
testFailedDocument(Instant.now().plus(30, ChronoUnit.MINUTES), "too far in the future");
// add another collection, add to alias (soonest comes first)
final String col24th = alias + "_2017-10-24";
CollectionAdminRequest.createCollection(col24th, configName, 2, 2) // more shards and replicas now
.setMaxShardsPerNode(2)
CollectionAdminRequest.createCollection(col24th, configName, 1, 1) // more shards and replicas now
.withProperty("timePartitionAliasName", alias)
.process(solrClient);
CollectionAdminRequest.createAlias(alias, col24th + "," + col23rd).process(solrClient);
@ -146,7 +162,7 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
newDoc(Instant.parse("2017-10-24T01:00:00Z")),
newDoc(Instant.parse("2017-10-24T02:00:00Z"))
);
assertInvariants();
assertInvariants(col24th, col23rd);
// assert that the IncrementURP has updated all '0' to '1'
final SolrDocumentList checkIncResults = solrClient.query(alias, params("q", "NOT " + intField + ":1")).getResults();
@ -154,16 +170,45 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
//delete a random document id; ensure we don't find it
int idToDelete = 1 + random().nextInt(lastDocId);
if (idToDelete == 2) { // #2 didn't make it
idToDelete++;
if (idToDelete == 2 || idToDelete == 3) { // these didn't make it
idToDelete = 4;
}
solrClient.deleteById(alias, Integer.toString(idToDelete));
solrClient.commit(alias);
assertUpdateResponse(solrClient.deleteById(alias, Integer.toString(idToDelete)));
assertUpdateResponse(solrClient.commit(alias));
numDocsDeletedOrFailed++;
assertInvariants();
assertInvariants(col24th, col23rd);
// delete the Oct23rd (save memory)...
// make sure we track that we are effectively deleting docs there
numDocsDeletedOrFailed += solrClient.query(col23rd, params("q", "*:*", "rows", "0")).getResults().getNumFound();
// remove from alias
CollectionAdminRequest.createAlias(alias, col24th).process(solrClient);
// delete the collection
CollectionAdminRequest.deleteCollection(col23rd).process(solrClient);
// now we're going to add documents that will trigger more collections to be created
// for 25th & 26th
addDocsAndCommit(
newDoc(Instant.parse("2017-10-24T03:00:00Z")),
newDoc(Instant.parse("2017-10-25T04:00:00Z")),
newDoc(Instant.parse("2017-10-26T05:00:00Z"))
);
assertInvariants(alias + "_2017-10-26", alias + "_2017-10-25", col24th);
}
private void checkNoError(NamedList<Object> response) {
/**
 * Sends a single document with the given timestamp to the alias and asserts that it is
 * rejected with a message containing {@code errorMsg}. The rejection may surface either as an
 * "errors" entry in the response header or as a thrown {@link SolrException}; both are accepted.
 * Afterwards the failed-document counter is incremented.
 *
 * @param timestamp the timestamp to give the new document
 * @param errorMsg text that must appear in the reported error
 */
private void testFailedDocument(Instant timestamp, String errorMsg) throws SolrServerException, IOException {
  try {
    final UpdateResponse response = solrClient.add(alias, newDoc(timestamp));
    // With a TolerantUpdateProcessor in the chain the failure is reported here instead of thrown.
    final Object reportedErrors = response.getResponseHeader().get("errors"); // Tolerant URP
    final boolean mentionsExpectedError =
        reportedErrors != null && reportedErrors.toString().contains(errorMsg);
    assertTrue(mentionsExpectedError);
  } catch (SolrException e) {
    // Otherwise the failure arrives as an exception.
    final boolean mentionsExpectedError = e.getMessage().contains(errorMsg);
    assertTrue(mentionsExpectedError);
  }
  numDocsDeletedOrFailed += 1;
}
/**
 * Asserts that the response has no "errorMessages" entry; if one is present, its string form
 * becomes the assertion failure message.
 */
private void checkNoError(NamedList<Object> response) { //TODO rename
  final Object errorMessages = response.get("errorMessages");
  final String failureText = "" + errorMessages;
  assertNull(failureText, errorMessages);
}
@ -171,7 +216,8 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
/** Adds these documents and commits, returning when they are committed.
* We randomly go about this in different ways. */
private void addDocsAndCommit(SolrInputDocument... solrInputDocuments) throws Exception {
// we assume these are not old docs!
// we assume all docs will be added (none too old/new to cause exception)
Collections.shuffle(Arrays.asList(solrInputDocuments), random());
// this is a list of the collections & the alias name. Use to pick randomly where to send.
// (it doesn't matter where we send docs since the alias is honored at the URP level)
@ -182,15 +228,27 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
int commitWithin = random().nextBoolean() ? -1 : 500; // if -1, we commit explicitly instead
int numDocsBefore = queryNumDocs();
if (random().nextBoolean()) {
// send in separate requests
for (SolrInputDocument solrInputDocument : solrInputDocuments) {
String col = collections.get(random().nextInt(collections.size()));
solrClient.add(col, solrInputDocument, commitWithin);
// Send in separate threads. Choose random collection & solrClient
try (CloudSolrClient solrClient = getCloudSolrClient(cluster)) {
ExecutorService exec = ExecutorUtil.newMDCAwareFixedThreadPool(1 + random().nextInt(2),
new DefaultSolrThreadFactory(getTestName()));
List<Future<UpdateResponse>> futures = new ArrayList<>(solrInputDocuments.length);
for (SolrInputDocument solrInputDocument : solrInputDocuments) {
String col = collections.get(random().nextInt(collections.size()));
futures.add(exec.submit(() -> solrClient.add(col, solrInputDocument, commitWithin)));
}
for (Future<UpdateResponse> future : futures) {
assertUpdateResponse(future.get());
}
// at this point there shouldn't be any tasks running
assertEquals(0, exec.shutdownNow().size());
}
} else {
// send in a batch.
String col = collections.get(random().nextInt(collections.size()));
solrClient.add(col, Arrays.asList(solrInputDocuments), commitWithin);
try (CloudSolrClient solrClient = getCloudSolrClient(cluster)) {
assertUpdateResponse(solrClient.add(col, Arrays.asList(solrInputDocuments), commitWithin));
}
}
String col = collections.get(random().nextInt(collections.size()));
if (commitWithin == -1) {
@ -210,21 +268,30 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
}
}
/**
 * Asserts that an update response reports no errors in its response header.
 *
 * <p>The "errors" header entry is accepted when it is absent or an empty list; anything else
 * fails the assertion and is included in the failure message.
 *
 * @param rsp the update response to inspect
 */
private void assertUpdateResponse(UpdateResponse rsp) {
  // use of TolerantUpdateProcessor can cause non-thrown "errors" that we need to check for
  // Wildcard instead of the raw List type: only isEmpty() and string conversion are needed.
  final List<?> errors = (List<?>) rsp.getResponseHeader().get("errors");
  assertTrue("Expected no errors: " + errors, errors == null || errors.isEmpty());
}
/**
 * Runs a {@code *:*} query with {@code rows=0} against the alias and returns the reported
 * number of matching documents, narrowed to an int.
 */
private int queryNumDocs() throws SolrServerException, IOException {
  final long numFound = solrClient
      .query(alias, params("q", "*:*", "rows", "0"))
      .getResults()
      .getNumFound();
  return (int) numFound;
}
private void assertInvariants() throws IOException, SolrServerException {
private void assertInvariants(String... expectedColls) throws IOException, SolrServerException {
final int expectNumFound = lastDocId - numDocsDeletedOrFailed; //lastDocId is effectively # generated docs
final List<String> cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
assert !cols.isEmpty();
assertArrayEquals("expected reverse sorted",
cols.stream().sorted(Collections.reverseOrder()).toArray(),
cols.toArray());
int totalNumFound = 0;
Instant colEndInstant = null; // exclusive end
for (String col : cols) {
for (String col : cols) { // ASSUMPTION: reverse sorted order
final Instant colStartInstant = TimeRoutedAliasUpdateProcessor.parseInstantFromCollectionName(alias, col);
//TODO do this in parallel threads
final QueryResponse colStatsResp = solrClient.query(col, params(
"q", "*:*",
"rows", "0",
@ -243,6 +310,7 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
colEndInstant = colStartInstant; // next older segment will max out at our current start time
}
assertEquals(expectNumFound, totalNumFound);
assertArrayEquals(expectedColls, cols.toArray());
}
private SolrInputDocument newDoc(Instant timestamp) {

View File

@ -78,6 +78,7 @@ public interface CollectionParams {
CREATEALIAS(true, LockLevel.COLLECTION),
DELETEALIAS(true, LockLevel.COLLECTION),
LISTALIASES(false, LockLevel.NONE),
ROUTEDALIAS_CREATECOLL(true, LockLevel.COLLECTION),
SPLITSHARD(true, LockLevel.SHARD),
DELETESHARD(true, LockLevel.SHARD),
CREATESHARD(true, LockLevel.COLLECTION),