SOLR-12357: TRA preemptiveCreateMath option.

Simplified test utility TrackingUpdateProcessorFactory.
Reverted some attempts the TRA used to make in avoiding overseer communication (too complicated).
Closes #433
This commit is contained in:
David Smiley 2018-09-06 23:38:44 -04:00
parent 9e04375dc1
commit 21d130c3ed
10 changed files with 550 additions and 252 deletions

View File

@ -216,6 +216,9 @@ New Features
* SOLR-12612: Cluster properties restriction of known keys only is relaxed, and now unknown properties starting with "ext." * SOLR-12612: Cluster properties restriction of known keys only is relaxed, and now unknown properties starting with "ext."
will be allowed. This allows custom to plugins set their own cluster properties. (Jeffery Yuan via Tomás Fernández Löbbe) will be allowed. This allows custom to plugins set their own cluster properties. (Jeffery Yuan via Tomás Fernández Löbbe)
* SOLR-12357: Time Routed Aliases now have a preemptiveCreateMath option to preemptively and asynchronously create the
next collection in advance as new data gets within this time window of the end. (Gus Heck, David Smiley)
Bug Fixes Bug Fixes
---------------------- ----------------------

View File

@ -75,7 +75,12 @@ public class MaintainRoutedAliasCmd implements OverseerCollectionMessageHandler.
this.ocmh = ocmh; this.ocmh = ocmh;
} }
/** Invokes this command from the client. If there's a problem it will throw an exception. */ /**
* Invokes this command from the client. If there's a problem it will throw an exception.
* Please note that is important to never add async to this invocation. This method must
* block (up to the standard OCP timeout) to prevent large batches of add's from sending a message
* to the overseer for every document added in TimeRoutedAliasUpdateProcessor.
*/
public static NamedList remoteInvoke(CollectionsHandler collHandler, String aliasName, String mostRecentCollName) public static NamedList remoteInvoke(CollectionsHandler collHandler, String aliasName, String mostRecentCollName)
throws Exception { throws Exception {
final String operation = CollectionParams.CollectionAction.MAINTAINROUTEDALIAS.toLower(); final String operation = CollectionParams.CollectionAction.MAINTAINROUTEDALIAS.toLower();

View File

@ -17,6 +17,7 @@
package org.apache.solr.cloud.api.collections; package org.apache.solr.cloud.api.collections;
import java.text.ParseException;
import java.time.Instant; import java.time.Instant;
import java.time.ZoneOffset; import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
@ -63,6 +64,7 @@ public class TimeRoutedAlias {
public static final String ROUTER_START = ROUTER_PREFIX + "start"; public static final String ROUTER_START = ROUTER_PREFIX + "start";
public static final String ROUTER_INTERVAL = ROUTER_PREFIX + "interval"; public static final String ROUTER_INTERVAL = ROUTER_PREFIX + "interval";
public static final String ROUTER_MAX_FUTURE = ROUTER_PREFIX + "maxFutureMs"; public static final String ROUTER_MAX_FUTURE = ROUTER_PREFIX + "maxFutureMs";
public static final String ROUTER_PREEMPTIVE_CREATE_MATH = ROUTER_PREFIX + "preemptiveCreateMath";
public static final String ROUTER_AUTO_DELETE_AGE = ROUTER_PREFIX + "autoDeleteAge"; public static final String ROUTER_AUTO_DELETE_AGE = ROUTER_PREFIX + "autoDeleteAge";
public static final String CREATE_COLLECTION_PREFIX = "create-collection."; public static final String CREATE_COLLECTION_PREFIX = "create-collection.";
// plus TZ and NAME // plus TZ and NAME
@ -84,6 +86,7 @@ public class TimeRoutedAlias {
public static final List<String> OPTIONAL_ROUTER_PARAMS = Collections.unmodifiableList(Arrays.asList( public static final List<String> OPTIONAL_ROUTER_PARAMS = Collections.unmodifiableList(Arrays.asList(
ROUTER_MAX_FUTURE, ROUTER_MAX_FUTURE,
ROUTER_AUTO_DELETE_AGE, ROUTER_AUTO_DELETE_AGE,
ROUTER_PREEMPTIVE_CREATE_MATH,
TZ)); // kinda special TZ)); // kinda special
static Predicate<String> PARAM_IS_PROP = static Predicate<String> PARAM_IS_PROP =
@ -126,6 +129,7 @@ public class TimeRoutedAlias {
private final String routeField; private final String routeField;
private final String intervalMath; // ex: +1DAY private final String intervalMath; // ex: +1DAY
private final long maxFutureMs; private final long maxFutureMs;
private final String preemptiveCreateMath;
private final String autoDeleteAgeMath; // ex: /DAY-30DAYS *optional* private final String autoDeleteAgeMath; // ex: /DAY-30DAYS *optional*
private final TimeZone timeZone; private final TimeZone timeZone;
@ -141,6 +145,9 @@ public class TimeRoutedAlias {
//optional: //optional:
maxFutureMs = params.getLong(ROUTER_MAX_FUTURE, TimeUnit.MINUTES.toMillis(10)); maxFutureMs = params.getLong(ROUTER_MAX_FUTURE, TimeUnit.MINUTES.toMillis(10));
// the date math configured is an interval to be subtracted from the most recent collection's time stamp
String pcmTmp = params.get(ROUTER_PREEMPTIVE_CREATE_MATH);
preemptiveCreateMath = pcmTmp != null ? (pcmTmp.startsWith("-") ? pcmTmp : "-" + pcmTmp) : null;
autoDeleteAgeMath = params.get(ROUTER_AUTO_DELETE_AGE); // no default autoDeleteAgeMath = params.get(ROUTER_AUTO_DELETE_AGE); // no default
timeZone = TimeZoneUtils.parseTimezone(aliasMetadata.get(CommonParams.TZ)); timeZone = TimeZoneUtils.parseTimezone(aliasMetadata.get(CommonParams.TZ));
@ -167,6 +174,13 @@ public class TimeRoutedAlias {
throw new SolrException(BAD_REQUEST, "bad " + TimeRoutedAlias.ROUTER_AUTO_DELETE_AGE + ", " + e, e); throw new SolrException(BAD_REQUEST, "bad " + TimeRoutedAlias.ROUTER_AUTO_DELETE_AGE + ", " + e, e);
} }
} }
if (preemptiveCreateMath != null) {
try {
new DateMathParser().parseMath(preemptiveCreateMath);
} catch (ParseException e) {
throw new SolrException(BAD_REQUEST, "Invalid date math for preemptiveCreateMath:" + preemptiveCreateMath);
}
}
if (maxFutureMs < 0) { if (maxFutureMs < 0) {
throw new SolrException(BAD_REQUEST, ROUTER_MAX_FUTURE + " must be >= 0"); throw new SolrException(BAD_REQUEST, ROUTER_MAX_FUTURE + " must be >= 0");
@ -189,6 +203,10 @@ public class TimeRoutedAlias {
return maxFutureMs; return maxFutureMs;
} }
public String getPreemptiveCreateWindow() {
return preemptiveCreateMath;
}
public String getAutoDeleteAgeMath() { public String getAutoDeleteAgeMath() {
return autoDeleteAgeMath; return autoDeleteAgeMath;
} }
@ -204,6 +222,7 @@ public class TimeRoutedAlias {
.add("routeField", routeField) .add("routeField", routeField)
.add("intervalMath", intervalMath) .add("intervalMath", intervalMath)
.add("maxFutureMs", maxFutureMs) .add("maxFutureMs", maxFutureMs)
.add("preemptiveCreateMath", preemptiveCreateMath)
.add("autoDeleteAgeMath", autoDeleteAgeMath) .add("autoDeleteAgeMath", autoDeleteAgeMath)
.add("timeZone", timeZone) .add("timeZone", timeZone)
.toString(); .toString();

View File

@ -50,7 +50,7 @@ public class DistributedUpdateProcessorFactory
public UpdateRequestProcessor getInstance(SolrQueryRequest req, public UpdateRequestProcessor getInstance(SolrQueryRequest req,
SolrQueryResponse rsp, UpdateRequestProcessor next) { SolrQueryResponse rsp, UpdateRequestProcessor next) {
// note: will sometimes return DURP (no overhead) instead of wrapping // note: will sometimes return DURP (no overhead) instead of wrapping
return TimeRoutedAliasUpdateProcessor.wrap(req, rsp, return TimeRoutedAliasUpdateProcessor.wrap(req,
new DistributedUpdateProcessor(req, rsp, next)); new DistributedUpdateProcessor(req, rsp, next));
} }

View File

@ -19,17 +19,15 @@ package org.apache.solr.update.processor;
import java.io.IOException; import java.io.IOException;
import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodHandles;
import java.text.ParseException;
import java.time.Instant; import java.time.Instant;
import java.util.Collections; import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.cloud.ZkController; import org.apache.solr.cloud.ZkController;
import org.apache.solr.cloud.api.collections.MaintainRoutedAliasCmd; import org.apache.solr.cloud.api.collections.MaintainRoutedAliasCmd;
import org.apache.solr.cloud.api.collections.TimeRoutedAlias; import org.apache.solr.cloud.api.collections.TimeRoutedAlias;
@ -47,19 +45,24 @@ import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrCore; import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.admin.CollectionsHandler; import org.apache.solr.handler.admin.CollectionsHandler;
import org.apache.solr.request.SolrQueryRequest; import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.schema.SchemaField; import org.apache.solr.schema.SchemaField;
import org.apache.solr.update.AddUpdateCommand; import org.apache.solr.update.AddUpdateCommand;
import org.apache.solr.update.CommitUpdateCommand; import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.DeleteUpdateCommand; import org.apache.solr.update.DeleteUpdateCommand;
import org.apache.solr.update.SolrCmdDistributor; import org.apache.solr.update.SolrCmdDistributor;
import org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase; import org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase;
import org.apache.solr.util.DateMathParser;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import static org.apache.solr.handler.admin.CollectionsHandler.DEFAULT_COLLECTION_OP_TIMEOUT; import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.solr.common.util.ExecutorUtil.newMDCAwareSingleThreadExecutor;
import static org.apache.solr.update.processor.DistributedUpdateProcessor.DISTRIB_FROM; import static org.apache.solr.update.processor.DistributedUpdateProcessor.DISTRIB_FROM;
import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM; import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
import static org.apache.solr.update.processor.TimeRoutedAliasUpdateProcessor.CreationType.ASYNC_PREEMPTIVE;
import static org.apache.solr.update.processor.TimeRoutedAliasUpdateProcessor.CreationType.NONE;
import static org.apache.solr.update.processor.TimeRoutedAliasUpdateProcessor.CreationType.SYNCHRONOUS;
/** /**
* Distributes update requests to a rolling series of collections partitioned by a timestamp field. Issues * Distributes update requests to a rolling series of collections partitioned by a timestamp field. Issues
@ -73,33 +76,31 @@ import static org.apache.solr.update.processor.DistributingUpdateProcessorFactor
* @since 7.2.0 * @since 7.2.0
*/ */
public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor { public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
//TODO do we make this more generic to others who want to partition collections using something else? //TODO do we make this more generic to others who want to partition collections using something else besides time?
public static final String ALIAS_DISTRIB_UPDATE_PARAM = "alias." + DISTRIB_UPDATE_PARAM; // param
private static final String ALIAS_DISTRIB_UPDATE_PARAM = "alias." + DISTRIB_UPDATE_PARAM; // param
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
// To avoid needless/redundant concurrent communication with the Overseer from this JVM, we // refs to std infrastructure
// maintain a Semaphore from an alias name keyed ConcurrentHashMap. private final SolrQueryRequest req;
// Alternatively a Lock or CountDownLatch could have been used but they didn't seem
// to make it any easier.
private static ConcurrentHashMap<String, Semaphore> aliasToSemaphoreMap = new ConcurrentHashMap<>(4);
private final String thisCollection;
private final TimeRoutedAlias timeRoutedAlias;
private final ZkController zkController;
private final SolrCmdDistributor cmdDistrib; private final SolrCmdDistributor cmdDistrib;
private final CollectionsHandler collHandler; private final CollectionsHandler collHandler;
private final SolrParams outParamsToLeader; private final ZkController zkController;
private final CloudDescriptor cloudDesc;
// Stuff specific to this class
private final String thisCollection;
private final TimeRoutedAlias timeRoutedAlias;
private final SolrParams outParamsToLeader;
// These two fields may be updated within the calling thread during processing but should
// never be updated by any async creation thread.
private List<Map.Entry<Instant, String>> parsedCollectionsDesc; // k=timestamp (start), v=collection. Sorted descending private List<Map.Entry<Instant, String>> parsedCollectionsDesc; // k=timestamp (start), v=collection. Sorted descending
private Aliases parsedCollectionsAliases; // a cached reference to the source of what we parse into parsedCollectionsDesc private Aliases parsedCollectionsAliases; // a cached reference to the source of what we parse into parsedCollectionsDesc
private SolrQueryRequest req;
public static UpdateRequestProcessor wrap(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) { // This will be updated out in async creation threads see preemptiveAsync(Runnable r) for details
private volatile ExecutorService preemptiveCreationExecutor;
public static UpdateRequestProcessor wrap(SolrQueryRequest req, UpdateRequestProcessor next) {
//TODO get from "Collection property" //TODO get from "Collection property"
final String aliasName = req.getCore().getCoreDescriptor() final String aliasName = req.getCore().getCoreDescriptor()
.getCoreProperty(TimeRoutedAlias.ROUTED_ALIAS_NAME_CORE_PROP, null); .getCoreProperty(TimeRoutedAlias.ROUTED_ALIAS_NAME_CORE_PROP, null);
@ -113,18 +114,17 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
// if shardDistribPhase is not NONE, then the phase is after the scope of this URP // if shardDistribPhase is not NONE, then the phase is after the scope of this URP
return next; return next;
} else { } else {
return new TimeRoutedAliasUpdateProcessor(req, rsp, next, aliasName, aliasDistribPhase); return new TimeRoutedAliasUpdateProcessor(req, next, aliasName, aliasDistribPhase);
} }
} }
protected TimeRoutedAliasUpdateProcessor(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next, private TimeRoutedAliasUpdateProcessor(SolrQueryRequest req, UpdateRequestProcessor next,
String aliasName, String aliasName,
DistribPhase aliasDistribPhase) { DistribPhase aliasDistribPhase) {
super(next); super(next);
assert aliasDistribPhase == DistribPhase.NONE; assert aliasDistribPhase == DistribPhase.NONE;
final SolrCore core = req.getCore(); final SolrCore core = req.getCore();
cloudDesc = core.getCoreDescriptor().getCloudDescriptor(); this.thisCollection = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
this.thisCollection = cloudDesc.getCollectionName();
this.req = req; this.req = req;
CoreContainer cc = core.getCoreContainer(); CoreContainer cc = core.getCoreContainer();
zkController = cc.getZkController(); zkController = cc.getZkController();
@ -164,71 +164,141 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
@Override @Override
public void processAdd(AddUpdateCommand cmd) throws IOException { public void processAdd(AddUpdateCommand cmd) throws IOException {
SolrInputDocument solrInputDocument = cmd.getSolrInputDocument(); final Instant docTimestamp =
final Object routeValue = solrInputDocument.getFieldValue(timeRoutedAlias.getRouteField()); parseRouteKey(cmd.getSolrInputDocument().getFieldValue(timeRoutedAlias.getRouteField()));
final Instant routeTimestamp = parseRouteKey(routeValue);
// TODO: maybe in some cases the user would want to ignore/warn instead?
if (docTimestamp.isAfter(Instant.now().plusMillis(timeRoutedAlias.getMaxFutureMs()))) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"The document's time routed key of " + docTimestamp + " is too far in the future given " +
TimeRoutedAlias.ROUTER_MAX_FUTURE + "=" + timeRoutedAlias.getMaxFutureMs());
}
// to avoid potential for race conditions, this next method should not get called again unless
// we have created a collection synchronously
updateParsedCollectionAliases(); updateParsedCollectionAliases();
String targetCollection;
do { // typically we don't loop; it's only when we need to create a collection
targetCollection = findTargetCollectionGivenTimestamp(routeTimestamp);
if (targetCollection == null) { String targetCollection = createCollectionsIfRequired(docTimestamp, cmd);
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"Doc " + cmd.getPrintableId() + " couldn't be routed with " + timeRoutedAlias.getRouteField() + "=" + routeTimestamp);
}
// Note: the following rule is tempting but not necessary and is not compatible with
// only using this URP when the alias distrib phase is NONE; otherwise a doc may be routed to from a non-recent
// collection to the most recent only to then go there directly instead of realizing a new collection is needed.
// // If it's going to some other collection (not "this") then break to just send it there
// if (!thisCollection.equals(targetCollection)) {
// break;
// }
// Also tempting but not compatible: check that we're the leader, if not then break
// If the doc goes to the most recent collection then do some checks below, otherwise break the loop.
final Instant mostRecentCollTimestamp = parsedCollectionsDesc.get(0).getKey();
final String mostRecentCollName = parsedCollectionsDesc.get(0).getValue();
if (!mostRecentCollName.equals(targetCollection)) {
break;
}
// Check the doc isn't too far in the future
final Instant maxFutureTime = Instant.now().plusMillis(timeRoutedAlias.getMaxFutureMs());
if (routeTimestamp.isAfter(maxFutureTime)) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"The document's time routed key of " + routeValue + " is too far in the future given " +
TimeRoutedAlias.ROUTER_MAX_FUTURE + "=" + timeRoutedAlias.getMaxFutureMs());
}
// Create a new collection?
final Instant nextCollTimestamp = timeRoutedAlias.computeNextCollTimestamp(mostRecentCollTimestamp);
if (routeTimestamp.isBefore(nextCollTimestamp)) {
break; // thus we don't need another collection
}
createCollectionAfter(mostRecentCollName); // *should* throw if fails for some reason but...
final boolean updated = updateParsedCollectionAliases();
if (!updated) { // thus we didn't make progress...
// this is not expected, even in known failure cases, but we check just in case
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"We need to create a new time routed collection but for unknown reasons were unable to do so.");
}
// then retry the loop ...
} while(true);
assert targetCollection != null;
if (thisCollection.equals(targetCollection)) { if (thisCollection.equals(targetCollection)) {
// pass on through; we've reached the right collection // pass on through; we've reached the right collection
super.processAdd(cmd); super.processAdd(cmd);
} else { } else {
// send to the right collection // send to the right collection
SolrCmdDistributor.Node targetLeaderNode = routeDocToSlice(targetCollection, solrInputDocument); SolrCmdDistributor.Node targetLeaderNode = routeDocToSlice(targetCollection, cmd.getSolrInputDocument());
cmdDistrib.distribAdd(cmd, Collections.singletonList(targetLeaderNode), new ModifiableSolrParams(outParamsToLeader)); cmdDistrib.distribAdd(cmd, Collections.singletonList(targetLeaderNode), new ModifiableSolrParams(outParamsToLeader));
} }
} }
/**
* Create any required collections and return the name of the collection to which the current document should be sent.
*
* @param docTimestamp the date for the document taken from the field specified in the TRA config
* @param cmd The initial calculated destination collection.
* @return The name of the proper destination collection for the document which may or may not be a
* newly created collection
*/
private String createCollectionsIfRequired(Instant docTimestamp, AddUpdateCommand cmd) {
// Even though it is possible that multiple requests hit this code in the 1-2 sec that
// it takes to create a collection, it's an established anti-pattern to feed data with a very large number
// of client connections. This in mind, we only guard against spamming the overseer within a batch of
// updates. We are intentionally tolerating a low level of redundant requests in favor of simpler code. Most
// super-sized installations with many update clients will likely be multi-tenant and multiple tenants
// probably don't write to the same alias. As such, we have deferred any solution to the "many clients causing
// collection creation simultaneously" problem until such time as someone actually has that problem in a
// real world use case that isn't just an anti-pattern.
Map.Entry<Instant, String> candidateCollectionDesc = findCandidateGivenTimestamp(docTimestamp, cmd.getPrintableId());
String candidateCollectionName = candidateCollectionDesc.getValue();
try {
switch (typeOfCreationRequired(docTimestamp, candidateCollectionDesc.getKey())) {
case SYNCHRONOUS:
// This next line blocks until all collections required by the current document have been created
return createAllRequiredCollections(docTimestamp, cmd.getPrintableId(), candidateCollectionDesc);
case ASYNC_PREEMPTIVE:
if (preemptiveCreationExecutor == null) {
// It's important not to add code between here and the prior call to findCandidateGivenTimestamp()
// in processAdd() that invokes updateParsedCollectionAliases(). Doing so would update parsedCollectionsDesc
// and create a race condition. We are relying on the fact that get(0) is returning the head of the parsed
// collections that existed when candidateCollectionDesc was created. If this class updates it's notion of
// parsedCollectionsDesc since candidateCollectionDesc was chosen, we could create collection n+2
// instead of collection n+1.
String mostRecentCollName = this.parsedCollectionsDesc.get(0).getValue();
// This line does not block and the document can be added immediately
preemptiveAsync(() -> createNextCollection(mostRecentCollName));
}
return candidateCollectionName;
case NONE:
return candidateCollectionName; // could use fall through, but fall through is fiddly for later editors.
default:
throw unknownCreateType();
}
// do nothing if creationType == NONE
} catch (SolrException e) {
throw e;
} catch (Exception e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
}
}
private void preemptiveAsync(Runnable r) {
// Note: creating an executor and throwing it away is slightly expensive, but this is only likely to happen
// once per hour/day/week (depending on time slice size for the TRA). If the executor were retained, it
// would need to be shut down in a close hook to avoid test failures due to thread leaks in tests which is slightly
// more complicated from a code maintenance and readability stand point. An executor must used instead of a
// thread to ensure we pick up the proper MDC logging stuff from ExecutorUtil.
DefaultSolrThreadFactory threadFactory = new DefaultSolrThreadFactory("TRA-preemptive-creation");
preemptiveCreationExecutor = newMDCAwareSingleThreadExecutor(threadFactory);
preemptiveCreationExecutor.execute(() -> {
r.run();
preemptiveCreationExecutor.shutdown();
preemptiveCreationExecutor = null;
});
}
/**
* Determine if the a new collection will be required based on the document timestamp. Passing null for
* preemptiveCreateInterval tells you if the document is beyond all existing collections with a response of
* {@link CreationType#NONE} or {@link CreationType#SYNCHRONOUS}, and passing a valid date math for
* preemptiveCreateMath additionally distinguishes the case where the document is close enough to the end of
* the TRA to trigger preemptive creation but not beyond all existing collections with a value of
* {@link CreationType#ASYNC_PREEMPTIVE}.
*
* @param docTimeStamp The timestamp from the document
* @param targetCollectionTimestamp The timestamp for the presently selected destination collection
* @return a {@code CreationType} indicating if and how to create a collection
*/
private CreationType typeOfCreationRequired(Instant docTimeStamp, Instant targetCollectionTimestamp) {
final Instant nextCollTimestamp = timeRoutedAlias.computeNextCollTimestamp(targetCollectionTimestamp);
if (!docTimeStamp.isBefore(nextCollTimestamp)) {
// current document is destined for a collection that doesn't exist, must create the destination
// to proceed with this add command
return SYNCHRONOUS;
}
if (isNotBlank(timeRoutedAlias.getPreemptiveCreateWindow())) {
Instant preemptNextColCreateTime =
calcPreemptNextColCreateTime(timeRoutedAlias.getPreemptiveCreateWindow(), nextCollTimestamp);
if (!docTimeStamp.isBefore(preemptNextColCreateTime)) {
return ASYNC_PREEMPTIVE;
}
}
return NONE;
}
private Instant calcPreemptNextColCreateTime(String preemptiveCreateMath, Instant nextCollTimestamp) {
DateMathParser dateMathParser = new DateMathParser();
dateMathParser.setNow(Date.from(nextCollTimestamp));
try {
return dateMathParser.parseMath(preemptiveCreateMath).toInstant();
} catch (ParseException e) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"Invalid Preemptive Create Window Math:'" + preemptiveCreateMath + '\'', e);
}
}
private Instant parseRouteKey(Object routeKey) { private Instant parseRouteKey(Object routeKey) {
final Instant docTimestamp; final Instant docTimestamp;
if (routeKey instanceof Instant) { if (routeKey instanceof Instant) {
@ -261,61 +331,43 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
return false; return false;
} }
/** Given the route key, finds the collection. Returns null if too old to go in last one. */ /**
private String findTargetCollectionGivenTimestamp(Instant docTimestamp) { * Given the route key, finds the correct collection or returns the most recent collection if the doc
* is in the future. Future docs will potentially cause creation of a collection that does not yet exist
* or an error if they exceed the maxFutureMs setting.
*
* @throws SolrException if the doc is too old to be stored in the TRA
*/
private Map.Entry<Instant, String> findCandidateGivenTimestamp(Instant docTimestamp, String printableId) {
// Lookup targetCollection given route key. Iterates in reverse chronological order. // Lookup targetCollection given route key. Iterates in reverse chronological order.
// We're O(N) here but N should be small, the loop is fast, and usually looking for 1st. // We're O(N) here but N should be small, the loop is fast, and usually looking for 1st.
for (Map.Entry<Instant, String> entry : parsedCollectionsDesc) { for (Map.Entry<Instant, String> entry : parsedCollectionsDesc) {
Instant colStartTime = entry.getKey(); Instant colStartTime = entry.getKey();
if (!docTimestamp.isBefore(colStartTime)) { // i.e. docTimeStamp is >= the colStartTime if (!docTimestamp.isBefore(colStartTime)) { // i.e. docTimeStamp is >= the colStartTime
return entry.getValue(); //found it return entry; //found it
} }
} }
return null; //not found throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"Doc " + printableId + " couldn't be routed with " + timeRoutedAlias.getRouteField() + "=" + docTimestamp);
} }
private void createCollectionAfter(String mostRecentCollName) { private void createNextCollection(String mostRecentCollName) {
// Invoke ROUTEDALIAS_CREATECOLL (in the Overseer, locked by alias name). It will create the collection // Invoke ROUTEDALIAS_CREATECOLL (in the Overseer, locked by alias name). It will create the collection
// and update the alias contingent on the most recent collection name being the same as // and update the alias contingent on the most recent collection name being the same as
// what we think so here, otherwise it will return (without error). // what we think so here, otherwise it will return (without error).
try {
MaintainRoutedAliasCmd.remoteInvoke(collHandler, getAliasName(), mostRecentCollName);
// we don't care about the response. It's possible no collection was created because
// of a race and that's okay... we'll ultimately retry any way.
// (see docs on aliasToSemaphoreMap) // Ensure our view of the aliases has updated. If we didn't do this, our zkStateReader might
final Semaphore semaphore = aliasToSemaphoreMap.computeIfAbsent(getAliasName(), n -> new Semaphore(1)); // not yet know about the new alias (thus won't see the newly added collection to it), and we might think
if (semaphore.tryAcquire()) { // we failed.
try { zkController.getZkStateReader().aliasesManager.update();
MaintainRoutedAliasCmd.remoteInvoke(collHandler, getAliasName(), mostRecentCollName); } catch (RuntimeException e) {
// we don't care about the response. It's possible no collection was created because throw e;
// of a race and that's okay... we'll ultimately retry any way. } catch (Exception e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
// Ensure our view of the aliases has updated. If we didn't do this, our zkStateReader might
// not yet know about the new alias (thus won't see the newly added collection to it), and we might think
// we failed.
zkController.getZkStateReader().aliasesManager.update();
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
} finally {
semaphore.release(); // to signal we're done to anyone waiting on it
}
} else {
// Failed to acquire permit because another URP instance on this JVM is creating a collection.
// So wait till it's available
log.debug("Collection creation is already in progress so we'll wait then try again.");
try {
if (semaphore.tryAcquire(DEFAULT_COLLECTION_OP_TIMEOUT, TimeUnit.MILLISECONDS)) {
semaphore.release(); // we don't actually want a permit so give it back
// return to continue...
} else {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Waited too long for another update thread to be done with collection creation.");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Interrupted waiting on collection creation.", e); // if we were interrupted, give up.
}
} }
} }
@ -404,4 +456,61 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
collection, slice.getName(), DistributedUpdateProcessor.MAX_RETRIES_ON_FORWARD_DEAULT); collection, slice.getName(), DistributedUpdateProcessor.MAX_RETRIES_ON_FORWARD_DEAULT);
} }
/**
* Create as many collections as required. This method loops to allow for the possibility that the docTimestamp
* requires more than one collection to be created. Since multiple threads may be invoking maintain on separate
* requests to the same alias, we must pass in the name of the collection that this thread believes to be the most
* recent collection. This assumption is checked when the command is executed in the overseer. When this method
* finds that all collections required have been created it returns the (possibly new) most recent collection.
* The return value is ignored by the calling code in the async preemptive case.
*
* @param docTimestamp the timestamp from the document that determines routing
* @param printableId an identifier for the add command used in error messages
* @param targetCollectionDesc the descriptor for the presently selected collection which should also be
* the most recent collection in all cases where this method is invoked.
* @return The latest collection, including collections created during maintenance
*/
private String createAllRequiredCollections( Instant docTimestamp, String printableId,
Map.Entry<Instant, String> targetCollectionDesc) {
do {
switch(typeOfCreationRequired(docTimestamp, targetCollectionDesc.getKey())) {
case NONE:
return targetCollectionDesc.getValue(); // we don't need another collection
case ASYNC_PREEMPTIVE:
// can happen when preemptive interval is longer than one time slice
String mostRecentCollName = this.parsedCollectionsDesc.get(0).getValue();
preemptiveAsync(() -> createNextCollection(mostRecentCollName));
return targetCollectionDesc.getValue();
case SYNCHRONOUS:
createNextCollection(targetCollectionDesc.getValue()); // *should* throw if fails for some reason but...
if (!updateParsedCollectionAliases()) { // thus we didn't make progress...
// this is not expected, even in known failure cases, but we check just in case
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"We need to create a new time routed collection but for unknown reasons were unable to do so.");
}
// then retry the loop ... have to do find again in case other requests also added collections
// that were made visible when we called updateParsedCollectionAliases()
targetCollectionDesc = findCandidateGivenTimestamp(docTimestamp, printableId);
break;
default:
throw unknownCreateType();
}
} while (true);
}
private SolrException unknownCreateType() {
return new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unknown creation type while adding " +
"document to a Time Routed Alias! This is a bug caused when a creation type has been added but " +
"not all code has been updated to handle it.");
}
enum CreationType {
NONE,
ASYNC_PREEMPTIVE,
SYNCHRONOUS
}
} }

View File

@ -18,6 +18,7 @@
package org.apache.solr.update.processor; package org.apache.solr.update.processor;
import java.io.IOException; import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.time.Instant; import java.time.Instant;
import java.time.temporal.ChronoUnit; import java.time.temporal.ChronoUnit;
import java.util.ArrayList; import java.util.ArrayList;
@ -33,7 +34,6 @@ import java.util.concurrent.Future;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.IOUtils;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest; import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.embedded.JettySolrRunner; import org.apache.solr.client.solrj.embedded.JettySolrRunner;
@ -54,6 +54,7 @@ import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams; import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.ExecutorUtil; import org.apache.solr.common.util.ExecutorUtil;
@ -69,15 +70,20 @@ import org.junit.AfterClass;
import org.junit.Before; import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase { public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
static final String configName = "timeConfig"; private static final String configName = "timeConfig";
static final String alias = "myalias"; private static final String alias = "myalias";
static final String timeField = "timestamp_dt"; private static final String timeField = "timestamp_dt";
static final String intField = "integer_i"; private static final String intField = "integer_i";
static SolrClient solrClient; private static CloudSolrClient solrClient;
private int lastDocId = 0; private int lastDocId = 0;
private int numDocsDeletedOrFailed = 0; private int numDocsDeletedOrFailed = 0;
@ -88,13 +94,11 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
} }
@Before @Before
public void doBefore() throws Exception { public void doBefore() {
solrClient = getCloudSolrClient(cluster); solrClient = getCloudSolrClient(cluster);
//log this to help debug potential causes of problems //log this to help debug potential causes of problems
System.out.println("SolrClient: " + solrClient); log.info("SolrClient: {}", solrClient);
if (solrClient instanceof CloudSolrClient) { log.info("ClusterStateProvider {}",solrClient.getClusterStateProvider());
System.out.println(((CloudSolrClient) solrClient).getClusterStateProvider());
}
} }
@After @After
@ -231,10 +235,9 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
" 'add-updateprocessor' : {" + " 'add-updateprocessor' : {" +
" 'name':'tolerant', 'class':'solr.TolerantUpdateProcessorFactory'" + " 'name':'tolerant', 'class':'solr.TolerantUpdateProcessorFactory'" +
" }," + " }," +
// if other tracking tests are written, add another TUPF with a unique group name, don't re-use this one!
// See TrackingUpdateProcessorFactory javadocs for details... // See TrackingUpdateProcessorFactory javadocs for details...
" 'add-updateprocessor' : {" + " 'add-updateprocessor' : {" +
" 'name':'tracking-testSliceRouting', 'class':'solr.TrackingUpdateProcessorFactory', 'group':'testSliceRouting'" + " 'name':'tracking-testSliceRouting', 'class':'solr.TrackingUpdateProcessorFactory', 'group':'" + getTrackUpdatesGroupName() + "'" +
" }," + " }," +
" 'add-updateprocessor' : {" + // for testing " 'add-updateprocessor' : {" + // for testing
" 'name':'inc', 'class':'" + IncrementURPFactory.class.getName() + "'," + " 'name':'inc', 'class':'" + IncrementURPFactory.class.getName() + "'," +
@ -265,8 +268,8 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
* @throws Exception when it blows up unexpectedly :) * @throws Exception when it blows up unexpectedly :)
*/ */
@Slow @Slow
@Nightly
@Test @Test
@LogLevel("org.apache.solr.update.processor.TrackingUpdateProcessorFactory=DEBUG")
public void testSliceRouting() throws Exception { public void testSliceRouting() throws Exception {
String configName = TimeRoutedAliasUpdateProcessorTest.configName + getTestName(); String configName = TimeRoutedAliasUpdateProcessorTest.configName + getTestName();
createConfigSet(configName); createConfigSet(configName);
@ -294,8 +297,10 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
// leader randomly and not causing a failure if the code is broken, but as a whole this test will therefore only have // leader randomly and not causing a failure if the code is broken, but as a whole this test will therefore only have
// about a 3.6% false positive rate (0.33^3). If that's not good enough, add more docs or more replicas per shard :). // about a 3.6% false positive rate (0.33^3). If that's not good enough, add more docs or more replicas per shard :).
final String trackGroupName = getTrackUpdatesGroupName();
final List<UpdateCommand> updateCommands;
try { try {
TrackingUpdateProcessorFactory.startRecording(getTestName()); TrackingUpdateProcessorFactory.startRecording(trackGroupName);
// cause some collections to be created // cause some collections to be created
@ -306,7 +311,7 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
sdoc("id", "4", "timestamp_dt", "2017-10-23T00:00:00Z")), sdoc("id", "4", "timestamp_dt", "2017-10-23T00:00:00Z")),
params)); params));
} finally { } finally {
TrackingUpdateProcessorFactory.stopRecording(getTestName()); updateCommands = TrackingUpdateProcessorFactory.stopRecording(trackGroupName);
} }
try (CloudSolrClient cloudSolrClient = getCloudSolrClient(cluster)) { try (CloudSolrClient cloudSolrClient = getCloudSolrClient(cluster)) {
@ -315,7 +320,6 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
Set<String> leaders = getLeaderCoreNames(clusterStateProvider.getClusterState()); Set<String> leaders = getLeaderCoreNames(clusterStateProvider.getClusterState());
assertEquals("should have " + 3 * numShards + " leaders, " + numShards + " per collection", 3 * numShards, leaders.size()); assertEquals("should have " + 3 * numShards + " leaders, " + numShards + " per collection", 3 * numShards, leaders.size());
List<UpdateCommand> updateCommands = TrackingUpdateProcessorFactory.commandsForGroup(getTestName());
assertEquals(3, updateCommands.size()); assertEquals(3, updateCommands.size());
for (UpdateCommand updateCommand : updateCommands) { for (UpdateCommand updateCommand : updateCommands) {
String node = (String) updateCommand.getReq().getContext().get(TrackingUpdateProcessorFactory.REQUEST_NODE); String node = (String) updateCommand.getReq().getContext().get(TrackingUpdateProcessorFactory.REQUEST_NODE);
@ -324,6 +328,179 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
} }
} }
/** @see TrackingUpdateProcessorFactory */
private String getTrackUpdatesGroupName() {
return getTestName();
}
@Test
@Slow
public void testPreemptiveCreation() throws Exception {
String configName = TimeRoutedAliasUpdateProcessorTest.configName + getTestName();
createConfigSet(configName);
final int numShards = 1 ;
final int numReplicas = 1 ;
CollectionAdminRequest.createTimeRoutedAlias(alias, "2017-10-23T00:00:00Z", "+1DAY", timeField,
CollectionAdminRequest.createCollection("_unused_", configName, numShards, numReplicas)
.setMaxShardsPerNode(numReplicas)).setPreemptiveCreateWindow("3HOUR")
.process(solrClient);
// cause some collections to be created
assertUpdateResponse(solrClient.add(alias,
sdoc("id","1","timestamp_dt", "2017-10-25T00:00:00Z")
));
assertUpdateResponse(solrClient.commit(alias));
// wait for all the collections to exist...
waitCol("2017-10-23", numShards); // This one should have already existed from the alias creation
waitCol("2017-10-24", numShards); // Create 1
waitCol("2017-10-25", numShards); // Create 2nd synchronously (ensure this is not broken)
// normal update, nothing special, no collection creation required.
List<String> cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
assertEquals(3,cols.size());
assertNumDocs("2017-10-23", 0);
assertNumDocs("2017-10-24", 0);
assertNumDocs("2017-10-25", 1);
// cause some collections to be created
ModifiableSolrParams params = params();
assertUpdateResponse(add(alias, Arrays.asList(
sdoc("id", "2", "timestamp_dt", "2017-10-24T00:00:00Z"),
sdoc("id", "3", "timestamp_dt", "2017-10-25T00:00:00Z"),
sdoc("id", "4", "timestamp_dt", "2017-10-23T00:00:00Z"),
sdoc("id", "5", "timestamp_dt", "2017-10-25T23:00:00Z")), // should cause preemptive creation
params));
assertUpdateResponse(solrClient.commit(alias));
cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
assertEquals(3,cols.size());
assertNumDocs("2017-10-23", 1);
assertNumDocs("2017-10-24", 1);
assertNumDocs("2017-10-25", 3);
assertUpdateResponse(add(alias, Collections.singletonList(
sdoc("id", "6", "timestamp_dt", "2017-10-25T23:01:00Z")), // might cause duplicate preemptive creation
params));
assertUpdateResponse(solrClient.commit(alias));
waitCol("2017-10-26", numShards);
cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
assertEquals(4,cols.size());
assertNumDocs("2017-10-23", 1);
assertNumDocs("2017-10-24", 1);
assertNumDocs("2017-10-25", 4);
assertNumDocs("2017-10-26", 0);
// now test with pre-create window longer than time slice, and forcing multiple creations.
CollectionAdminRequest.setAliasProperty(alias)
.addProperty(TimeRoutedAlias.ROUTER_PREEMPTIVE_CREATE_MATH, "3DAY").process(solrClient);
assertUpdateResponse(add(alias, Collections.singletonList(
sdoc("id", "7", "timestamp_dt", "2017-10-25T23:01:00Z")), // should cause preemptive creation now
params));
assertUpdateResponse(solrClient.commit(alias));
waitCol("2017-10-27", numShards);
cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
assertEquals(5,cols.size()); // only one created in async case
assertNumDocs("2017-10-23", 1);
assertNumDocs("2017-10-24", 1);
assertNumDocs("2017-10-25", 5);
assertNumDocs("2017-10-26", 0);
assertNumDocs("2017-10-27", 0);
assertUpdateResponse(add(alias, Collections.singletonList(
sdoc("id", "8", "timestamp_dt", "2017-10-25T23:01:00Z")), // should cause preemptive creation now
params));
assertUpdateResponse(solrClient.commit(alias));
waitCol("2017-10-27", numShards);
waitCol("2017-10-28", numShards);
cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
assertEquals(6,cols.size()); // Subsequent documents continue to create up to limit
assertNumDocs("2017-10-23", 1);
assertNumDocs("2017-10-24", 1);
assertNumDocs("2017-10-25", 6);
assertNumDocs("2017-10-26", 0);
assertNumDocs("2017-10-27", 0);
assertNumDocs("2017-10-28", 0);
QueryResponse resp;
resp = solrClient.query(alias, params(
"q", "*:*",
"rows", "10"));
assertEquals(8, resp.getResults().getNumFound());
assertUpdateResponse(add(alias, Arrays.asList(
sdoc("id", "9", "timestamp_dt", "2017-10-27T23:01:00Z"), // should cause preemptive creation
// If these are not ignored properly this test will fail during cleanup with a message about router.name being
// required. This happens because the test finishes while overseer threads are still trying to invoke maintain
// after the @After method has deleted collections and emptied out the aliases.... this leaves the maintain
// command cloning alias properties Aliases.EMPTY and thus not getting a value from router.name
// (normally router.name == 'time') The check for non-blank router.name happens to be the first validation.
// There is a small chance this could slip through without a fail occasionally, but it was 100% with just one
// of these.
sdoc("id", "10", "timestamp_dt", "2017-10-28T23:01:00Z"), // should be ignored due to in progress creation
sdoc("id", "11", "timestamp_dt", "2017-10-28T23:02:00Z"), // should be ignored due to in progress creation
sdoc("id", "12", "timestamp_dt", "2017-10-28T23:03:00Z")), // should be ignored due to in progress creation
params));
assertUpdateResponse(solrClient.commit(alias));
waitCol("2017-10-29", numShards);
cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
assertEquals(7,cols.size());
assertNumDocs("2017-10-23", 1);
assertNumDocs("2017-10-24", 1);
assertNumDocs("2017-10-25", 6);
assertNumDocs("2017-10-26", 0);
assertNumDocs("2017-10-27", 1);
assertNumDocs("2017-10-28", 3); // should get through even though preemptive creation ignored it.
assertNumDocs("2017-10-29", 0);
resp = solrClient.query(alias, params(
"q", "*:*",
"rows", "0"));
assertEquals(12, resp.getResults().getNumFound());
// Sych creation with an interval longer than the time slice for the alias..
assertUpdateResponse(add(alias, Collections.singletonList(
sdoc("id", "13", "timestamp_dt", "2017-10-30T23:03:00Z")), // lucky?
params));
assertUpdateResponse(solrClient.commit(alias));
waitCol("2017-10-30", numShards);
waitCol("2017-10-31", numShards); // spooky! async case arising in middle of sync creation!!
cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
assertEquals(9,cols.size());
assertNumDocs("2017-10-23", 1);
assertNumDocs("2017-10-24", 1);
assertNumDocs("2017-10-25", 6);
assertNumDocs("2017-10-26", 0);
assertNumDocs("2017-10-27", 1);
assertNumDocs("2017-10-28", 3); // should get through even though preemptive creation ignored it.
assertNumDocs("2017-10-29", 0);
assertNumDocs("2017-10-30", 1);
assertNumDocs("2017-10-31", 0);
resp = solrClient.query(alias, params(
"q", "*:*",
"rows", "0"));
assertEquals(13, resp.getResults().getNumFound());
}
private void assertNumDocs(final String datePart, int expected) throws SolrServerException, IOException {
QueryResponse resp = solrClient.query(alias + "_" + datePart, params(
"q", "*:*",
"rows", "10"));
assertEquals(expected, resp.getResults().getNumFound());
}
private Set<String> getLeaderCoreNames(ClusterState clusterState) { private Set<String> getLeaderCoreNames(ClusterState clusterState) {
Set<String> leaders = new TreeSet<>(); // sorted just to make it easier to read when debugging... Set<String> leaders = new TreeSet<>(); // sorted just to make it easier to read when debugging...
List<JettySolrRunner> jettySolrRunners = cluster.getJettySolrRunners(); List<JettySolrRunner> jettySolrRunners = cluster.getJettySolrRunners();
@ -344,9 +521,28 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
return leaders; return leaders;
} }
private void waitCol(final String datePart, int slices) { private void waitCol(final String datePart, int slices) throws InterruptedException {
waitForState("waiting for collections to be created",alias + "_" + datePart, // collection to exist
(liveNodes, collectionState) -> collectionState.getActiveSlices().size() == slices); String collection = alias + "_" + datePart;
waitForState("waiting for collections to be created", collection,
(liveNodes, collectionState) -> {
if (collectionState == null) {
// per predicate javadoc, this is what we get if the collection doesn't exist at all.
return false;
}
Collection<Slice> activeSlices = collectionState.getActiveSlices();
int size = activeSlices.size();
return size == slices;
});
// and alias to be aware of collection
long start = System.nanoTime(); // mumble mumble precommit mumble mumble...
while (!cluster.getSolrClient().getZkStateReader().getAliases().getCollectionAliasListMap().get(alias).contains(collection)) {
if (NANOSECONDS.toSeconds(System.nanoTime() - start) > 10) {
fail("took over 10 seconds after collection creation to update aliases");
} else {
Thread.sleep(500);
}
}
} }
private void testFailedDocument(Instant timestamp, String errorMsg) throws SolrServerException, IOException { private void testFailedDocument(Instant timestamp, String errorMsg) throws SolrServerException, IOException {
@ -484,6 +680,7 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
} }
/** Adds the docs to Solr via {@link #solrClient} with the params */ /** Adds the docs to Solr via {@link #solrClient} with the params */
@SuppressWarnings("SameParameterValue")
private static UpdateResponse add(String collection, Collection<SolrInputDocument> docs, SolrParams params) throws SolrServerException, IOException { private static UpdateResponse add(String collection, Collection<SolrInputDocument> docs, SolrParams params) throws SolrServerException, IOException {
UpdateRequest req = new UpdateRequest(); UpdateRequest req = new UpdateRequest();
if (params != null) { if (params != null) {

View File

@ -16,18 +16,17 @@
*/ */
package org.apache.solr.update.processor; package org.apache.solr.update.processor;
import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodHandles;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.mina.util.ConcurrentHashSet; import org.apache.solr.cloud.MiniSolrCloudCluster;
import org.apache.solr.common.SolrException; import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.NamedList;
import org.apache.solr.request.SolrQueryRequest; import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse; import org.apache.solr.response.SolrQueryResponse;
@ -42,113 +41,60 @@ import org.slf4j.LoggerFactory;
/** /**
* This Factory is similar to {@link RecordingUpdateProcessorFactory}, but with the goal of * This Factory is similar to {@link RecordingUpdateProcessorFactory}, but with the goal of
* tracking requests across multiple collections/shards/replicas in a CloudSolrTestCase. * tracking requests across multiple collections/shards/replicas in a {@link SolrCloudTestCase}.
* It can optionally save references to the commands it receives inm a single global * It can optionally save references to the commands it receives inm a single global
* Map&lt;String,BlockingQueue&gt; keys in the map are arbitrary, but the intention is that tests * Map&lt;String,BlockingQueue&gt; keys in the map are arbitrary, but the intention is that tests
* generate a key that is unique to that test, and configure the factory with the key as "group name" * generate a key that is unique to that test, and configure the factory with the key as "group name"
* to avoid cross talk between tests. Tests can poll for requests from a group to observe that the expected * to avoid cross talk between tests. Tests can poll for requests from a group to observe that the expected
* commands are executed. By default, this factory does nothing except return the "next" * commands are executed. By default, this factory does nothing except return the "next"
* processor from the chain unless it's told to {@link #startRecording()} in which case all factories * processor from the chain unless it's told to {@link #startRecording(String)} in which case all factories
* with the same group will begin recording. It is critical that tests utilizing this * with the same group will begin recording.
* processor call {@link #close()} on at least one group member after the test finishes. The requests associated with
* the commands are also provided with a
* *
* This class is only for unit test purposes and should not be used in any production capacity. It presumes all nodes * This class is only for unit test purposes and should not be used in any production capacity. It presumes all nodes
* exist within the same JVM (i. e. MiniSolrCloudCluster). * exist within the same JVM (i.e. {@link MiniSolrCloudCluster}).
*/ */
public final class TrackingUpdateProcessorFactory public final class TrackingUpdateProcessorFactory
extends UpdateRequestProcessorFactory implements Closeable { extends UpdateRequestProcessorFactory {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static final String REQUEST_COUNT = "TrackingUpdateProcessorRequestCount"; public static final String REQUEST_COUNT = "TrackingUpdateProcessorRequestCount";
public static final String REQUEST_NODE = "TrackingUpdateProcessorRequestNode"; public static final String REQUEST_NODE = "TrackingUpdateProcessorRequestNode";
private final static Map<String,Set<TrackingUpdateProcessorFactory>> groupMembership = new ConcurrentHashMap<>();
private final static Map<String,AtomicInteger> groupSerialNums = new ConcurrentHashMap<>();
/** /**
* The map of group queues containing commands that were recorded * The map of group queues containing commands that were recorded
* @see #startRecording * @see #startRecording
*/ */
private final static Map<String, List<UpdateCommand>> commandQueueMap = new ConcurrentHashMap<>(); private final static Map<String, List<UpdateCommand>> groupToCommands = new ConcurrentHashMap<>();
private static final Object memoryConsistency = new Object();
private volatile boolean recording = false;
private String group = "default"; private String group = "default";
/** public static void startRecording(String group) {
* Get a copy of the queue for the group. final List<UpdateCommand> updateCommands = groupToCommands.get(group);
* assert updateCommands == null || updateCommands.isEmpty();
* @param group the name of the group to fetch
* @return A cloned queue containing the same elements as the queue held in commandQueueMap List<UpdateCommand> existing = groupToCommands.put(group, Collections.synchronizedList(new ArrayList<>()));
*/ assert existing == null : "Test cross-talk?";
public static ArrayList<UpdateCommand> commandsForGroup(String group) {
synchronized (memoryConsistency) {
return new ArrayList<>(commandQueueMap.get(group));
}
} }
public static void startRecording(String group) { /**
synchronized (memoryConsistency) { *
Set<TrackingUpdateProcessorFactory> trackingUpdateProcessorFactories = groupMembership.get(group); * @param group the name of the group to fetch
if (trackingUpdateProcessorFactories == null || trackingUpdateProcessorFactories.isEmpty()) { * @return A cloned queue containing the same elements as the queue held in groupToCommands
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "There are no trackingUpdateProcessors for group " + group); */
} public static List<UpdateCommand> stopRecording(String group) {
for (TrackingUpdateProcessorFactory trackingUpdateProcessorFactory : trackingUpdateProcessorFactories) { List<UpdateCommand> commands = groupToCommands.remove(group);
trackingUpdateProcessorFactory.startRecording(); return Arrays.asList(commands.toArray(new UpdateCommand[0])); // safe copy. input list is synchronized
}
}
}
public static void stopRecording(String group) {
synchronized (memoryConsistency) {
Set<TrackingUpdateProcessorFactory> trackingUpdateProcessorFactories = groupMembership.get(group);
if (trackingUpdateProcessorFactories == null || trackingUpdateProcessorFactories.isEmpty()) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "There are no trackingUpdateProcessors for group "
+ group + " available groups are:" + groupMembership.keySet());
}
for (TrackingUpdateProcessorFactory trackingUpdateProcessorFactory : trackingUpdateProcessorFactories) {
trackingUpdateProcessorFactory.stopRecording();
}
}
} }
@Override @Override
public void init(NamedList args) { public void init(NamedList args) {
if (args != null && args.indexOf("group",0) >= 0) { if (args != null && args.indexOf("group",0) >= 0) {
group = (String) args.get("group"); group = (String) args.get("group");
log.debug("Init URP, group '{}'", group);
} else { } else {
log.warn("TrackingUpdateProcessorFactory initialized without group configuration, using 'default' but this group is shared" + log.warn("TrackingUpdateProcessorFactory initialized without group configuration, using 'default' but this group is shared" +
"across the entire VM and guaranteed to have unpredictable behavior if used by more than one test"); "across the entire VM and guaranteed to have unpredictable behavior if used by more than one test");
} }
// compute if absent to avoid replacing in the case of multiple "default"
commandQueueMap.computeIfAbsent(group, s -> new ArrayList<>());
groupMembership.computeIfAbsent(group,s-> new ConcurrentHashSet<>());
groupSerialNums.computeIfAbsent(group,s-> new AtomicInteger(0));
groupMembership.get(group).add(this);
}
/**
* @see #stopRecording
* @see #commandQueueMap
*/
public synchronized void startRecording() {
Set<TrackingUpdateProcessorFactory> facts = groupMembership.get(group);
// facts being null is a bug, all instances should have a group.
for (TrackingUpdateProcessorFactory fact : facts) {
fact.recording = true;
}
}
/** @see #startRecording */
public synchronized void stopRecording() {
Set<TrackingUpdateProcessorFactory> factories = groupMembership.get(group);
// facts being null is a bug, all instances should have a group.
for (TrackingUpdateProcessorFactory fact : factories) {
fact.recording = false;
}
} }
@Override @Override
@ -156,35 +102,26 @@ public final class TrackingUpdateProcessorFactory
public synchronized UpdateRequestProcessor getInstance(SolrQueryRequest req, public synchronized UpdateRequestProcessor getInstance(SolrQueryRequest req,
SolrQueryResponse rsp, SolrQueryResponse rsp,
UpdateRequestProcessor next ) { UpdateRequestProcessor next ) {
return recording ? new RecordingUpdateRequestProcessor(group, next) : next; final List<UpdateCommand> commands = groupToCommands.get(group);
} return commands == null ? next : new RecordingUpdateRequestProcessor(commands, next);
@Override
public void close() {
commandQueueMap.remove(group);
groupMembership.get(group).clear();
} }
private static final class RecordingUpdateRequestProcessor private static final class RecordingUpdateRequestProcessor
extends UpdateRequestProcessor { extends UpdateRequestProcessor {
private String group; private final List<UpdateCommand> groupCommands;
public RecordingUpdateRequestProcessor(String group, RecordingUpdateRequestProcessor(List<UpdateCommand> groupCommands, UpdateRequestProcessor next) {
UpdateRequestProcessor next) {
super(next); super(next);
this.group = group; this.groupCommands = groupCommands;
} }
private void record(UpdateCommand cmd) { private void record(UpdateCommand cmd) {
synchronized (memoryConsistency) { groupCommands.add(cmd.clone()); // important because cmd.clear() will be called
String coreName = cmd.getReq().getCore().getName();
Map<Object, Object> context = cmd.getReq().getContext(); Map<Object, Object> context = cmd.getReq().getContext();
context.put(REQUEST_COUNT, groupSerialNums.get(group).incrementAndGet()); context.put(REQUEST_COUNT, groupCommands.size());
context.put(REQUEST_NODE, coreName); context.put(REQUEST_NODE, cmd.getReq().getCore().getName());
List<UpdateCommand> commands = commandQueueMap.get(group);
commands.add(cmd.clone()); // important because cmd.clear() will be called
}
} }
@Override @Override
@ -212,13 +149,6 @@ public final class TrackingUpdateProcessorFactory
record(cmd); record(cmd);
super.processRollback(cmd); super.processRollback(cmd);
} }
@Override
protected void doClose() {
super.doClose();
groupMembership.get(group).remove(this);
}
} }
} }

View File

@ -623,6 +623,27 @@ without error. If there was no limit, than an erroneous value could trigger man
+ +
The default is 10 minutes. The default is 10 minutes.
`router.preemptiveCreateMath`::
A date math expression that results in early creation of new collections.
+
If a document arrives with a timestamp that is after the end time of the most recent collection minus this
interval, then the next (and only the next) collection will be created asynchronously. Without this setting, collections are created
synchronously when required by the document time stamp and thus block the flow of documents until the collection
is created (possibly several seconds). Preemptive creation reduces these hiccups. If set to enough time (perhaps
an hour or more) then if there are problems creating a collection, this window of time might be enough to take
corrective action. However after a successful preemptive creation, the collection is consuming resources without
being used, and new documents will tend to be routed through it only to be routed elsewhere. Also, note that
router.autoDeleteAge is currently evaluated relative to the date of a newly created collection, and so you may
want to increase the delete age by the preemptive window amount so that the oldest collection isn't deleted too
soon. Note that it has to be possible to subtract the interval specified from a date, so if prepending a
minus sign creates invalid date math, this will cause an error. Also note that a document that is itself
destined for a collection that does not exist will still trigger synchronous creation up to that destination collection
but will not trigger additional async preemptive creation. Only one type of collection creation can happen
per document.
Example: `90MINUTES`.
+
This property is blank by default indicating just-in-time, synchronous creation of new collections.
`router.autoDeleteAge`:: `router.autoDeleteAge`::
A date math expression that results in the oldest collections getting deleted automatically. A date math expression that results in the oldest collections getting deleted automatically.
+ +

View File

@ -1508,6 +1508,7 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
public static final String ROUTER_START = "router.start"; public static final String ROUTER_START = "router.start";
public static final String ROUTER_INTERVAL = "router.interval"; public static final String ROUTER_INTERVAL = "router.interval";
public static final String ROUTER_MAX_FUTURE = "router.maxFutureMs"; public static final String ROUTER_MAX_FUTURE = "router.maxFutureMs";
public static final String ROUTER_PREEMPTIVE_CREATE_WINDOW = "router.preemptiveCreateMath";
private final String aliasName; private final String aliasName;
private final String routerField; private final String routerField;
@ -1516,6 +1517,7 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
//Optional: //Optional:
private TimeZone tz; private TimeZone tz;
private Integer maxFutureMs; private Integer maxFutureMs;
private String preemptiveCreateMath;
private final Create createCollTemplate; private final Create createCollTemplate;
@ -1540,6 +1542,11 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
return this; return this;
} }
public CreateTimeRoutedAlias setPreemptiveCreateWindow(String preemptiveCreateMath) {
this.preemptiveCreateMath = preemptiveCreateMath;
return this;
}
@Override @Override
public SolrParams getParams() { public SolrParams getParams() {
ModifiableSolrParams params = (ModifiableSolrParams) super.getParams(); ModifiableSolrParams params = (ModifiableSolrParams) super.getParams();
@ -1554,6 +1561,9 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
if (maxFutureMs != null) { if (maxFutureMs != null) {
params.add(ROUTER_MAX_FUTURE, ""+maxFutureMs); params.add(ROUTER_MAX_FUTURE, ""+maxFutureMs);
} }
if (preemptiveCreateMath != null) {
params.add(ROUTER_PREEMPTIVE_CREATE_WINDOW, preemptiveCreateMath);
}
// merge the above with collectionParams. Above takes precedence. // merge the above with collectionParams. Above takes precedence.
ModifiableSolrParams createCollParams = new ModifiableSolrParams(); // output target ModifiableSolrParams createCollParams = new ModifiableSolrParams(); // output target

View File

@ -168,6 +168,10 @@
"type": "integer", "type": "integer",
"description":"How many milliseconds into the future to accept document. Documents with a value in router.field that is greater than now() + maxFutureMs will be rejected to avoid provisioning too much resources." "description":"How many milliseconds into the future to accept document. Documents with a value in router.field that is greater than now() + maxFutureMs will be rejected to avoid provisioning too much resources."
}, },
"preemptiveCreateMath":{
"type": "string",
"description": "If a document arrives with a timestamp that is after the end time of the most recent collection minus this interval, then the next collection will be created asynchronously. Without this setting, collections are created synchronously when required by the document time stamp and thus block the flow of documents until the collection is created (possibly several seconds). Preemptive creation reduces these hiccups. If set to enough time (perhaps an hour or more) then if there are problems creating a collection, this window of time might be enough to take corrective action. However after a successful preemptive creation, the collection is consuming resources without being used, and new documents will tend to be routed through it only to be routed elsewhere. Also, note that router.autoDeleteAge is currently evaluated relative to the date of a newly created collection, and so you may want to increase the delete age by the preemptive window amount so that the oldest collection isn't deleted too soon."
},
"autoDeleteAge": { "autoDeleteAge": {
"type": "string", "type": "string",
"description": "A date math expressions yielding a time in the past. Collections covering a period of time entirely before this age will be automatically deleted." "description": "A date math expressions yielding a time in the past. Collections covering a period of time entirely before this age will be automatically deleted."