mirror of https://github.com/apache/lucene.git
SOLR-11925: Time Routed Aliases: router.autoDeleteAge feature
This commit is contained in:
parent
4700b1d304
commit
02b5172ea2
|
@ -158,6 +158,9 @@ New Features
|
|||
|
||||
* SOLR-11778: Add per-stage RequestHandler metrics. (ab)
|
||||
|
||||
* SOLR-11925: Time Routed Aliases can have their oldest collections automatically deleted via the "router.autoDeleteAge"
|
||||
setting. (David Smiley)
|
||||
|
||||
Bug Fixes
|
||||
----------------------
|
||||
|
||||
|
|
|
@ -84,7 +84,7 @@ public class CreateAliasCmd implements OverseerCollectionMessageHandler.Cmd {
|
|||
final List<String> canonicalCollectionList = parseCollectionsParameter(message.get("collections"));
|
||||
final String canonicalCollectionsString = StrUtils.join(canonicalCollectionList, ',');
|
||||
validateAllCollectionsExistAndNoDups(canonicalCollectionList, zkStateReader);
|
||||
zkStateReader.aliasesHolder
|
||||
zkStateReader.aliasesManager
|
||||
.applyModificationAndExportToZk(aliases -> aliases.cloneWithCollectionAlias(aliasName, canonicalCollectionsString));
|
||||
}
|
||||
|
||||
|
@ -121,12 +121,11 @@ public class CreateAliasCmd implements OverseerCollectionMessageHandler.Cmd {
|
|||
String initialCollectionName = TimeRoutedAlias.formatCollectionNameFromInstant(aliasName, startTime);
|
||||
|
||||
// Create the collection
|
||||
NamedList createResults = new NamedList();
|
||||
RoutedAliasCreateCollectionCmd.createCollectionAndWait(state, createResults, aliasName, aliasMetadata, initialCollectionName, ocmh);
|
||||
RoutedAliasCreateCollectionCmd.createCollectionAndWait(state, aliasName, aliasMetadata, initialCollectionName, ocmh);
|
||||
validateAllCollectionsExistAndNoDups(Collections.singletonList(initialCollectionName), zkStateReader);
|
||||
|
||||
// Create/update the alias
|
||||
zkStateReader.aliasesHolder.applyModificationAndExportToZk(aliases -> aliases
|
||||
zkStateReader.aliasesManager.applyModificationAndExportToZk(aliases -> aliases
|
||||
.cloneWithCollectionAlias(aliasName, initialCollectionName)
|
||||
.cloneWithCollectionAliasMetadata(aliasName, aliasMetadata));
|
||||
}
|
||||
|
|
|
@ -37,7 +37,7 @@ public class DeleteAliasCmd implements OverseerCollectionMessageHandler.Cmd {
|
|||
String aliasName = message.getStr(NAME);
|
||||
|
||||
ZkStateReader zkStateReader = ocmh.zkStateReader;
|
||||
zkStateReader.aliasesHolder.applyModificationAndExportToZk(a -> a.cloneWithCollectionAlias(aliasName, null));
|
||||
zkStateReader.aliasesManager.applyModificationAndExportToZk(a -> a.cloneWithCollectionAlias(aliasName, null));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -60,7 +60,7 @@ public class ModifyAliasCmd implements Cmd {
|
|||
@SuppressWarnings("unchecked")
|
||||
Map<String, String> metadata = (Map<String, String>) message.get(META_DATA);
|
||||
|
||||
zkStateReader.aliasesHolder.applyModificationAndExportToZk(aliases1 -> {
|
||||
zkStateReader.aliasesManager.applyModificationAndExportToZk(aliases1 -> {
|
||||
for (Map.Entry<String, String> entry : metadata.entrySet()) {
|
||||
String key = entry.getKey();
|
||||
if ("".equals(key.trim())) {
|
||||
|
|
|
@ -18,11 +18,18 @@
|
|||
package org.apache.solr.cloud.api.collections;
|
||||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.text.ParseException;
|
||||
import java.time.Instant;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.solr.client.solrj.SolrResponse;
|
||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||
import org.apache.solr.cloud.Overseer;
|
||||
import org.apache.solr.cloud.OverseerSolrResponse;
|
||||
import org.apache.solr.common.SolrException;
|
||||
|
@ -32,10 +39,13 @@ import org.apache.solr.common.cloud.ZkNodeProps;
|
|||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.params.CollectionParams;
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.apache.solr.common.params.SolrParams;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.common.util.StrUtils;
|
||||
import org.apache.solr.handler.admin.CollectionsHandler;
|
||||
import org.apache.solr.request.LocalSolrQueryRequest;
|
||||
import org.apache.solr.response.SolrQueryResponse;
|
||||
import org.apache.solr.util.DateMathParser;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -45,18 +55,20 @@ import static org.apache.solr.cloud.api.collections.TimeRoutedAlias.ROUTED_ALIAS
|
|||
import static org.apache.solr.common.params.CommonParams.NAME;
|
||||
|
||||
/**
|
||||
* For "routed aliases", creates another collection and adds it to the alias. In some cases it will not
|
||||
* add a new collection.
|
||||
* If a collection is created, then collection creation info is returned.
|
||||
* (Internal) For "time routed aliases", both deletes old collections and creates new collections
|
||||
* associated with routed aliases.
|
||||
*
|
||||
* Note: this logic is within an Overseer because we want to leverage the mutual exclusion
|
||||
* property afforded by the lock it obtains on the alias name.
|
||||
*
|
||||
* @since 7.3
|
||||
* @lucene.internal
|
||||
*/
|
||||
// TODO rename class to MaintainRoutedAliasCmd
|
||||
public class RoutedAliasCreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd {
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
public static final String IF_MOST_RECENT_COLL_NAME = "ifMostRecentCollName";
|
||||
public static final String IF_MOST_RECENT_COLL_NAME = "ifMostRecentCollName"; //TODO rename to createAfter
|
||||
|
||||
private final OverseerCollectionMessageHandler ocmh;
|
||||
|
||||
|
@ -64,6 +76,21 @@ public class RoutedAliasCreateCollectionCmd implements OverseerCollectionMessage
|
|||
this.ocmh = ocmh;
|
||||
}
|
||||
|
||||
/** Invokes this command from the client. If there's a problem it will throw an exception. */
|
||||
public static NamedList remoteInvoke(CollectionsHandler collHandler, String aliasName, String mostRecentCollName)
|
||||
throws Exception {
|
||||
final String operation = CollectionParams.CollectionAction.ROUTEDALIAS_CREATECOLL.toLower();
|
||||
Map<String, Object> msg = new HashMap<>();
|
||||
msg.put(Overseer.QUEUE_OPERATION, operation);
|
||||
msg.put(CollectionParams.NAME, aliasName);
|
||||
msg.put(RoutedAliasCreateCollectionCmd.IF_MOST_RECENT_COLL_NAME, mostRecentCollName);
|
||||
final SolrResponse rsp = collHandler.sendToOCPQueue(new ZkNodeProps(msg));
|
||||
if (rsp.getException() != null) {
|
||||
throw rsp.getException();
|
||||
}
|
||||
return rsp.getResponse();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void call(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
|
||||
//---- PARSE PRIMARY MESSAGE PARAMS
|
||||
|
@ -75,13 +102,12 @@ public class RoutedAliasCreateCollectionCmd implements OverseerCollectionMessage
|
|||
// TODO collection param (or intervalDateMath override?), useful for data capped collections
|
||||
|
||||
//---- PARSE ALIAS INFO FROM ZK
|
||||
final ZkStateReader.AliasesManager aliasesHolder = ocmh.zkStateReader.aliasesHolder;
|
||||
final Aliases aliases = aliasesHolder.getAliases();
|
||||
final ZkStateReader.AliasesManager aliasesManager = ocmh.zkStateReader.aliasesManager;
|
||||
final Aliases aliases = aliasesManager.getAliases();
|
||||
final Map<String, String> aliasMetadata = aliases.getCollectionAliasMetadata(aliasName);
|
||||
if (aliasMetadata == null) {
|
||||
throw newAliasMustExistException(aliasName); // if it did exist, we'd have a non-null map
|
||||
}
|
||||
|
||||
final TimeRoutedAlias timeRoutedAlias = new TimeRoutedAlias(aliasName, aliasMetadata);
|
||||
|
||||
final List<Map.Entry<Instant, String>> parsedCollections =
|
||||
|
@ -113,13 +139,21 @@ public class RoutedAliasCreateCollectionCmd implements OverseerCollectionMessage
|
|||
final Instant nextCollTimestamp = timeRoutedAlias.computeNextCollTimestamp(mostRecentCollTimestamp);
|
||||
final String createCollName = TimeRoutedAlias.formatCollectionNameFromInstant(aliasName, nextCollTimestamp);
|
||||
|
||||
//---- DELETE OLDEST COLLECTIONS AND REMOVE FROM ALIAS (if configured)
|
||||
NamedList deleteResults = deleteOldestCollectionsAndUpdateAlias(timeRoutedAlias, aliasesManager, nextCollTimestamp);
|
||||
if (deleteResults != null) {
|
||||
results.add("delete", deleteResults);
|
||||
}
|
||||
|
||||
//---- CREATE THE COLLECTION
|
||||
createCollectionAndWait(clusterState, results, aliasName, aliasMetadata, createCollName, ocmh);
|
||||
NamedList createResults = createCollectionAndWait(clusterState, aliasName, aliasMetadata,
|
||||
createCollName, ocmh);
|
||||
if (createResults != null) {
|
||||
results.add("create", createResults);
|
||||
}
|
||||
|
||||
//TODO delete some of the oldest collection(s) ?
|
||||
|
||||
//---- UPDATE THE ALIAS
|
||||
aliasesHolder.applyModificationAndExportToZk(curAliases -> {
|
||||
//---- UPDATE THE ALIAS WITH NEW COLLECTION
|
||||
aliasesManager.applyModificationAndExportToZk(curAliases -> {
|
||||
final List<String> curTargetCollections = curAliases.getCollectionAliasListMap().get(aliasName);
|
||||
if (curTargetCollections.contains(createCollName)) {
|
||||
return curAliases;
|
||||
|
@ -134,12 +168,92 @@ public class RoutedAliasCreateCollectionCmd implements OverseerCollectionMessage
|
|||
|
||||
}
|
||||
|
||||
/**
|
||||
* Deletes some of the oldest collection(s) based on {@link TimeRoutedAlias#getAutoDeleteAgeMath()}. If not present
|
||||
* then does nothing. Returns non-null results if something was deleted (or if we tried to).
|
||||
* {@code now} is the date from which the math is relative to.
|
||||
*/
|
||||
NamedList deleteOldestCollectionsAndUpdateAlias(TimeRoutedAlias timeRoutedAlias,
|
||||
ZkStateReader.AliasesManager aliasesManager,
|
||||
Instant now) throws Exception {
|
||||
final String autoDeleteAgeMathStr = timeRoutedAlias.getAutoDeleteAgeMath();
|
||||
if (autoDeleteAgeMathStr == null) {
|
||||
return null;
|
||||
}
|
||||
final Instant delBefore;
|
||||
try {
|
||||
delBefore = new DateMathParser(Date.from(now), timeRoutedAlias.getTimeZone()).parseMath(autoDeleteAgeMathStr).toInstant();
|
||||
} catch (ParseException e) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, e); // note: should not happen by this point
|
||||
}
|
||||
|
||||
String aliasName = timeRoutedAlias.getAliasName();
|
||||
|
||||
Collection<String> collectionsToDelete = new LinkedHashSet<>();
|
||||
|
||||
// First update the alias (there may be no change to make!)
|
||||
aliasesManager.applyModificationAndExportToZk(curAliases -> {
|
||||
// note: we could re-parse the TimeRoutedAlias object from curAliases but I don't think there's a point to it.
|
||||
|
||||
final List<Map.Entry<Instant, String>> parsedCollections =
|
||||
timeRoutedAlias.parseCollections(curAliases, () -> newAliasMustExistException(aliasName));
|
||||
|
||||
//iterating from newest to oldest, find the first collection that has a time <= "before". We keep this collection
|
||||
// (and all newer to left) but we delete older collections, which are the ones that follow.
|
||||
// This logic will always keep the first collection, which we can't delete.
|
||||
int numToKeep = 0;
|
||||
for (Map.Entry<Instant, String> parsedCollection : parsedCollections) {
|
||||
numToKeep++;
|
||||
final Instant colInstant = parsedCollection.getKey();
|
||||
if (colInstant.isBefore(delBefore) || colInstant.equals(delBefore)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (numToKeep == parsedCollections.size()) {
|
||||
log.debug("No old time routed collections to delete.");
|
||||
return curAliases;
|
||||
}
|
||||
|
||||
final List<String> targetList = curAliases.getCollectionAliasListMap().get(aliasName);
|
||||
// remember to delete these... (oldest to newest)
|
||||
for (int i = targetList.size() - 1; i >= numToKeep; i--) {
|
||||
collectionsToDelete.add(targetList.get(i));
|
||||
}
|
||||
// new alias list has only "numToKeep" first items
|
||||
final List<String> collectionsToKeep = targetList.subList(0, numToKeep);
|
||||
final String collectionsToKeepStr = StrUtils.join(collectionsToKeep, ',');
|
||||
return curAliases.cloneWithCollectionAlias(aliasName, collectionsToKeepStr);
|
||||
});
|
||||
|
||||
if (collectionsToDelete.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
|
||||
log.info("Removing old time routed collections: {}", collectionsToDelete);
|
||||
// Should this be done asynchronously? If we got "ASYNC" then probably.
|
||||
// It would shorten the time the Overseer holds a lock on the alias name
|
||||
// (deleting the collections will be done later and not use that lock).
|
||||
// Don't bother about parallel; it's unusual to have more than 1.
|
||||
// Note we don't throw an exception here under most cases; instead the response will have information about
|
||||
// how each delete request went, possibly including a failure message.
|
||||
final CollectionsHandler collHandler = ocmh.overseer.getCoreContainer().getCollectionsHandler();
|
||||
NamedList results = new NamedList();
|
||||
for (String collection : collectionsToDelete) {
|
||||
final SolrParams reqParams = CollectionAdminRequest.deleteCollection(collection).getParams();
|
||||
SolrQueryResponse rsp = new SolrQueryResponse();
|
||||
collHandler.handleRequestBody(new LocalSolrQueryRequest(null, reqParams), rsp);
|
||||
results.add(collection, rsp.getValues());
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a collection (for use in a routed alias), waiting for it to be ready before returning.
|
||||
* If the collection already exists then this is not an error.
|
||||
* IMPORTANT: Only call this from an {@link OverseerCollectionMessageHandler.Cmd}.
|
||||
*/
|
||||
static void createCollectionAndWait(ClusterState clusterState, NamedList results, String aliasName, Map<String, String> aliasMetadata, String createCollName, OverseerCollectionMessageHandler ocmh) throws Exception {
|
||||
static NamedList createCollectionAndWait(ClusterState clusterState, String aliasName, Map<String, String> aliasMetadata,
|
||||
String createCollName, OverseerCollectionMessageHandler ocmh) throws Exception {
|
||||
// Map alias metadata starting with a prefix to a create-collection API request
|
||||
final ModifiableSolrParams createReqParams = new ModifiableSolrParams();
|
||||
for (Map.Entry<String, String> e : aliasMetadata.entrySet()) {
|
||||
|
@ -161,6 +275,7 @@ public class RoutedAliasCreateCollectionCmd implements OverseerCollectionMessage
|
|||
ocmh.overseer.getCoreContainer().getCollectionsHandler());
|
||||
createMsgMap.put(Overseer.QUEUE_OPERATION, "create");
|
||||
|
||||
NamedList results = new NamedList();
|
||||
try {
|
||||
// Since we are running in the Overseer here, send the message directly to the Overseer CreateCollectionCmd.
|
||||
// note: there's doesn't seem to be any point in locking on the collection name, so we don't. We currently should
|
||||
|
@ -173,7 +288,9 @@ public class RoutedAliasCreateCollectionCmd implements OverseerCollectionMessage
|
|||
}
|
||||
}
|
||||
|
||||
CollectionsHandler.waitForActiveCollection(createCollName, null, ocmh.overseer.getCoreContainer(), new OverseerSolrResponse(results));
|
||||
CollectionsHandler.waitForActiveCollection(createCollName, ocmh.overseer.getCoreContainer(),
|
||||
new OverseerSolrResponse(results));
|
||||
return results;
|
||||
}
|
||||
|
||||
private SolrException newAliasMustExistException(String aliasName) {
|
||||
|
|
|
@ -17,7 +17,6 @@
|
|||
|
||||
package org.apache.solr.cloud.api.collections;
|
||||
|
||||
import java.text.ParseException;
|
||||
import java.time.Instant;
|
||||
import java.time.ZoneOffset;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
|
@ -64,6 +63,7 @@ public class TimeRoutedAlias {
|
|||
public static final String ROUTER_START = ROUTER_PREFIX + "start";
|
||||
public static final String ROUTER_INTERVAL = ROUTER_PREFIX + "interval";
|
||||
public static final String ROUTER_MAX_FUTURE = ROUTER_PREFIX + "max-future-ms";
|
||||
public static final String ROUTER_AUTO_DELETE_AGE = ROUTER_PREFIX + "autoDeleteAge";
|
||||
public static final String CREATE_COLLECTION_PREFIX = "create-collection.";
|
||||
// plus TZ and NAME
|
||||
|
||||
|
@ -122,8 +122,9 @@ public class TimeRoutedAlias {
|
|||
|
||||
private final String aliasName;
|
||||
private final String routeField;
|
||||
private final String intervalMath; // ex: +1DAY
|
||||
private final long maxFutureMs;
|
||||
private final String intervalDateMath; // ex: +1DAY
|
||||
private final String autoDeleteAgeMath; // ex: /DAY-30DAYS *optional*
|
||||
private final TimeZone timeZone;
|
||||
|
||||
public TimeRoutedAlias(String aliasName, Map<String, String> aliasMetadata) {
|
||||
|
@ -134,21 +135,37 @@ public class TimeRoutedAlias {
|
|||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Only 'time' routed aliases is supported right now.");
|
||||
}
|
||||
routeField = required.get(ROUTER_FIELD);
|
||||
intervalDateMath = required.get(ROUTER_INTERVAL);
|
||||
intervalMath = required.get(ROUTER_INTERVAL);
|
||||
|
||||
//optional:
|
||||
maxFutureMs = params.getLong(ROUTER_MAX_FUTURE, TimeUnit.MINUTES.toMillis(10));
|
||||
autoDeleteAgeMath = params.get(ROUTER_AUTO_DELETE_AGE); // no default
|
||||
timeZone = TimeZoneUtils.parseTimezone(aliasMetadata.get(CommonParams.TZ));
|
||||
|
||||
// More validation:
|
||||
|
||||
// check that the interval is valid date math
|
||||
// check that the date math is valid
|
||||
final Date now = new Date();
|
||||
try {
|
||||
new DateMathParser(timeZone).parseMath(intervalDateMath);
|
||||
} catch (ParseException e) {
|
||||
final Date after = new DateMathParser(now, timeZone).parseMath(intervalMath);
|
||||
if (!after.after(now)) {
|
||||
throw new SolrException(BAD_REQUEST, "duration must add to produce a time in the future");
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new SolrException(BAD_REQUEST, "bad " + TimeRoutedAlias.ROUTER_INTERVAL + ", " + e, e);
|
||||
}
|
||||
|
||||
if (autoDeleteAgeMath != null) {
|
||||
try {
|
||||
final Date before = new DateMathParser(now, timeZone).parseMath(autoDeleteAgeMath);
|
||||
if (now.before(before)) {
|
||||
throw new SolrException(BAD_REQUEST, "duration must round or subtract to produce a time in the past");
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new SolrException(BAD_REQUEST, "bad " + TimeRoutedAlias.ROUTER_AUTO_DELETE_AGE + ", " + e, e);
|
||||
}
|
||||
}
|
||||
|
||||
if (maxFutureMs < 0) {
|
||||
throw new SolrException(BAD_REQUEST, ROUTER_MAX_FUTURE + " must be >= 0");
|
||||
}
|
||||
|
@ -162,12 +179,16 @@ public class TimeRoutedAlias {
|
|||
return routeField;
|
||||
}
|
||||
|
||||
public String getIntervalMath() {
|
||||
return intervalMath;
|
||||
}
|
||||
|
||||
public long getMaxFutureMs() {
|
||||
return maxFutureMs;
|
||||
}
|
||||
|
||||
public String getIntervalDateMath() {
|
||||
return intervalDateMath;
|
||||
public String getAutoDeleteAgeMath() {
|
||||
return autoDeleteAgeMath;
|
||||
}
|
||||
|
||||
public TimeZone getTimeZone() {
|
||||
|
@ -179,8 +200,9 @@ public class TimeRoutedAlias {
|
|||
return Objects.toStringHelper(this)
|
||||
.add("aliasName", aliasName)
|
||||
.add("routeField", routeField)
|
||||
.add("intervalMath", intervalMath)
|
||||
.add("maxFutureMs", maxFutureMs)
|
||||
.add("intervalDateMath", intervalDateMath)
|
||||
.add("autoDeleteAgeMath", autoDeleteAgeMath)
|
||||
.add("timeZone", timeZone)
|
||||
.toString();
|
||||
}
|
||||
|
@ -204,7 +226,7 @@ public class TimeRoutedAlias {
|
|||
/** Computes the timestamp of the next collection given the timestamp of the one before. */
|
||||
public Instant computeNextCollTimestamp(Instant fromTimestamp) {
|
||||
final Instant nextCollTimestamp =
|
||||
DateMathParser.parseMath(Date.from(fromTimestamp), "NOW" + intervalDateMath, timeZone).toInstant();
|
||||
DateMathParser.parseMath(Date.from(fromTimestamp), "NOW" + intervalMath, timeZone).toInstant();
|
||||
assert nextCollTimestamp.isAfter(fromTimestamp);
|
||||
return nextCollTimestamp;
|
||||
}
|
||||
|
|
|
@ -241,26 +241,39 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
|
|||
throw new SolrException(BAD_REQUEST,
|
||||
"Invalid request. collections can be accessed only in SolrCloud mode");
|
||||
}
|
||||
SolrResponse response = null;
|
||||
Map<String, Object> props = operation.execute(req, rsp, this);
|
||||
if (props == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
String asyncId = req.getParams().get(ASYNC);
|
||||
if (props != null) {
|
||||
if (asyncId != null) {
|
||||
props.put(ASYNC, asyncId);
|
||||
}
|
||||
props.put(QUEUE_OPERATION, operation.action.toLower());
|
||||
if (asyncId != null) {
|
||||
props.put(ASYNC, asyncId);
|
||||
}
|
||||
|
||||
props.put(QUEUE_OPERATION, operation.action.toLower());
|
||||
|
||||
if (operation.sendToOCPQueue) {
|
||||
ZkNodeProps zkProps = new ZkNodeProps(props);
|
||||
if (operation.sendToOCPQueue) {
|
||||
response = handleResponse(operation.action.toLower(), zkProps, rsp, operation.timeOut);
|
||||
SolrResponse overseerResponse = sendToOCPQueue(zkProps, operation.timeOut);
|
||||
rsp.getValues().addAll(overseerResponse.getResponse());
|
||||
Exception exp = overseerResponse.getException();
|
||||
if (exp != null) {
|
||||
rsp.setException(exp);
|
||||
}
|
||||
else Overseer.getStateUpdateQueue(coreContainer.getZkController().getZkClient()).offer(Utils.toJSON(props));
|
||||
final String collectionName = zkProps.getStr(NAME);
|
||||
|
||||
//TODO yuck; shouldn't create-collection at the overseer do this? (conditionally perhaps)
|
||||
if (action.equals(CollectionAction.CREATE) && asyncId == null) {
|
||||
if (rsp.getException() == null) {
|
||||
waitForActiveCollection(collectionName, zkProps, cores, response);
|
||||
waitForActiveCollection(zkProps.getStr(NAME), cores, overseerResponse);
|
||||
}
|
||||
}
|
||||
|
||||
} else {
|
||||
// submits and doesn't wait for anything (no response)
|
||||
Overseer.getStateUpdateQueue(coreContainer.getZkController().getZkClient()).offer(Utils.toJSON(props));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
@ -268,16 +281,13 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
|
|||
|
||||
public static long DEFAULT_COLLECTION_OP_TIMEOUT = 180*1000;
|
||||
|
||||
//TODO rename to submitToOverseerRPC
|
||||
public void handleResponse(String operation, ZkNodeProps m,
|
||||
SolrQueryResponse rsp) throws KeeperException, InterruptedException {
|
||||
handleResponse(operation, m, rsp, DEFAULT_COLLECTION_OP_TIMEOUT);
|
||||
public SolrResponse sendToOCPQueue(ZkNodeProps m) throws KeeperException, InterruptedException {
|
||||
return sendToOCPQueue(m, DEFAULT_COLLECTION_OP_TIMEOUT);
|
||||
}
|
||||
|
||||
//TODO rename to submitToOverseerRPC
|
||||
public SolrResponse handleResponse(String operation, ZkNodeProps m,
|
||||
SolrQueryResponse rsp, long timeout) throws KeeperException, InterruptedException {
|
||||
if (!m.containsKey(QUEUE_OPERATION)) {
|
||||
public SolrResponse sendToOCPQueue(ZkNodeProps m, long timeout) throws KeeperException, InterruptedException {
|
||||
String operation = m.getStr(QUEUE_OPERATION);
|
||||
if (operation == null) {
|
||||
throw new SolrException(ErrorCode.BAD_REQUEST, "missing key " + QUEUE_OPERATION);
|
||||
}
|
||||
if (m.get(ASYNC) != null) {
|
||||
|
@ -301,26 +311,16 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
|
|||
.offer(Utils.toJSON(m));
|
||||
}
|
||||
r.add(CoreAdminParams.REQUESTID, (String) m.get(ASYNC));
|
||||
SolrResponse response = new OverseerSolrResponse(r);
|
||||
|
||||
rsp.getValues().addAll(response.getResponse());
|
||||
|
||||
return response;
|
||||
}
|
||||
return new OverseerSolrResponse(r);
|
||||
}
|
||||
|
||||
long time = System.nanoTime();
|
||||
QueueEvent event = coreContainer.getZkController()
|
||||
.getOverseerCollectionQueue()
|
||||
.offer(Utils.toJSON(m), timeout);
|
||||
if (event.getBytes() != null) {
|
||||
SolrResponse response = SolrResponse.deserialize(event.getBytes());
|
||||
rsp.getValues().addAll(response.getResponse());
|
||||
SimpleOrderedMap exp = (SimpleOrderedMap) response.getResponse().get("exception");
|
||||
if (exp != null) {
|
||||
Integer code = (Integer) exp.get("rspCode");
|
||||
rsp.setException(new SolrException(code != null && code != -1 ? ErrorCode.getErrorCode(code) : ErrorCode.SERVER_ERROR, (String)exp.get("msg")));
|
||||
}
|
||||
return response;
|
||||
return SolrResponse.deserialize(event.getBytes());
|
||||
} else {
|
||||
if (System.nanoTime() - time >= TimeUnit.NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS)) {
|
||||
throw new SolrException(ErrorCode.SERVER_ERROR, operation
|
||||
|
@ -1156,16 +1156,16 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
|
|||
}
|
||||
}
|
||||
|
||||
public static void waitForActiveCollection(String collectionName, ZkNodeProps message, CoreContainer cc, SolrResponse response)
|
||||
public static void waitForActiveCollection(String collectionName, CoreContainer cc, SolrResponse createCollResponse)
|
||||
throws KeeperException, InterruptedException {
|
||||
|
||||
if (response.getResponse().get("exception") != null) {
|
||||
if (createCollResponse.getResponse().get("exception") != null) {
|
||||
// the main called failed, don't wait
|
||||
log.info("Not waiting for active collection due to exception: " + response.getResponse().get("exception"));
|
||||
log.info("Not waiting for active collection due to exception: " + createCollResponse.getResponse().get("exception"));
|
||||
return;
|
||||
}
|
||||
|
||||
if (response.getResponse().get("failure") != null) {
|
||||
if (createCollResponse.getResponse().get("failure") != null) {
|
||||
// TODO: we should not wait for Replicas we know failed
|
||||
}
|
||||
|
||||
|
|
|
@ -16,21 +16,7 @@
|
|||
*/
|
||||
package org.apache.solr.handler.admin;
|
||||
|
||||
import static org.apache.solr.cloud.Overseer.QUEUE_OPERATION;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.CORE_NODE_NAME_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.ELECTION_NODE_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.LEADER_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.MAX_AT_ONCE_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.MAX_WAIT_SECONDS_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.REBALANCELEADERS;
|
||||
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
|
||||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
@ -56,6 +42,19 @@ import org.apache.zookeeper.KeeperException;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.solr.cloud.Overseer.QUEUE_OPERATION;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.CORE_NODE_NAME_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.ELECTION_NODE_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.LEADER_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.MAX_AT_ONCE_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.MAX_WAIT_SECONDS_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.REBALANCELEADERS;
|
||||
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
|
||||
|
||||
class RebalanceLeaders {
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
|
@ -266,9 +265,7 @@ class RebalanceLeaders {
|
|||
propMap.put(ELECTION_NODE_PROP, electionNode);
|
||||
String asyncId = REBALANCELEADERS.toLower() + "_" + core + "_" + Math.abs(System.nanoTime());
|
||||
propMap.put(ASYNC, asyncId);
|
||||
ZkNodeProps m = new ZkNodeProps(propMap);
|
||||
SolrQueryResponse rspIgnore = new SolrQueryResponse(); // I'm constructing my own response
|
||||
collectionsHandler.handleResponse(REBALANCELEADERS.toLower(), m, rspIgnore); // Want to construct my own response here.
|
||||
collectionsHandler.sendToOCPQueue(new ZkNodeProps(propMap)); // ignore response; we construct our own
|
||||
}
|
||||
|
||||
// currentAsyncIds - map of request IDs and reporting data (value)
|
||||
|
|
|
@ -444,7 +444,7 @@ public class HttpSolrCall {
|
|||
if (!retry) {
|
||||
// we couldn't find a core to work with, try reloading aliases
|
||||
// TODO: it would be nice if admin ui elements skipped this...
|
||||
cores.getZkController().getZkStateReader().aliasesHolder.update();
|
||||
cores.getZkController().getZkStateReader().aliasesManager.update();
|
||||
action = RETRY;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,7 +23,6 @@ import java.time.Instant;
|
|||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
@ -31,7 +30,6 @@ import java.util.concurrent.Semaphore;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.solr.cloud.Overseer;
|
||||
import org.apache.solr.cloud.ZkController;
|
||||
import org.apache.solr.cloud.api.collections.RoutedAliasCreateCollectionCmd;
|
||||
import org.apache.solr.cloud.api.collections.TimeRoutedAlias;
|
||||
|
@ -40,8 +38,6 @@ import org.apache.solr.common.cloud.Aliases;
|
|||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.Slice;
|
||||
import org.apache.solr.common.cloud.ZkCoreNodeProps;
|
||||
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||
import org.apache.solr.common.params.CollectionParams;
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.apache.solr.common.params.SolrParams;
|
||||
import org.apache.solr.common.params.UpdateParams;
|
||||
|
@ -80,7 +76,10 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
|
|||
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
// used to limit unnecessary concurrent collection creation requests
|
||||
// To avoid needless/redundant concurrent communication with the Overseer from this JVM, we
|
||||
// maintain a Semaphore from an alias name keyed ConcurrentHashMap.
|
||||
// Alternatively a Lock or CountDownLatch could have been used but they didn't seem
|
||||
// to make it any easier.
|
||||
private static ConcurrentHashMap<String, Semaphore> aliasToSemaphoreMap = new ConcurrentHashMap<>(4);
|
||||
|
||||
private final String thisCollection;
|
||||
|
@ -163,7 +162,7 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
|
|||
|
||||
updateParsedCollectionAliases();
|
||||
String targetCollection;
|
||||
do {
|
||||
do { // typically we don't loop; it's only when we need to create a collection
|
||||
targetCollection = findTargetCollectionGivenTimestamp(routeTimestamp);
|
||||
|
||||
if (targetCollection == null) {
|
||||
|
@ -271,39 +270,23 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
|
|||
// Invoke ROUTEDALIAS_CREATECOLL (in the Overseer, locked by alias name). It will create the collection
|
||||
// and update the alias contingent on the most recent collection name being the same as
|
||||
// what we think so here, otherwise it will return (without error).
|
||||
// To avoid needless concurrent communication with the Overseer from this JVM, we
|
||||
// maintain a Semaphore from an alias name keyed ConcurrentHashMap.
|
||||
// Alternatively a Lock or CountDownLatch could have been used but they didn't seem
|
||||
// to make it any easier.
|
||||
|
||||
// (see docs on aliasToSemaphoreMap)
|
||||
final Semaphore semaphore = aliasToSemaphoreMap.computeIfAbsent(getAliasName(), n -> new Semaphore(1));
|
||||
if (semaphore.tryAcquire()) {
|
||||
try {
|
||||
final String operation = CollectionParams.CollectionAction.ROUTEDALIAS_CREATECOLL.toLower();
|
||||
Map<String, Object> msg = new HashMap<>();
|
||||
msg.put(Overseer.QUEUE_OPERATION, operation);
|
||||
msg.put(CollectionParams.NAME, getAliasName());
|
||||
msg.put(RoutedAliasCreateCollectionCmd.IF_MOST_RECENT_COLL_NAME, mostRecentCollName);
|
||||
SolrQueryResponse rsp = new SolrQueryResponse();
|
||||
try {
|
||||
this.collHandler.handleResponse(
|
||||
operation,
|
||||
new ZkNodeProps(msg),
|
||||
rsp);
|
||||
if (rsp.getException() != null) {
|
||||
throw rsp.getException();
|
||||
} // otherwise don't care about the response. It's possible no collection was created because
|
||||
// of a race and that's okay... we'll ultimately retry any way.
|
||||
RoutedAliasCreateCollectionCmd.remoteInvoke(collHandler, getAliasName(), mostRecentCollName);
|
||||
// we don't care about the response. It's possible no collection was created because
|
||||
// of a race and that's okay... we'll ultimately retry any way.
|
||||
|
||||
// Ensure our view of the aliases has updated. If we didn't do this, our zkStateReader might
|
||||
// not yet know about the new alias (thus won't see the newly added collection to it), and we might think
|
||||
// we failed.
|
||||
zkController.getZkStateReader().aliasesHolder.update();
|
||||
} catch (RuntimeException e) {
|
||||
throw e;
|
||||
} catch (Exception e) {
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
|
||||
}
|
||||
// Ensure our view of the aliases has updated. If we didn't do this, our zkStateReader might
|
||||
// not yet know about the new alias (thus won't see the newly added collection to it), and we might think
|
||||
// we failed.
|
||||
zkController.getZkStateReader().aliasesManager.update();
|
||||
} catch (RuntimeException e) {
|
||||
throw e;
|
||||
} catch (Exception e) {
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
|
||||
} finally {
|
||||
semaphore.release(); // to signal we're done to anyone waiting on it
|
||||
}
|
||||
|
|
|
@ -286,28 +286,36 @@ public class DateMathParser {
|
|||
private Date now;
|
||||
|
||||
/**
|
||||
* Default constructor that assumes UTC should be used for rounding unless
|
||||
* otherwise specified in the SolrRequestInfo
|
||||
*
|
||||
* Chooses defaults based on the current request.
|
||||
* @see SolrRequestInfo#getClientTimeZone
|
||||
* @see SolrRequestInfo#getNOW()
|
||||
*/
|
||||
public DateMathParser() {
|
||||
this(null);
|
||||
this(null, null);
|
||||
}
|
||||
|
||||
//TODO Deprecate?
|
||||
public DateMathParser(TimeZone tz) {
|
||||
this(null, tz);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param now The current time. If null, it defaults to {@link SolrRequestInfo#getNOW()}.
|
||||
* otherwise the current time is assumed.
|
||||
* @param tz The TimeZone used for rounding (to determine when hours/days begin). If null, then this method defaults
|
||||
* to the value dictated by the SolrRequestInfo if it exists -- otherwise it uses UTC.
|
||||
* @see #DEFAULT_MATH_TZ
|
||||
* @see Calendar#getInstance(TimeZone,Locale)
|
||||
* @see SolrRequestInfo#getClientTimeZone
|
||||
*/
|
||||
public DateMathParser(TimeZone tz) {
|
||||
public DateMathParser(Date now, TimeZone tz) {
|
||||
this.now = now;// potentially null; it's okay
|
||||
|
||||
if (null == tz) {
|
||||
SolrRequestInfo reqInfo = SolrRequestInfo.getRequestInfo();
|
||||
tz = (null != reqInfo) ? reqInfo.getClientTimeZone() : DEFAULT_MATH_TZ;
|
||||
}
|
||||
zone = (null != tz) ? tz : DEFAULT_MATH_TZ;
|
||||
this.zone = (null != tz) ? tz : DEFAULT_MATH_TZ;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -104,9 +104,9 @@ public class AliasIntegrationTest extends SolrCloudTestCase {
|
|||
assertEquals(1, aliases.size());
|
||||
assertEquals("meta1", aliases.get(0));
|
||||
UnaryOperator<Aliases> op6 = a -> a.cloneWithCollectionAlias("meta1", "collection1meta,collection2meta");
|
||||
final ZkStateReader.AliasesManager aliasesHolder = zkStateReader.aliasesHolder;
|
||||
final ZkStateReader.AliasesManager aliasesManager = zkStateReader.aliasesManager;
|
||||
|
||||
aliasesHolder.applyModificationAndExportToZk(op6);
|
||||
aliasesManager.applyModificationAndExportToZk(op6);
|
||||
aliases = zkStateReader.getAliases().resolveAliases("meta1");
|
||||
assertEquals(2, aliases.size());
|
||||
assertEquals("collection1meta", aliases.get(0));
|
||||
|
@ -118,7 +118,7 @@ public class AliasIntegrationTest extends SolrCloudTestCase {
|
|||
|
||||
// set metadata
|
||||
UnaryOperator<Aliases> op5 = a -> a.cloneWithCollectionAliasMetadata("meta1", "foo", "bar");
|
||||
aliasesHolder.applyModificationAndExportToZk(op5);
|
||||
aliasesManager.applyModificationAndExportToZk(op5);
|
||||
Map<String, String> meta = zkStateReader.getAliases().getCollectionAliasMetadata("meta1");
|
||||
assertNotNull(meta);
|
||||
assertTrue(meta.containsKey("foo"));
|
||||
|
@ -126,7 +126,7 @@ public class AliasIntegrationTest extends SolrCloudTestCase {
|
|||
|
||||
// set more metadata
|
||||
UnaryOperator<Aliases> op4 = a -> a.cloneWithCollectionAliasMetadata("meta1", "foobar", "bazbam");
|
||||
aliasesHolder.applyModificationAndExportToZk(op4);
|
||||
aliasesManager.applyModificationAndExportToZk(op4);
|
||||
meta = zkStateReader.getAliases().getCollectionAliasMetadata("meta1");
|
||||
assertNotNull(meta);
|
||||
|
||||
|
@ -140,7 +140,7 @@ public class AliasIntegrationTest extends SolrCloudTestCase {
|
|||
|
||||
// remove metadata
|
||||
UnaryOperator<Aliases> op3 = a -> a.cloneWithCollectionAliasMetadata("meta1", "foo", null);
|
||||
aliasesHolder.applyModificationAndExportToZk(op3);
|
||||
aliasesManager.applyModificationAndExportToZk(op3);
|
||||
meta = zkStateReader.getAliases().getCollectionAliasMetadata("meta1");
|
||||
assertNotNull(meta);
|
||||
|
||||
|
@ -153,17 +153,17 @@ public class AliasIntegrationTest extends SolrCloudTestCase {
|
|||
|
||||
// removal of non existent key should succeed.
|
||||
UnaryOperator<Aliases> op2 = a -> a.cloneWithCollectionAliasMetadata("meta1", "foo", null);
|
||||
aliasesHolder.applyModificationAndExportToZk(op2);
|
||||
aliasesManager.applyModificationAndExportToZk(op2);
|
||||
|
||||
// chained invocations
|
||||
UnaryOperator<Aliases> op1 = a ->
|
||||
a.cloneWithCollectionAliasMetadata("meta1", "foo2", "bazbam")
|
||||
.cloneWithCollectionAliasMetadata("meta1", "foo3", "bazbam2");
|
||||
aliasesHolder.applyModificationAndExportToZk(op1);
|
||||
aliasesManager.applyModificationAndExportToZk(op1);
|
||||
|
||||
// some other independent update (not overwritten)
|
||||
UnaryOperator<Aliases> op = a -> a.cloneWithCollectionAlias("meta3", "collection1meta,collection2meta");
|
||||
aliasesHolder.applyModificationAndExportToZk(op);
|
||||
aliasesManager.applyModificationAndExportToZk(op);
|
||||
|
||||
// competing went through
|
||||
assertEquals("collection1meta,collection2meta", zkStateReader.getAliases().getCollectionAliasMap().get("meta3"));
|
||||
|
@ -278,7 +278,7 @@ public class AliasIntegrationTest extends SolrCloudTestCase {
|
|||
}
|
||||
|
||||
private void checkFooAndBarMeta(String aliasName, ZkStateReader zkStateReader) throws Exception {
|
||||
zkStateReader.aliasesHolder.update(); // ensure our view is up to date
|
||||
zkStateReader.aliasesManager.update(); // ensure our view is up to date
|
||||
Map<String, String> meta = zkStateReader.getAliases().getCollectionAliasMetadata(aliasName);
|
||||
assertNotNull(meta);
|
||||
assertTrue(meta.containsKey("foo"));
|
||||
|
@ -298,9 +298,9 @@ public class AliasIntegrationTest extends SolrCloudTestCase {
|
|||
assertEquals(1, aliases.size());
|
||||
assertEquals(aliasName, aliases.get(0));
|
||||
UnaryOperator<Aliases> op6 = a -> a.cloneWithCollectionAlias(aliasName, "collection1meta,collection2meta");
|
||||
final ZkStateReader.AliasesManager aliasesHolder = zkStateReader.aliasesHolder;
|
||||
final ZkStateReader.AliasesManager aliasesManager = zkStateReader.aliasesManager;
|
||||
|
||||
aliasesHolder.applyModificationAndExportToZk(op6);
|
||||
aliasesManager.applyModificationAndExportToZk(op6);
|
||||
aliases = zkStateReader.getAliases().resolveAliases(aliasName);
|
||||
assertEquals(2, aliases.size());
|
||||
assertEquals("collection1meta", aliases.get(0));
|
||||
|
|
|
@ -85,8 +85,8 @@ public class CreateRoutedAliasTest extends SolrCloudTestCase {
|
|||
// delete aliases first since they refer to the collections
|
||||
ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader();
|
||||
//TODO create an API to delete collections attached to the routed alias when the alias is removed
|
||||
zkStateReader.aliasesHolder.update();// ensure we're seeing the latest
|
||||
zkStateReader.aliasesHolder.applyModificationAndExportToZk(aliases -> {
|
||||
zkStateReader.aliasesManager.update();// ensure we're seeing the latest
|
||||
zkStateReader.aliasesManager.applyModificationAndExportToZk(aliases -> {
|
||||
Aliases a = zkStateReader.getAliases();
|
||||
for (String alias : a.getCollectionAliasMap().keySet()) {
|
||||
a = a.cloneWithCollectionAlias(alias,null); // remove
|
||||
|
|
|
@ -27,6 +27,7 @@ import java.util.Date;
|
|||
import java.util.List;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.apache.solr.client.solrj.SolrClient;
|
||||
|
@ -160,7 +161,7 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
|
|||
.process(solrClient);
|
||||
|
||||
// index 3 documents in a random fashion
|
||||
addDocsAndCommit(
|
||||
addDocsAndCommit(false, // send these to alias & collections
|
||||
newDoc(Instant.parse("2017-10-23T00:00:00Z")),
|
||||
newDoc(Instant.parse("2017-10-24T01:00:00Z")),
|
||||
newDoc(Instant.parse("2017-10-24T02:00:00Z"))
|
||||
|
@ -184,19 +185,35 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
|
|||
// delete the Oct23rd (save memory)...
|
||||
// make sure we track that we are effectively deleting docs there
|
||||
numDocsDeletedOrFailed += solrClient.query(col23rd, params("q", "*:*", "rows", "0")).getResults().getNumFound();
|
||||
// remove from alias
|
||||
// remove from the alias
|
||||
CollectionAdminRequest.createAlias(alias, col24th).process(solrClient);
|
||||
// delete the collection
|
||||
CollectionAdminRequest.deleteCollection(col23rd).process(solrClient);
|
||||
|
||||
// now we're going to add documents that will trigger more collections to be created
|
||||
// for 25th & 26th
|
||||
addDocsAndCommit(
|
||||
addDocsAndCommit(false, // send these to alias & collections
|
||||
newDoc(Instant.parse("2017-10-24T03:00:00Z")),
|
||||
newDoc(Instant.parse("2017-10-25T04:00:00Z")),
|
||||
newDoc(Instant.parse("2017-10-26T05:00:00Z"))
|
||||
newDoc(Instant.parse("2017-10-26T05:00:00Z")),
|
||||
newDoc(Instant.parse("2017-10-26T06:00:00Z"))
|
||||
);
|
||||
assertInvariants(alias + "_2017-10-26", alias + "_2017-10-25", col24th);
|
||||
|
||||
|
||||
// update metadata to auto-delete oldest collections
|
||||
CollectionAdminRequest.modifyAlias(alias)
|
||||
.addMetadata(TimeRoutedAlias.ROUTER_AUTO_DELETE_AGE, "-1DAY") // thus usually keep 2 collections of a day size
|
||||
.process(solrClient);
|
||||
|
||||
// add more docs, creating one new collection, but trigger ones prior to
|
||||
int numDocsToBeAutoDeleted = queryNumDocs(timeField+":[* TO \"2017-10-26T00:00:00Z\"}");
|
||||
addDocsAndCommit(true, // send these to alias only
|
||||
newDoc(Instant.parse("2017-10-26T07:00:00Z")), // existing
|
||||
newDoc(Instant.parse("2017-10-27T08:00:00Z")) // new
|
||||
);
|
||||
numDocsDeletedOrFailed += numDocsToBeAutoDeleted;
|
||||
assertInvariants(alias + "_2017-10-27", alias + "_2017-10-26");
|
||||
}
|
||||
|
||||
private void testFailedDocument(Instant timestamp, String errorMsg) throws SolrServerException, IOException {
|
||||
|
@ -218,7 +235,7 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
|
|||
|
||||
/** Adds these documents and commits, returning when they are committed.
|
||||
* We randomly go about this in different ways. */
|
||||
private void addDocsAndCommit(SolrInputDocument... solrInputDocuments) throws Exception {
|
||||
private void addDocsAndCommit(boolean aliasOnly, SolrInputDocument... solrInputDocuments) throws Exception {
|
||||
// we assume all docs will be added (none too old/new to cause exception)
|
||||
Collections.shuffle(Arrays.asList(solrInputDocuments), random());
|
||||
|
||||
|
@ -226,10 +243,12 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
|
|||
// (it doesn't matter where we send docs since the alias is honored at the URP level)
|
||||
List<String> collections = new ArrayList<>();
|
||||
collections.add(alias);
|
||||
collections.addAll(new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias));
|
||||
if (!aliasOnly) {
|
||||
collections.addAll(new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias));
|
||||
}
|
||||
|
||||
int commitWithin = random().nextBoolean() ? -1 : 500; // if -1, we commit explicitly instead
|
||||
int numDocsBefore = queryNumDocs();
|
||||
|
||||
if (random().nextBoolean()) {
|
||||
// Send in separate threads. Choose random collection & solrClient
|
||||
try (CloudSolrClient solrClient = getCloudSolrClient(cluster)) {
|
||||
|
@ -258,21 +277,25 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
|
|||
solrClient.commit(col);
|
||||
} else {
|
||||
// check that it all got committed eventually
|
||||
int numDocs = queryNumDocs();
|
||||
if (numDocs == numDocsBefore + solrInputDocuments.length) {
|
||||
String docsQ =
|
||||
"{!terms f=id}"
|
||||
+ Arrays.stream(solrInputDocuments).map(d -> d.getFieldValue("id").toString())
|
||||
.collect(Collectors.joining(","));
|
||||
int numDocs = queryNumDocs(docsQ);
|
||||
if (numDocs == solrInputDocuments.length) {
|
||||
System.err.println("Docs committed sooner than expected. Bug or slow test env?");
|
||||
return;
|
||||
}
|
||||
// wait until it's committed
|
||||
Thread.sleep(commitWithin);
|
||||
for (int idx = 0; idx < 100; ++idx) { // Loop for up to 10 seconds waiting for commit to catch up
|
||||
numDocs = queryNumDocs();
|
||||
if (numDocsBefore + solrInputDocuments.length == numDocs) break;
|
||||
numDocs = queryNumDocs(docsQ);
|
||||
if (numDocs == solrInputDocuments.length) break;
|
||||
Thread.sleep(100);
|
||||
}
|
||||
|
||||
assertEquals("not committed. Bug or a slow test?",
|
||||
numDocsBefore + solrInputDocuments.length, numDocs);
|
||||
solrInputDocuments.length, numDocs);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -282,8 +305,8 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
|
|||
assertTrue("Expected no errors: " + errors,errors == null || errors.isEmpty());
|
||||
}
|
||||
|
||||
private int queryNumDocs() throws SolrServerException, IOException {
|
||||
return (int) solrClient.query(alias, params("q", "*:*", "rows", "0")).getResults().getNumFound();
|
||||
private int queryNumDocs(String q) throws SolrServerException, IOException {
|
||||
return (int) solrClient.query(alias, params("q", q, "rows", "0")).getResults().getNumFound();
|
||||
}
|
||||
|
||||
private void assertInvariants(String... expectedColls) throws IOException, SolrServerException {
|
||||
|
|
|
@ -581,7 +581,7 @@ This field is required on all incoming documents.
|
|||
The type of routing to use. Presently only `time` is valid. This param is required.
|
||||
|
||||
`router.interval`::
|
||||
A fragment of a date math expression that will be appended to a timestamp to determine the next collection in the series.
|
||||
A date math expression that will be appended to a timestamp to determine the next collection in the series.
|
||||
Any date math expression that can be evaluated if appended to a timestamp of the form 2018-01-15T16:17:18 will
|
||||
work here. This param is required.
|
||||
|
||||
|
@ -590,6 +590,14 @@ The maximum milliseconds into the future that a document is allowed to have in `
|
|||
without error. If there was no limit, than an erroneous value could trigger many collections to be created.
|
||||
The default is 10 minutes worth.
|
||||
|
||||
`router.autoDeleteAge`::
|
||||
A date math expression that results in the oldest collections getting deleted automatically.
|
||||
The date math is relative to the timestamp of a newly created collection (typically close to the current time),
|
||||
and thus this must produce an earlier time via rounding and/or subtracting.
|
||||
Collections to be deleted must have a time range that is entirely before the computed age.
|
||||
Collections are considered for deletion immediately prior to new collections getting created.
|
||||
Example: `/DAY-90DAYS`. The default is not to delete.
|
||||
|
||||
`create-collection.*`::
|
||||
The * can be replaced with any parameter from the <<create,CREATE>> command except `name`. All other fields
|
||||
are identical in requirements and naming except that we insist that the configset be explicitly specified.
|
||||
|
|
|
@ -16,16 +16,16 @@
|
|||
*/
|
||||
package org.apache.solr.client.solrj;
|
||||
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.SolrException.ErrorCode;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.ObjectInputStream;
|
||||
import java.io.ObjectOutputStream;
|
||||
import java.io.Serializable;
|
||||
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.SolrException.ErrorCode;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
|
||||
|
||||
/**
|
||||
*
|
||||
|
@ -42,6 +42,16 @@ public abstract class SolrResponse implements Serializable {
|
|||
public abstract void setElapsedTime(long elapsedTime);
|
||||
|
||||
public abstract NamedList<Object> getResponse();
|
||||
|
||||
public Exception getException() {
|
||||
NamedList exp = (NamedList) getResponse().get("exception");
|
||||
if (exp == null) {
|
||||
return null;
|
||||
}
|
||||
Integer rspCode = (Integer) exp.get("rspCode");
|
||||
ErrorCode errorCode = rspCode != null && rspCode != -1 ? ErrorCode.getErrorCode(rspCode) : ErrorCode.SERVER_ERROR;
|
||||
return new SolrException(errorCode, (String)exp.get("msg"));
|
||||
}
|
||||
|
||||
public static byte[] serializable(SolrResponse response) {
|
||||
try {
|
||||
|
|
|
@ -1341,8 +1341,9 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
|
|||
this.aliasName = SolrIdentifierValidator.validateAliasName(aliasName);
|
||||
}
|
||||
|
||||
public void addMetadata(String key, String value) {
|
||||
public ModifyAlias addMetadata(String key, String value) {
|
||||
metadata.put(key,value);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -429,7 +429,7 @@ public class ZkStateReader implements Closeable {
|
|||
refreshLegacyClusterState(new LegacyClusterStateWatcher());
|
||||
refreshStateFormat2Collections();
|
||||
refreshCollectionList(new CollectionsChildWatcher());
|
||||
refreshAliases(aliasesHolder);
|
||||
refreshAliases(aliasesManager);
|
||||
|
||||
if (securityNodeListener != null) {
|
||||
addSecuritynodeWatcher(pair -> {
|
||||
|
@ -1414,7 +1414,7 @@ public class ZkStateReader implements Closeable {
|
|||
//
|
||||
|
||||
/** Access to the {@link Aliases}. */
|
||||
public final AliasesManager aliasesHolder = new AliasesManager();
|
||||
public final AliasesManager aliasesManager = new AliasesManager();
|
||||
|
||||
/**
|
||||
* Get an immutable copy of the present state of the aliases. References to this object should not be retained
|
||||
|
@ -1423,7 +1423,7 @@ public class ZkStateReader implements Closeable {
|
|||
* @return The current aliases, Aliases.EMPTY if not solr cloud, or no aliases have existed yet. Never returns null.
|
||||
*/
|
||||
public Aliases getAliases() {
|
||||
return aliasesHolder.getAliases();
|
||||
return aliasesManager.getAliases();
|
||||
}
|
||||
|
||||
// called by createClusterStateWatchersAndUpdate()
|
||||
|
@ -1432,7 +1432,7 @@ public class ZkStateReader implements Closeable {
|
|||
constructState(Collections.emptySet());
|
||||
zkClient.exists(ALIASES, watcher, true);
|
||||
}
|
||||
aliasesHolder.update();
|
||||
aliasesManager.update();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -81,6 +81,7 @@ public interface CollectionParams {
|
|||
MODIFYALIAS(true, LockLevel.COLLECTION),
|
||||
LISTALIASES(false, LockLevel.NONE),
|
||||
ROUTEDALIAS_CREATECOLL(true, LockLevel.COLLECTION),
|
||||
DELETEROUTEDALIASCOLLECTIONS(true, LockLevel.COLLECTION),
|
||||
SPLITSHARD(true, LockLevel.SHARD),
|
||||
DELETESHARD(true, LockLevel.SHARD),
|
||||
CREATESHARD(true, LockLevel.COLLECTION),
|
||||
|
|
|
@ -186,6 +186,10 @@
|
|||
"type": "integer",
|
||||
"description":"How many milliseconds into the future to accept document. Documents with a value in router.field that is greater than now() + max-future-ms will be rejected to avoid provisioning too much resources."
|
||||
}
|
||||
"autoDeleteAge": {
|
||||
"type": "string",
|
||||
"description": "A date math expressions yielding a time in the past. Collections covering a period of time entirely before this age will be automatically deleted."
|
||||
}
|
||||
}
|
||||
},
|
||||
"TZ": {
|
||||
|
|
Loading…
Reference in New Issue