mirror of https://github.com/apache/lucene.git
SOLR-11722: Refactor out a TimeRoutedAlias class from various parts.
Also allowed TRA's to be tolerant of pre-existing collections.
This commit is contained in:
parent
a1828a5664
commit
b0d244f656
|
@ -17,11 +17,8 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.solr.cloud.api.collections;
|
package org.apache.solr.cloud.api.collections;
|
||||||
|
|
||||||
import java.lang.invoke.MethodHandles;
|
|
||||||
import java.text.ParseException;
|
|
||||||
import java.time.Instant;
|
import java.time.Instant;
|
||||||
import java.time.temporal.ChronoUnit;
|
import java.time.temporal.ChronoUnit;
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
|
@ -31,7 +28,6 @@ import java.util.Locale;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.TimeZone;
|
import java.util.TimeZone;
|
||||||
import java.util.function.Predicate;
|
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import org.apache.solr.common.SolrException;
|
import org.apache.solr.common.SolrException;
|
||||||
|
@ -41,54 +37,16 @@ import org.apache.solr.common.cloud.ZkStateReader;
|
||||||
import org.apache.solr.common.params.CommonParams;
|
import org.apache.solr.common.params.CommonParams;
|
||||||
import org.apache.solr.common.util.NamedList;
|
import org.apache.solr.common.util.NamedList;
|
||||||
import org.apache.solr.common.util.StrUtils;
|
import org.apache.solr.common.util.StrUtils;
|
||||||
import org.apache.solr.update.processor.TimeRoutedAliasUpdateProcessor;
|
|
||||||
import org.apache.solr.util.DateMathParser;
|
import org.apache.solr.util.DateMathParser;
|
||||||
import org.apache.solr.util.TimeZoneUtils;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
import static org.apache.solr.common.SolrException.ErrorCode.BAD_REQUEST;
|
import static org.apache.solr.common.SolrException.ErrorCode.BAD_REQUEST;
|
||||||
import static org.apache.solr.common.params.CommonParams.TZ;
|
|
||||||
|
|
||||||
public class CreateAliasCmd implements OverseerCollectionMessageHandler.Cmd {
|
public class CreateAliasCmd implements OverseerCollectionMessageHandler.Cmd {
|
||||||
|
|
||||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
|
||||||
|
|
||||||
public static final String ROUTER_PREFIX = "router.";
|
|
||||||
public static final String ROUTER_START = ROUTER_PREFIX + "start"; //TODO, router related
|
|
||||||
public static final String ROUTER_TYPE = ROUTER_PREFIX + "name";
|
|
||||||
public static final String ROUTER_FIELD = ROUTER_PREFIX + "field";
|
|
||||||
public static final String ROUTER_INCREMENT = ROUTER_PREFIX + "interval";
|
|
||||||
public static final String ROUTER_MAX_FUTURE = ROUTER_PREFIX + "max-future-ms";
|
|
||||||
|
|
||||||
public static final String CREATE_COLLECTION_PREFIX = "create-collection.";
|
|
||||||
|
|
||||||
private final OverseerCollectionMessageHandler ocmh;
|
private final OverseerCollectionMessageHandler ocmh;
|
||||||
|
|
||||||
/**
|
|
||||||
* Parameters required for creating a routed alias
|
|
||||||
*/
|
|
||||||
public static final List<String> REQUIRED_ROUTER_PARAMS = Collections.unmodifiableList(Arrays.asList(
|
|
||||||
CommonParams.NAME,
|
|
||||||
ROUTER_TYPE,
|
|
||||||
ROUTER_FIELD,
|
|
||||||
ROUTER_START,
|
|
||||||
ROUTER_INCREMENT));
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Optional parameters for creating a routed alias excluding parameters for collection creation.
|
|
||||||
*/
|
|
||||||
public static final List<String> OPTIONAL_ROUTER_PARAMS = Collections.unmodifiableList(Arrays.asList(
|
|
||||||
ROUTER_MAX_FUTURE,
|
|
||||||
TZ)); // kinda special
|
|
||||||
|
|
||||||
private static Predicate<String> PARAM_IS_METADATA =
|
|
||||||
key -> key.equals(TZ) ||
|
|
||||||
(key.startsWith(ROUTER_PREFIX) && !key.equals(ROUTER_START)) || //nocommit reconsider START special case
|
|
||||||
key.startsWith(CREATE_COLLECTION_PREFIX);
|
|
||||||
|
|
||||||
private static boolean anyRoutingParams(ZkNodeProps message) {
|
private static boolean anyRoutingParams(ZkNodeProps message) {
|
||||||
return message.keySet().stream().anyMatch(k -> k.startsWith(ROUTER_PREFIX));
|
return message.keySet().stream().anyMatch(k -> k.startsWith(TimeRoutedAlias.ROUTER_PREFIX));
|
||||||
}
|
}
|
||||||
|
|
||||||
public CreateAliasCmd(OverseerCollectionMessageHandler ocmh) {
|
public CreateAliasCmd(OverseerCollectionMessageHandler ocmh) {
|
||||||
|
@ -100,81 +58,11 @@ public class CreateAliasCmd implements OverseerCollectionMessageHandler.Cmd {
|
||||||
throws Exception {
|
throws Exception {
|
||||||
final String aliasName = message.getStr(CommonParams.NAME);
|
final String aliasName = message.getStr(CommonParams.NAME);
|
||||||
ZkStateReader zkStateReader = ocmh.zkStateReader;
|
ZkStateReader zkStateReader = ocmh.zkStateReader;
|
||||||
ZkStateReader.AliasesManager holder = zkStateReader.aliasesHolder;
|
|
||||||
|
|
||||||
//TODO refactor callCreatePlainAlias
|
|
||||||
if (!anyRoutingParams(message)) {
|
if (!anyRoutingParams(message)) {
|
||||||
|
callCreatePlainAlias(message, aliasName, zkStateReader);
|
||||||
final List<String> canonicalCollectionList = parseCollectionsParameter(message.get("collections"));
|
} else {
|
||||||
final String canonicalCollectionsString = StrUtils.join(canonicalCollectionList, ',');
|
callCreateRoutedAlias(message, aliasName, zkStateReader, state);
|
||||||
validateAllCollectionsExistAndNoDups(canonicalCollectionList, zkStateReader);
|
|
||||||
holder.applyModificationAndExportToZk(aliases -> aliases.cloneWithCollectionAlias(aliasName, canonicalCollectionsString));
|
|
||||||
|
|
||||||
} else { //TODO refactor callCreateRoutedAlias
|
|
||||||
|
|
||||||
// Validate we got everything we need
|
|
||||||
if (!message.getProperties().keySet().containsAll(REQUIRED_ROUTER_PARAMS)) {
|
|
||||||
throw new SolrException(BAD_REQUEST, "A routed alias requires these params: " + REQUIRED_ROUTER_PARAMS
|
|
||||||
+ " plus some create-collection prefixed ones.");
|
|
||||||
}
|
|
||||||
|
|
||||||
Map<String, String> aliasMetadata = new LinkedHashMap<>();
|
|
||||||
message.getProperties().entrySet().stream()
|
|
||||||
.filter(entry -> PARAM_IS_METADATA.test(entry.getKey()))
|
|
||||||
.forEach(entry -> aliasMetadata.put(entry.getKey(), (String) entry.getValue()));
|
|
||||||
|
|
||||||
//TODO read these from metadata where appropriate. This leads to consistent logic between initial routed alias
|
|
||||||
// collection creation, and subsequent collections to be created.
|
|
||||||
|
|
||||||
final String routingType = message.getStr(ROUTER_TYPE);
|
|
||||||
final String tz = message.getStr(TZ);
|
|
||||||
final String start = message.getStr(ROUTER_START);
|
|
||||||
final String increment = message.getStr(ROUTER_INCREMENT);
|
|
||||||
final String maxFutureMs = message.getStr(ROUTER_MAX_FUTURE);
|
|
||||||
|
|
||||||
try {
|
|
||||||
if (maxFutureMs != null && 0 > Long.parseLong(maxFutureMs)) {
|
|
||||||
throw new NumberFormatException("Negative value not allowed here");
|
|
||||||
}
|
|
||||||
} catch (NumberFormatException e) {
|
|
||||||
throw new SolrException(BAD_REQUEST, ROUTER_MAX_FUTURE + " must be a valid long integer representing a number " +
|
|
||||||
"of milliseconds greater than or equal to zero");
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!"time".equals(routingType)) {
|
|
||||||
throw new SolrException(BAD_REQUEST, "Only time based routing is supported at this time");
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check for invalid timezone
|
|
||||||
TimeZone zone = TimeZoneUtils.parseTimezone(tz);
|
|
||||||
|
|
||||||
// check that the increment is valid date math
|
|
||||||
try {
|
|
||||||
new DateMathParser(zone).parseMath(increment);
|
|
||||||
} catch (ParseException e) {
|
|
||||||
throw new SolrException(BAD_REQUEST,e.getMessage(),e);
|
|
||||||
}
|
|
||||||
|
|
||||||
Instant startTime = parseStart(start, zone);
|
|
||||||
|
|
||||||
// It's too much work to check the routed field against the schema, there seems to be no good way to get
|
|
||||||
// a copy of the schema aside from loading it directly from zookeeper based on the config name, but that
|
|
||||||
// also requires I load solrconfig.xml to check what the value for managedSchemaResourceName is too, (or
|
|
||||||
// discover that managed schema is not turned on and read schema.xml instead... and check for dynamic
|
|
||||||
// field patterns too. As much as it would be nice to validate all inputs it's not worth the effort.
|
|
||||||
|
|
||||||
String initialCollectionName = TimeRoutedAliasUpdateProcessor
|
|
||||||
.formatCollectionNameFromInstant(aliasName, startTime);
|
|
||||||
|
|
||||||
// Create the collection
|
|
||||||
NamedList createResults = new NamedList();
|
|
||||||
RoutedAliasCreateCollectionCmd.createCollectionAndWait(state, createResults, aliasName, aliasMetadata, initialCollectionName, ocmh);
|
|
||||||
validateAllCollectionsExistAndNoDups(Collections.singletonList(initialCollectionName), zkStateReader);
|
|
||||||
|
|
||||||
// Create/update the alias
|
|
||||||
holder.applyModificationAndExportToZk(aliases -> aliases
|
|
||||||
.cloneWithCollectionAlias(aliasName, initialCollectionName)
|
|
||||||
.cloneWithCollectionAliasMetadata(aliasName, aliasMetadata));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sleep a bit to allow ZooKeeper state propagation.
|
// Sleep a bit to allow ZooKeeper state propagation.
|
||||||
|
@ -192,6 +80,57 @@ public class CreateAliasCmd implements OverseerCollectionMessageHandler.Cmd {
|
||||||
Thread.sleep(100);
|
Thread.sleep(100);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void callCreatePlainAlias(ZkNodeProps message, String aliasName, ZkStateReader zkStateReader) {
|
||||||
|
final List<String> canonicalCollectionList = parseCollectionsParameter(message.get("collections"));
|
||||||
|
final String canonicalCollectionsString = StrUtils.join(canonicalCollectionList, ',');
|
||||||
|
validateAllCollectionsExistAndNoDups(canonicalCollectionList, zkStateReader);
|
||||||
|
zkStateReader.aliasesHolder
|
||||||
|
.applyModificationAndExportToZk(aliases -> aliases.cloneWithCollectionAlias(aliasName, canonicalCollectionsString));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The v2 API directs that the 'collections' parameter be provided as a JSON array (e.g. ["a", "b"]). We also
|
||||||
|
* maintain support for the legacy format, a comma-separated list (e.g. a,b).
|
||||||
|
*/
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
private List<String> parseCollectionsParameter(Object colls) {
|
||||||
|
if (colls == null) throw new SolrException(BAD_REQUEST, "missing collections param");
|
||||||
|
if (colls instanceof List) return (List<String>) colls;
|
||||||
|
return StrUtils.splitSmart(colls.toString(), ",", true).stream()
|
||||||
|
.map(String::trim)
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void callCreateRoutedAlias(ZkNodeProps message, String aliasName, ZkStateReader zkStateReader, ClusterState state) throws Exception {
|
||||||
|
// Validate we got everything we need
|
||||||
|
if (!message.getProperties().keySet().containsAll(TimeRoutedAlias.REQUIRED_ROUTER_PARAMS)) {
|
||||||
|
throw new SolrException(BAD_REQUEST, "A routed alias requires these params: " + TimeRoutedAlias.REQUIRED_ROUTER_PARAMS
|
||||||
|
+ " plus some create-collection prefixed ones.");
|
||||||
|
}
|
||||||
|
|
||||||
|
Map<String, String> aliasMetadata = new LinkedHashMap<>();
|
||||||
|
message.getProperties().entrySet().stream()
|
||||||
|
.filter(entry -> TimeRoutedAlias.PARAM_IS_METADATA.test(entry.getKey()))
|
||||||
|
.forEach(entry -> aliasMetadata.put(entry.getKey(), (String) entry.getValue())); // way easier than .collect
|
||||||
|
|
||||||
|
TimeRoutedAlias timeRoutedAlias = new TimeRoutedAlias(aliasName, aliasMetadata); // validates as well
|
||||||
|
|
||||||
|
String start = message.getStr(TimeRoutedAlias.ROUTER_START);
|
||||||
|
Instant startTime = parseStart(start, timeRoutedAlias.getTimeZone());
|
||||||
|
|
||||||
|
String initialCollectionName = TimeRoutedAlias.formatCollectionNameFromInstant(aliasName, startTime);
|
||||||
|
|
||||||
|
// Create the collection
|
||||||
|
NamedList createResults = new NamedList();
|
||||||
|
RoutedAliasCreateCollectionCmd.createCollectionAndWait(state, createResults, aliasName, aliasMetadata, initialCollectionName, ocmh);
|
||||||
|
validateAllCollectionsExistAndNoDups(Collections.singletonList(initialCollectionName), zkStateReader);
|
||||||
|
|
||||||
|
// Create/update the alias
|
||||||
|
zkStateReader.aliasesHolder.applyModificationAndExportToZk(aliases -> aliases
|
||||||
|
.cloneWithCollectionAlias(aliasName, initialCollectionName)
|
||||||
|
.cloneWithCollectionAliasMetadata(aliasName, aliasMetadata));
|
||||||
|
}
|
||||||
|
|
||||||
private Instant parseStart(String str, TimeZone zone) {
|
private Instant parseStart(String str, TimeZone zone) {
|
||||||
Instant start = DateMathParser.parseMath(new Date(), str, zone).toInstant();
|
Instant start = DateMathParser.parseMath(new Date(), str, zone).toInstant();
|
||||||
checkMilis(start);
|
checkMilis(start);
|
||||||
|
@ -223,17 +162,4 @@ public class CreateAliasCmd implements OverseerCollectionMessageHandler.Cmd {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* The v2 API directs that the 'collections' parameter be provided as a JSON array (e.g. ["a", "b"]). We also
|
|
||||||
* maintain support for the legacy format, a comma-separated list (e.g. a,b).
|
|
||||||
*/
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
private List<String> parseCollectionsParameter(Object colls) {
|
|
||||||
if (colls == null) throw new SolrException(BAD_REQUEST, "missing collections param");
|
|
||||||
if (colls instanceof List) return (List<String>) colls;
|
|
||||||
return StrUtils.splitSmart(colls.toString(), ",", true).stream()
|
|
||||||
.map(String::trim)
|
|
||||||
.collect(Collectors.toList());
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,7 +22,6 @@ import java.time.Instant;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.TimeZone;
|
|
||||||
|
|
||||||
import org.apache.solr.cloud.Overseer;
|
import org.apache.solr.cloud.Overseer;
|
||||||
import org.apache.solr.cloud.OverseerSolrResponse;
|
import org.apache.solr.cloud.OverseerSolrResponse;
|
||||||
|
@ -32,22 +31,18 @@ import org.apache.solr.common.cloud.ClusterState;
|
||||||
import org.apache.solr.common.cloud.ZkNodeProps;
|
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||||
import org.apache.solr.common.cloud.ZkStateReader;
|
import org.apache.solr.common.cloud.ZkStateReader;
|
||||||
import org.apache.solr.common.params.CollectionParams;
|
import org.apache.solr.common.params.CollectionParams;
|
||||||
import org.apache.solr.common.params.CommonParams;
|
|
||||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||||
import org.apache.solr.common.util.NamedList;
|
import org.apache.solr.common.util.NamedList;
|
||||||
import org.apache.solr.common.util.StrUtils;
|
import org.apache.solr.common.util.StrUtils;
|
||||||
import org.apache.solr.handler.admin.CollectionsHandler;
|
import org.apache.solr.handler.admin.CollectionsHandler;
|
||||||
import org.apache.solr.request.LocalSolrQueryRequest;
|
import org.apache.solr.request.LocalSolrQueryRequest;
|
||||||
import org.apache.solr.update.processor.TimeRoutedAliasUpdateProcessor;
|
|
||||||
import org.apache.solr.util.TimeZoneUtils;
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import static org.apache.solr.cloud.api.collections.CreateAliasCmd.CREATE_COLLECTION_PREFIX;
|
|
||||||
import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.COLL_CONF;
|
import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.COLL_CONF;
|
||||||
|
import static org.apache.solr.cloud.api.collections.TimeRoutedAlias.CREATE_COLLECTION_PREFIX;
|
||||||
|
import static org.apache.solr.cloud.api.collections.TimeRoutedAlias.ROUTED_ALIAS_NAME_CORE_PROP;
|
||||||
import static org.apache.solr.common.params.CommonParams.NAME;
|
import static org.apache.solr.common.params.CommonParams.NAME;
|
||||||
import static org.apache.solr.update.processor.TimeRoutedAliasUpdateProcessor.ROUTER_FIELD_METADATA;
|
|
||||||
import static org.apache.solr.update.processor.TimeRoutedAliasUpdateProcessor.ROUTER_INTERVAL_METADATA;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* For "routed aliases", creates another collection and adds it to the alias. In some cases it will not
|
* For "routed aliases", creates another collection and adds it to the alias. In some cases it will not
|
||||||
|
@ -69,11 +64,6 @@ public class RoutedAliasCreateCollectionCmd implements OverseerCollectionMessage
|
||||||
this.ocmh = ocmh;
|
this.ocmh = ocmh;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* TODO:
|
|
||||||
There are a few classes related to time routed alias processing. We need to share some logic better.
|
|
||||||
*/
|
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void call(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
|
public void call(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
|
||||||
//---- PARSE PRIMARY MESSAGE PARAMS
|
//---- PARSE PRIMARY MESSAGE PARAMS
|
||||||
|
@ -92,17 +82,10 @@ public class RoutedAliasCreateCollectionCmd implements OverseerCollectionMessage
|
||||||
throw newAliasMustExistException(aliasName); // if it did exist, we'd have a non-null map
|
throw newAliasMustExistException(aliasName); // if it did exist, we'd have a non-null map
|
||||||
}
|
}
|
||||||
|
|
||||||
String routeField = aliasMetadata.get(ROUTER_FIELD_METADATA);
|
final TimeRoutedAlias timeRoutedAlias = new TimeRoutedAlias(aliasName, aliasMetadata);
|
||||||
if (routeField == null) {
|
|
||||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
|
|
||||||
"This command only works on time routed aliases. Expected alias metadata not found.");
|
|
||||||
}
|
|
||||||
String intervalDateMath = aliasMetadata.getOrDefault(ROUTER_INTERVAL_METADATA, "+1DAY");
|
|
||||||
TimeZone intervalTimeZone = TimeZoneUtils.parseTimezone(aliasMetadata.get(CommonParams.TZ));
|
|
||||||
|
|
||||||
//TODO this is ugly; how can we organize the code related to this feature better?
|
|
||||||
final List<Map.Entry<Instant, String>> parsedCollections =
|
final List<Map.Entry<Instant, String>> parsedCollections =
|
||||||
TimeRoutedAliasUpdateProcessor.parseCollections(aliasName, aliases, () -> newAliasMustExistException(aliasName));
|
timeRoutedAlias.parseCollections(aliases, () -> newAliasMustExistException(aliasName));
|
||||||
|
|
||||||
//---- GET MOST RECENT COLL
|
//---- GET MOST RECENT COLL
|
||||||
final Map.Entry<Instant, String> mostRecentEntry = parsedCollections.get(0);
|
final Map.Entry<Instant, String> mostRecentEntry = parsedCollections.get(0);
|
||||||
|
@ -127,9 +110,8 @@ public class RoutedAliasCreateCollectionCmd implements OverseerCollectionMessage
|
||||||
}
|
}
|
||||||
|
|
||||||
//---- COMPUTE NEXT COLLECTION NAME
|
//---- COMPUTE NEXT COLLECTION NAME
|
||||||
final Instant nextCollTimestamp = TimeRoutedAliasUpdateProcessor.computeNextCollTimestamp(mostRecentCollTimestamp, intervalDateMath, intervalTimeZone);
|
final Instant nextCollTimestamp = timeRoutedAlias.computeNextCollTimestamp(mostRecentCollTimestamp);
|
||||||
assert nextCollTimestamp.isAfter(mostRecentCollTimestamp);
|
final String createCollName = TimeRoutedAlias.formatCollectionNameFromInstant(aliasName, nextCollTimestamp);
|
||||||
final String createCollName = TimeRoutedAliasUpdateProcessor.formatCollectionNameFromInstant(aliasName, nextCollTimestamp);
|
|
||||||
|
|
||||||
//---- CREATE THE COLLECTION
|
//---- CREATE THE COLLECTION
|
||||||
createCollectionAndWait(clusterState, results, aliasName, aliasMetadata, createCollName, ocmh);
|
createCollectionAndWait(clusterState, results, aliasName, aliasMetadata, createCollName, ocmh);
|
||||||
|
@ -152,6 +134,11 @@ public class RoutedAliasCreateCollectionCmd implements OverseerCollectionMessage
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a collection (for use in a routed alias), waiting for it to be ready before returning.
|
||||||
|
* If the collection already exists then this is not an error.
|
||||||
|
* IMPORTANT: Only call this from an {@link OverseerCollectionMessageHandler.Cmd}.
|
||||||
|
*/
|
||||||
static void createCollectionAndWait(ClusterState clusterState, NamedList results, String aliasName, Map<String, String> aliasMetadata, String createCollName, OverseerCollectionMessageHandler ocmh) throws Exception {
|
static void createCollectionAndWait(ClusterState clusterState, NamedList results, String aliasName, Map<String, String> aliasMetadata, String createCollName, OverseerCollectionMessageHandler ocmh) throws Exception {
|
||||||
// Map alias metadata starting with a prefix to a create-collection API request
|
// Map alias metadata starting with a prefix to a create-collection API request
|
||||||
final ModifiableSolrParams createReqParams = new ModifiableSolrParams();
|
final ModifiableSolrParams createReqParams = new ModifiableSolrParams();
|
||||||
|
@ -165,7 +152,7 @@ public class RoutedAliasCreateCollectionCmd implements OverseerCollectionMessage
|
||||||
"We require an explicit " + COLL_CONF );
|
"We require an explicit " + COLL_CONF );
|
||||||
}
|
}
|
||||||
createReqParams.set(NAME, createCollName);
|
createReqParams.set(NAME, createCollName);
|
||||||
createReqParams.set("property." + TimeRoutedAliasUpdateProcessor.TIME_PARTITION_ALIAS_NAME_CORE_PROP, aliasName);
|
createReqParams.set("property." + ROUTED_ALIAS_NAME_CORE_PROP, aliasName);
|
||||||
// a CollectionOperation reads params and produces a message (Map) that is supposed to be sent to the Overseer.
|
// a CollectionOperation reads params and produces a message (Map) that is supposed to be sent to the Overseer.
|
||||||
// Although we could create the Map without it, there are a fair amount of rules we don't want to reproduce.
|
// Although we could create the Map without it, there are a fair amount of rules we don't want to reproduce.
|
||||||
final Map<String, Object> createMsgMap = CollectionsHandler.CollectionOperation.CREATE_OP.execute(
|
final Map<String, Object> createMsgMap = CollectionsHandler.CollectionOperation.CREATE_OP.execute(
|
||||||
|
@ -173,8 +160,18 @@ public class RoutedAliasCreateCollectionCmd implements OverseerCollectionMessage
|
||||||
null,
|
null,
|
||||||
ocmh.overseer.getCoreContainer().getCollectionsHandler());
|
ocmh.overseer.getCoreContainer().getCollectionsHandler());
|
||||||
createMsgMap.put(Overseer.QUEUE_OPERATION, "create");
|
createMsgMap.put(Overseer.QUEUE_OPERATION, "create");
|
||||||
// Since we are running in the Overseer here, send the message directly to the Overseer CreateCollectionCmd
|
|
||||||
|
try {
|
||||||
|
// Since we are running in the Overseer here, send the message directly to the Overseer CreateCollectionCmd.
|
||||||
|
// note: there's doesn't seem to be any point in locking on the collection name, so we don't. We currently should
|
||||||
|
// already have a lock on the alias name which should be sufficient.
|
||||||
ocmh.commandMap.get(CollectionParams.CollectionAction.CREATE).call(clusterState, new ZkNodeProps(createMsgMap), results);
|
ocmh.commandMap.get(CollectionParams.CollectionAction.CREATE).call(clusterState, new ZkNodeProps(createMsgMap), results);
|
||||||
|
} catch (SolrException e) {
|
||||||
|
// The collection might already exist, and that's okay -- we can adopt it.
|
||||||
|
if (!e.getMessage().contains("collection already exists")) {
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
CollectionsHandler.waitForActiveCollection(createCollName, null, ocmh.overseer.getCoreContainer(), new OverseerSolrResponse(results));
|
CollectionsHandler.waitForActiveCollection(createCollName, null, ocmh.overseer.getCoreContainer(), new OverseerSolrResponse(results));
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,211 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.solr.cloud.api.collections;
|
||||||
|
|
||||||
|
import java.text.ParseException;
|
||||||
|
import java.time.Instant;
|
||||||
|
import java.time.ZoneOffset;
|
||||||
|
import java.time.format.DateTimeFormatter;
|
||||||
|
import java.time.format.DateTimeFormatterBuilder;
|
||||||
|
import java.time.temporal.ChronoField;
|
||||||
|
import java.util.AbstractMap;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Date;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Locale;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.TimeZone;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.function.Predicate;
|
||||||
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
|
import com.google.common.base.Objects;
|
||||||
|
import org.apache.solr.common.SolrException;
|
||||||
|
import org.apache.solr.common.cloud.Aliases;
|
||||||
|
import org.apache.solr.common.params.CommonParams;
|
||||||
|
import org.apache.solr.common.params.MapSolrParams;
|
||||||
|
import org.apache.solr.common.params.RequiredSolrParams;
|
||||||
|
import org.apache.solr.util.DateMathParser;
|
||||||
|
import org.apache.solr.util.TimeZoneUtils;
|
||||||
|
|
||||||
|
import static org.apache.solr.common.SolrException.ErrorCode.BAD_REQUEST;
|
||||||
|
import static org.apache.solr.common.params.CommonParams.TZ;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Holds configuration for a routed alias, and some common code and constants.
|
||||||
|
*
|
||||||
|
* @see CreateAliasCmd
|
||||||
|
* @see RoutedAliasCreateCollectionCmd
|
||||||
|
* @see org.apache.solr.update.processor.TimeRoutedAliasUpdateProcessor
|
||||||
|
*/
|
||||||
|
public class TimeRoutedAlias {
|
||||||
|
|
||||||
|
// These are parameter names to routed alias creation, AND are stored as metadata with the alias.
|
||||||
|
public static final String ROUTER_PREFIX = "router.";
|
||||||
|
public static final String ROUTER_TYPE_NAME = ROUTER_PREFIX + "name";
|
||||||
|
public static final String ROUTER_FIELD = ROUTER_PREFIX + "field";
|
||||||
|
public static final String ROUTER_START = ROUTER_PREFIX + "start";
|
||||||
|
public static final String ROUTER_INTERVAL = ROUTER_PREFIX + "interval";
|
||||||
|
public static final String ROUTER_MAX_FUTURE = ROUTER_PREFIX + "max-future-ms";
|
||||||
|
public static final String CREATE_COLLECTION_PREFIX = "create-collection.";
|
||||||
|
// plus TZ and NAME
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Parameters required for creating a routed alias
|
||||||
|
*/
|
||||||
|
public static final List<String> REQUIRED_ROUTER_PARAMS = Collections.unmodifiableList(Arrays.asList(
|
||||||
|
CommonParams.NAME,
|
||||||
|
ROUTER_TYPE_NAME,
|
||||||
|
ROUTER_FIELD,
|
||||||
|
ROUTER_START,
|
||||||
|
ROUTER_INTERVAL));
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Optional parameters for creating a routed alias excluding parameters for collection creation.
|
||||||
|
*/
|
||||||
|
public static final List<String> OPTIONAL_ROUTER_PARAMS = Collections.unmodifiableList(Arrays.asList(
|
||||||
|
ROUTER_MAX_FUTURE,
|
||||||
|
TZ)); // kinda special
|
||||||
|
|
||||||
|
static Predicate<String> PARAM_IS_METADATA =
|
||||||
|
key -> key.equals(TZ) ||
|
||||||
|
(key.startsWith(ROUTER_PREFIX) && !key.equals(ROUTER_START)) || //TODO reconsider START special case
|
||||||
|
key.startsWith(CREATE_COLLECTION_PREFIX);
|
||||||
|
|
||||||
|
public static final String ROUTED_ALIAS_NAME_CORE_PROP = "routedAliasName"; // core prop
|
||||||
|
|
||||||
|
// This format must be compatible with collection name limitations
|
||||||
|
private static final DateTimeFormatter DATE_TIME_FORMATTER = new DateTimeFormatterBuilder()
|
||||||
|
.append(DateTimeFormatter.ISO_LOCAL_DATE).appendPattern("[_HH[_mm[_ss]]]") //brackets mean optional
|
||||||
|
.parseDefaulting(ChronoField.HOUR_OF_DAY, 0)
|
||||||
|
.parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0)
|
||||||
|
.parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0)
|
||||||
|
.toFormatter(Locale.ROOT).withZone(ZoneOffset.UTC); // deliberate -- collection names disregard TZ
|
||||||
|
|
||||||
|
public static Instant parseInstantFromCollectionName(String aliasName, String collection) {
|
||||||
|
final String dateTimePart = collection.substring(aliasName.length() + 1);
|
||||||
|
return DATE_TIME_FORMATTER.parse(dateTimePart, Instant::from);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static String formatCollectionNameFromInstant(String aliasName, Instant timestamp) {
|
||||||
|
String nextCollName = DATE_TIME_FORMATTER.format(timestamp);
|
||||||
|
for (int i = 0; i < 3; i++) { // chop off seconds, minutes, hours
|
||||||
|
if (nextCollName.endsWith("_00")) {
|
||||||
|
nextCollName = nextCollName.substring(0, nextCollName.length()-3);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assert DATE_TIME_FORMATTER.parse(nextCollName, Instant::from).equals(timestamp);
|
||||||
|
return aliasName + "_" + nextCollName;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
//
|
||||||
|
// Instance data and methods
|
||||||
|
//
|
||||||
|
|
||||||
|
private final String aliasName;
|
||||||
|
private final String routeField;
|
||||||
|
private final long maxFutureMs;
|
||||||
|
private final String intervalDateMath; // ex: +1DAY
|
||||||
|
private final TimeZone timeZone;
|
||||||
|
|
||||||
|
public TimeRoutedAlias(String aliasName, Map<String, String> aliasMetadata) {
|
||||||
|
this.aliasName = aliasName;
|
||||||
|
final MapSolrParams params = new MapSolrParams(aliasMetadata); // for convenience
|
||||||
|
final RequiredSolrParams required = params.required();
|
||||||
|
if (!"time".equals(required.get(ROUTER_TYPE_NAME))) {
|
||||||
|
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Only 'time' routed aliases is supported right now.");
|
||||||
|
}
|
||||||
|
routeField = required.get(ROUTER_FIELD);
|
||||||
|
intervalDateMath = required.get(ROUTER_INTERVAL);
|
||||||
|
|
||||||
|
//optional:
|
||||||
|
maxFutureMs = params.getLong(ROUTER_MAX_FUTURE, TimeUnit.MINUTES.toMillis(10));
|
||||||
|
timeZone = TimeZoneUtils.parseTimezone(aliasMetadata.get(CommonParams.TZ));
|
||||||
|
|
||||||
|
// More validation:
|
||||||
|
|
||||||
|
// check that the interval is valid date math
|
||||||
|
try {
|
||||||
|
new DateMathParser(timeZone).parseMath(intervalDateMath);
|
||||||
|
} catch (ParseException e) {
|
||||||
|
throw new SolrException(BAD_REQUEST, "bad " + TimeRoutedAlias.ROUTER_INTERVAL + ", " + e, e);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (maxFutureMs < 0) {
|
||||||
|
throw new SolrException(BAD_REQUEST, ROUTER_MAX_FUTURE + " must be >= 0");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getAliasName() {
|
||||||
|
return aliasName;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getRouteField() {
|
||||||
|
return routeField;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getMaxFutureMs() {
|
||||||
|
return maxFutureMs;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getIntervalDateMath() {
|
||||||
|
return intervalDateMath;
|
||||||
|
}
|
||||||
|
|
||||||
|
public TimeZone getTimeZone() {
|
||||||
|
return timeZone;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return Objects.toStringHelper(this)
|
||||||
|
.add("aliasName", aliasName)
|
||||||
|
.add("routeField", routeField)
|
||||||
|
.add("maxFutureMs", maxFutureMs)
|
||||||
|
.add("intervalDateMath", intervalDateMath)
|
||||||
|
.add("timeZone", timeZone)
|
||||||
|
.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Parses the timestamp from the collection list and returns them in reverse sorted order (most recent 1st) */
|
||||||
|
public List<Map.Entry<Instant,String>> parseCollections(Aliases aliases, Supplier<SolrException> aliasNotExist) {
|
||||||
|
final List<String> collections = aliases.getCollectionAliasListMap().get(aliasName);
|
||||||
|
if (collections == null) {
|
||||||
|
throw aliasNotExist.get();
|
||||||
|
}
|
||||||
|
// note: I considered TreeMap but didn't like the log(N) just to grab the most recent when we use it later
|
||||||
|
List<Map.Entry<Instant,String>> result = new ArrayList<>(collections.size());
|
||||||
|
for (String collection : collections) {
|
||||||
|
Instant colStartTime = parseInstantFromCollectionName(aliasName, collection);
|
||||||
|
result.add(new AbstractMap.SimpleImmutableEntry<>(colStartTime, collection));
|
||||||
|
}
|
||||||
|
result.sort((e1, e2) -> e2.getKey().compareTo(e1.getKey())); // reverse sort by key
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Computes the timestamp of the next collection given the timestamp of the one before. */
|
||||||
|
public Instant computeNextCollTimestamp(Instant fromTimestamp) {
|
||||||
|
final Instant nextCollTimestamp =
|
||||||
|
DateMathParser.parseMath(Date.from(fromTimestamp), "NOW" + intervalDateMath, timeZone).toInstant();
|
||||||
|
assert nextCollTimestamp.isAfter(fromTimestamp);
|
||||||
|
return nextCollTimestamp;
|
||||||
|
}
|
||||||
|
}
|
|
@ -102,9 +102,9 @@ import static org.apache.solr.client.solrj.response.RequestStatusState.NOT_FOUND
|
||||||
import static org.apache.solr.client.solrj.response.RequestStatusState.RUNNING;
|
import static org.apache.solr.client.solrj.response.RequestStatusState.RUNNING;
|
||||||
import static org.apache.solr.client.solrj.response.RequestStatusState.SUBMITTED;
|
import static org.apache.solr.client.solrj.response.RequestStatusState.SUBMITTED;
|
||||||
import static org.apache.solr.cloud.Overseer.QUEUE_OPERATION;
|
import static org.apache.solr.cloud.Overseer.QUEUE_OPERATION;
|
||||||
import static org.apache.solr.cloud.api.collections.CreateAliasCmd.CREATE_COLLECTION_PREFIX;
|
import static org.apache.solr.cloud.api.collections.TimeRoutedAlias.CREATE_COLLECTION_PREFIX;
|
||||||
import static org.apache.solr.cloud.api.collections.CreateAliasCmd.OPTIONAL_ROUTER_PARAMS;
|
import static org.apache.solr.cloud.api.collections.TimeRoutedAlias.OPTIONAL_ROUTER_PARAMS;
|
||||||
import static org.apache.solr.cloud.api.collections.CreateAliasCmd.REQUIRED_ROUTER_PARAMS;
|
import static org.apache.solr.cloud.api.collections.TimeRoutedAlias.REQUIRED_ROUTER_PARAMS;
|
||||||
import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.COLL_CONF;
|
import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.COLL_CONF;
|
||||||
import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.COLL_PROP_PREFIX;
|
import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.COLL_PROP_PREFIX;
|
||||||
import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.CREATE_NODE_SET;
|
import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.CREATE_NODE_SET;
|
||||||
|
|
|
@ -19,31 +19,22 @@ package org.apache.solr.update.processor;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.lang.invoke.MethodHandles;
|
import java.lang.invoke.MethodHandles;
|
||||||
import java.text.ParseException;
|
|
||||||
import java.time.Instant;
|
import java.time.Instant;
|
||||||
import java.time.ZoneOffset;
|
|
||||||
import java.time.format.DateTimeFormatter;
|
|
||||||
import java.time.format.DateTimeFormatterBuilder;
|
|
||||||
import java.time.temporal.ChronoField;
|
|
||||||
import java.util.AbstractMap;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Locale;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.TimeZone;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.Semaphore;
|
import java.util.concurrent.Semaphore;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.function.Supplier;
|
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import org.apache.solr.cloud.Overseer;
|
import org.apache.solr.cloud.Overseer;
|
||||||
import org.apache.solr.cloud.ZkController;
|
import org.apache.solr.cloud.ZkController;
|
||||||
import org.apache.solr.cloud.api.collections.RoutedAliasCreateCollectionCmd;
|
import org.apache.solr.cloud.api.collections.RoutedAliasCreateCollectionCmd;
|
||||||
|
import org.apache.solr.cloud.api.collections.TimeRoutedAlias;
|
||||||
import org.apache.solr.common.SolrException;
|
import org.apache.solr.common.SolrException;
|
||||||
import org.apache.solr.common.cloud.Aliases;
|
import org.apache.solr.common.cloud.Aliases;
|
||||||
import org.apache.solr.common.cloud.Replica;
|
import org.apache.solr.common.cloud.Replica;
|
||||||
|
@ -51,7 +42,6 @@ import org.apache.solr.common.cloud.Slice;
|
||||||
import org.apache.solr.common.cloud.ZkCoreNodeProps;
|
import org.apache.solr.common.cloud.ZkCoreNodeProps;
|
||||||
import org.apache.solr.common.cloud.ZkNodeProps;
|
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||||
import org.apache.solr.common.params.CollectionParams;
|
import org.apache.solr.common.params.CollectionParams;
|
||||||
import org.apache.solr.common.params.CommonParams;
|
|
||||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||||
import org.apache.solr.common.params.SolrParams;
|
import org.apache.solr.common.params.SolrParams;
|
||||||
import org.apache.solr.common.params.UpdateParams;
|
import org.apache.solr.common.params.UpdateParams;
|
||||||
|
@ -65,8 +55,6 @@ import org.apache.solr.update.CommitUpdateCommand;
|
||||||
import org.apache.solr.update.DeleteUpdateCommand;
|
import org.apache.solr.update.DeleteUpdateCommand;
|
||||||
import org.apache.solr.update.SolrCmdDistributor;
|
import org.apache.solr.update.SolrCmdDistributor;
|
||||||
import org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase;
|
import org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase;
|
||||||
import org.apache.solr.util.DateMathParser;
|
|
||||||
import org.apache.solr.util.TimeZoneUtils;
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -89,19 +77,6 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
|
||||||
//TODO do we make this more generic to others who want to partition collections using something else?
|
//TODO do we make this more generic to others who want to partition collections using something else?
|
||||||
|
|
||||||
public static final String ALIAS_DISTRIB_UPDATE_PARAM = "alias." + DISTRIB_UPDATE_PARAM; // param
|
public static final String ALIAS_DISTRIB_UPDATE_PARAM = "alias." + DISTRIB_UPDATE_PARAM; // param
|
||||||
public static final String TIME_PARTITION_ALIAS_NAME_CORE_PROP = "timePartitionAliasName"; // core prop
|
|
||||||
// alias metadata:
|
|
||||||
public static final String ROUTER_FIELD_METADATA = "router.field";
|
|
||||||
public static final String ROUTER_MAX_FUTURE_TIME_METADATA = "router.maxFutureMs";
|
|
||||||
public static final String ROUTER_INTERVAL_METADATA = "router.interval";
|
|
||||||
|
|
||||||
// This format must be compatible with collection name limitations
|
|
||||||
public static final DateTimeFormatter DATE_TIME_FORMATTER = new DateTimeFormatterBuilder()
|
|
||||||
.append(DateTimeFormatter.ISO_LOCAL_DATE).appendPattern("[_HH[_mm[_ss]]]") //brackets mean optional
|
|
||||||
.parseDefaulting(ChronoField.HOUR_OF_DAY, 0)
|
|
||||||
.parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0)
|
|
||||||
.parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0)
|
|
||||||
.toFormatter(Locale.ROOT).withZone(ZoneOffset.UTC);
|
|
||||||
|
|
||||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||||
|
|
||||||
|
@ -109,11 +84,8 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
|
||||||
private static ConcurrentHashMap<String, Semaphore> aliasToSemaphoreMap = new ConcurrentHashMap<>(4);
|
private static ConcurrentHashMap<String, Semaphore> aliasToSemaphoreMap = new ConcurrentHashMap<>(4);
|
||||||
|
|
||||||
private final String thisCollection;
|
private final String thisCollection;
|
||||||
private final String aliasName;
|
|
||||||
private final String routeField;
|
private final TimeRoutedAlias timeRoutedAlias;
|
||||||
private final long maxFutureMs;
|
|
||||||
private final String intervalDateMath;
|
|
||||||
private final TimeZone intervalTimeZone;
|
|
||||||
|
|
||||||
private final ZkController zkController;
|
private final ZkController zkController;
|
||||||
private final SolrCmdDistributor cmdDistrib;
|
private final SolrCmdDistributor cmdDistrib;
|
||||||
|
@ -125,19 +97,19 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
|
||||||
|
|
||||||
public static UpdateRequestProcessor wrap(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
|
public static UpdateRequestProcessor wrap(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
|
||||||
//TODO get from "Collection property"
|
//TODO get from "Collection property"
|
||||||
final String timePartitionAliasName = req.getCore().getCoreDescriptor()
|
final String aliasName = req.getCore().getCoreDescriptor()
|
||||||
.getCoreProperty(TIME_PARTITION_ALIAS_NAME_CORE_PROP, null);
|
.getCoreProperty(TimeRoutedAlias.ROUTED_ALIAS_NAME_CORE_PROP, null);
|
||||||
final DistribPhase shardDistribPhase =
|
final DistribPhase shardDistribPhase =
|
||||||
DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));
|
DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));
|
||||||
final DistribPhase aliasDistribPhase =
|
final DistribPhase aliasDistribPhase =
|
||||||
DistribPhase.parseParam(req.getParams().get(ALIAS_DISTRIB_UPDATE_PARAM));
|
DistribPhase.parseParam(req.getParams().get(ALIAS_DISTRIB_UPDATE_PARAM));
|
||||||
if (timePartitionAliasName == null || aliasDistribPhase != DistribPhase.NONE || shardDistribPhase != DistribPhase.NONE) {
|
if (aliasName == null || aliasDistribPhase != DistribPhase.NONE || shardDistribPhase != DistribPhase.NONE) {
|
||||||
// if aliasDistribPhase is not NONE, then there is no further collection routing to be done here.
|
// if aliasDistribPhase is not NONE, then there is no further collection routing to be done here.
|
||||||
// TODO this may eventually not be true but at the moment it is
|
// TODO this may eventually not be true but at the moment it is
|
||||||
// if shardDistribPhase is not NONE, then the phase is after the scope of this URP
|
// if shardDistribPhase is not NONE, then the phase is after the scope of this URP
|
||||||
return next;
|
return next;
|
||||||
} else {
|
} else {
|
||||||
return new TimeRoutedAliasUpdateProcessor(req, rsp, next, timePartitionAliasName, aliasDistribPhase);
|
return new TimeRoutedAliasUpdateProcessor(req, rsp, next, aliasName, aliasDistribPhase);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -148,7 +120,6 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
|
||||||
assert aliasDistribPhase == DistribPhase.NONE;
|
assert aliasDistribPhase == DistribPhase.NONE;
|
||||||
final SolrCore core = req.getCore();
|
final SolrCore core = req.getCore();
|
||||||
this.thisCollection = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
|
this.thisCollection = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
|
||||||
this.aliasName = aliasName;
|
|
||||||
CoreContainer cc = core.getCoreContainer();
|
CoreContainer cc = core.getCoreContainer();
|
||||||
zkController = cc.getZkController();
|
zkController = cc.getZkController();
|
||||||
cmdDistrib = new SolrCmdDistributor(cc.getUpdateShardHandler());
|
cmdDistrib = new SolrCmdDistributor(cc.getUpdateShardHandler());
|
||||||
|
@ -158,15 +129,11 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
|
||||||
if (aliasMetadata == null) {
|
if (aliasMetadata == null) {
|
||||||
throw newAliasMustExistException(); // if it did exist, we'd have a non-null map
|
throw newAliasMustExistException(); // if it did exist, we'd have a non-null map
|
||||||
}
|
}
|
||||||
routeField = aliasMetadata.get(ROUTER_FIELD_METADATA);
|
try {
|
||||||
intervalDateMath = aliasMetadata.getOrDefault(ROUTER_INTERVAL_METADATA, "+1DAY");
|
this.timeRoutedAlias = new TimeRoutedAlias(aliasName, aliasMetadata);
|
||||||
String futureTimeStr = aliasMetadata.get(ROUTER_MAX_FUTURE_TIME_METADATA);
|
} catch (Exception e) { // ensure we throw SERVER_ERROR not BAD_REQUEST at this stage
|
||||||
if (futureTimeStr != null) {
|
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Routed alias has invalid metadata: " + e, e);
|
||||||
maxFutureMs = Long.parseLong(futureTimeStr);
|
|
||||||
} else {
|
|
||||||
maxFutureMs = TimeUnit.MINUTES.toMillis(10);
|
|
||||||
}
|
}
|
||||||
intervalTimeZone = TimeZoneUtils.parseTimezone(aliasMetadata.get(CommonParams.TZ));
|
|
||||||
|
|
||||||
ModifiableSolrParams outParams = new ModifiableSolrParams(req.getParams());
|
ModifiableSolrParams outParams = new ModifiableSolrParams(req.getParams());
|
||||||
// Don't distribute these params; they will be distributed from the local processCommit separately.
|
// Don't distribute these params; they will be distributed from the local processCommit separately.
|
||||||
|
@ -185,9 +152,13 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
|
||||||
outParamsToLeader = outParams;
|
outParamsToLeader = outParams;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private String getAliasName() {
|
||||||
|
return timeRoutedAlias.getAliasName();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void processAdd(AddUpdateCommand cmd) throws IOException {
|
public void processAdd(AddUpdateCommand cmd) throws IOException {
|
||||||
final Object routeValue = cmd.getSolrInputDocument().getFieldValue(routeField);
|
final Object routeValue = cmd.getSolrInputDocument().getFieldValue(timeRoutedAlias.getRouteField());
|
||||||
final Instant routeTimestamp = parseRouteKey(routeValue);
|
final Instant routeTimestamp = parseRouteKey(routeValue);
|
||||||
|
|
||||||
updateParsedCollectionAliases();
|
updateParsedCollectionAliases();
|
||||||
|
@ -197,7 +168,7 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
|
||||||
|
|
||||||
if (targetCollection == null) {
|
if (targetCollection == null) {
|
||||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
|
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
|
||||||
"Doc " + cmd.getPrintableId() + " couldn't be routed with " + routeField + "=" + routeTimestamp);
|
"Doc " + cmd.getPrintableId() + " couldn't be routed with " + timeRoutedAlias.getRouteField() + "=" + routeTimestamp);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Note: the following rule is tempting but not necessary and is not compatible with
|
// Note: the following rule is tempting but not necessary and is not compatible with
|
||||||
|
@ -217,15 +188,15 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check the doc isn't too far in the future
|
// Check the doc isn't too far in the future
|
||||||
final Instant maxFutureTime = Instant.now().plusMillis(maxFutureMs);
|
final Instant maxFutureTime = Instant.now().plusMillis(timeRoutedAlias.getMaxFutureMs());
|
||||||
if (routeTimestamp.isAfter(maxFutureTime)) {
|
if (routeTimestamp.isAfter(maxFutureTime)) {
|
||||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
|
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
|
||||||
"The document's time routed key of " + routeValue + " is too far in the future given " +
|
"The document's time routed key of " + routeValue + " is too far in the future given " +
|
||||||
ROUTER_MAX_FUTURE_TIME_METADATA + "=" + maxFutureMs);
|
TimeRoutedAlias.ROUTER_MAX_FUTURE + "=" + timeRoutedAlias.getMaxFutureMs());
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create a new collection?
|
// Create a new collection?
|
||||||
final Instant nextCollTimestamp = computeNextCollTimestamp(mostRecentCollTimestamp, intervalDateMath, intervalTimeZone);
|
final Instant nextCollTimestamp = timeRoutedAlias.computeNextCollTimestamp(mostRecentCollTimestamp);
|
||||||
if (routeTimestamp.isBefore(nextCollTimestamp)) {
|
if (routeTimestamp.isBefore(nextCollTimestamp)) {
|
||||||
break; // thus we don't need another collection
|
break; // thus we don't need another collection
|
||||||
}
|
}
|
||||||
|
@ -251,22 +222,6 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Computes the timestamp of the next collection given the timestamp of the one before. */
|
|
||||||
public static Instant computeNextCollTimestamp(Instant fromTimestamp, String intervalDateMath, TimeZone intervalTimeZone) {
|
|
||||||
//TODO overload DateMathParser.parseMath to take tz and "now"
|
|
||||||
final DateMathParser dateMathParser = new DateMathParser(intervalTimeZone);
|
|
||||||
dateMathParser.setNow(Date.from(fromTimestamp));
|
|
||||||
final Instant nextCollTimestamp;
|
|
||||||
try {
|
|
||||||
nextCollTimestamp = dateMathParser.parseMath(intervalDateMath).toInstant();
|
|
||||||
} catch (ParseException e) {
|
|
||||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
|
|
||||||
"Invalid Date Math String:'" + intervalDateMath +'\'', e);
|
|
||||||
}
|
|
||||||
assert nextCollTimestamp.isAfter(fromTimestamp);
|
|
||||||
return nextCollTimestamp;
|
|
||||||
}
|
|
||||||
|
|
||||||
private Instant parseRouteKey(Object routeKey) {
|
private Instant parseRouteKey(Object routeKey) {
|
||||||
final Instant docTimestamp;
|
final Instant docTimestamp;
|
||||||
if (routeKey instanceof Instant) {
|
if (routeKey instanceof Instant) {
|
||||||
|
@ -290,9 +245,9 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
|
||||||
final Aliases aliases = zkController.getZkStateReader().getAliases(); // note: might be different from last request
|
final Aliases aliases = zkController.getZkStateReader().getAliases(); // note: might be different from last request
|
||||||
if (this.parsedCollectionsAliases != aliases) {
|
if (this.parsedCollectionsAliases != aliases) {
|
||||||
if (this.parsedCollectionsAliases != null) {
|
if (this.parsedCollectionsAliases != null) {
|
||||||
log.debug("Observing possibly updated alias: {}", aliasName);
|
log.debug("Observing possibly updated alias: {}", getAliasName());
|
||||||
}
|
}
|
||||||
this.parsedCollectionsDesc = parseCollections(aliasName, aliases, this::newAliasMustExistException);
|
this.parsedCollectionsDesc = timeRoutedAlias.parseCollections(aliases, this::newAliasMustExistException);
|
||||||
this.parsedCollectionsAliases = aliases;
|
this.parsedCollectionsAliases = aliases;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -321,13 +276,13 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
|
||||||
// Alternatively a Lock or CountDownLatch could have been used but they didn't seem
|
// Alternatively a Lock or CountDownLatch could have been used but they didn't seem
|
||||||
// to make it any easier.
|
// to make it any easier.
|
||||||
|
|
||||||
final Semaphore semaphore = aliasToSemaphoreMap.computeIfAbsent(aliasName, n -> new Semaphore(1));
|
final Semaphore semaphore = aliasToSemaphoreMap.computeIfAbsent(getAliasName(), n -> new Semaphore(1));
|
||||||
if (semaphore.tryAcquire()) {
|
if (semaphore.tryAcquire()) {
|
||||||
try {
|
try {
|
||||||
final String operation = CollectionParams.CollectionAction.ROUTEDALIAS_CREATECOLL.toLower();
|
final String operation = CollectionParams.CollectionAction.ROUTEDALIAS_CREATECOLL.toLower();
|
||||||
Map<String, Object> msg = new HashMap<>();
|
Map<String, Object> msg = new HashMap<>();
|
||||||
msg.put(Overseer.QUEUE_OPERATION, operation);
|
msg.put(Overseer.QUEUE_OPERATION, operation);
|
||||||
msg.put(CollectionParams.NAME, aliasName);
|
msg.put(CollectionParams.NAME, getAliasName());
|
||||||
msg.put(RoutedAliasCreateCollectionCmd.IF_MOST_RECENT_COLL_NAME, mostRecentCollName);
|
msg.put(RoutedAliasCreateCollectionCmd.IF_MOST_RECENT_COLL_NAME, mostRecentCollName);
|
||||||
SolrQueryResponse rsp = new SolrQueryResponse();
|
SolrQueryResponse rsp = new SolrQueryResponse();
|
||||||
try {
|
try {
|
||||||
|
@ -373,44 +328,12 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Parses the timestamp from the collection list and returns them in reverse sorted order (most recent 1st) */
|
|
||||||
public static List<Map.Entry<Instant,String>> parseCollections(String aliasName, Aliases aliases, Supplier<SolrException> aliasNotExist) {
|
|
||||||
final List<String> collections = aliases.getCollectionAliasListMap().get(aliasName);
|
|
||||||
if (collections == null) {
|
|
||||||
throw aliasNotExist.get();
|
|
||||||
}
|
|
||||||
// note: I considered TreeMap but didn't like the log(N) just to grab the most recent when we use it later
|
|
||||||
List<Map.Entry<Instant,String>> result = new ArrayList<>(collections.size());
|
|
||||||
for (String collection : collections) {
|
|
||||||
Instant colStartTime = parseInstantFromCollectionName(aliasName, collection);
|
|
||||||
result.add(new AbstractMap.SimpleImmutableEntry<>(colStartTime, collection));
|
|
||||||
}
|
|
||||||
result.sort((e1, e2) -> e2.getKey().compareTo(e1.getKey())); // reverse sort by key
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
private SolrException newAliasMustExistException() {
|
private SolrException newAliasMustExistException() {
|
||||||
throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
|
throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
|
||||||
"Collection " + thisCollection + " created for use with alias " + aliasName + " which doesn't exist anymore." +
|
"Collection " + thisCollection + " created for use with alias " + getAliasName() + " which doesn't exist anymore." +
|
||||||
" You cannot write to this unless the alias exists.");
|
" You cannot write to this unless the alias exists.");
|
||||||
}
|
}
|
||||||
|
|
||||||
static Instant parseInstantFromCollectionName(String aliasName, String collection) {
|
|
||||||
final String dateTimePart = collection.substring(aliasName.length() + 1);
|
|
||||||
return DATE_TIME_FORMATTER.parse(dateTimePart, Instant::from);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static String formatCollectionNameFromInstant(String aliasName, Instant timestamp) {
|
|
||||||
String nextCollName = TimeRoutedAliasUpdateProcessor.DATE_TIME_FORMATTER.format(timestamp);
|
|
||||||
for (int i = 0; i < 3; i++) { // chop off seconds, minutes, hours
|
|
||||||
if (nextCollName.endsWith("_00")) {
|
|
||||||
nextCollName = nextCollName.substring(0, nextCollName.length()-3);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
assert TimeRoutedAliasUpdateProcessor.DATE_TIME_FORMATTER.parse(nextCollName, Instant::from).equals(timestamp);
|
|
||||||
return aliasName + "_" + nextCollName;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void processDelete(DeleteUpdateCommand cmd) throws IOException {
|
public void processDelete(DeleteUpdateCommand cmd) throws IOException {
|
||||||
final List<SolrCmdDistributor.Node> nodes = lookupShardLeadersOfCollections();
|
final List<SolrCmdDistributor.Node> nodes = lookupShardLeadersOfCollections();
|
||||||
|
@ -453,7 +376,7 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
|
||||||
|
|
||||||
private List<SolrCmdDistributor.Node> lookupShardLeadersOfCollections() {
|
private List<SolrCmdDistributor.Node> lookupShardLeadersOfCollections() {
|
||||||
final Aliases aliases = zkController.getZkStateReader().getAliases();
|
final Aliases aliases = zkController.getZkStateReader().getAliases();
|
||||||
List<String> collections = aliases.getCollectionAliasListMap().get(aliasName);
|
List<String> collections = aliases.getCollectionAliasListMap().get(getAliasName());
|
||||||
if (collections == null) {
|
if (collections == null) {
|
||||||
throw newAliasMustExistException();
|
throw newAliasMustExistException();
|
||||||
}
|
}
|
||||||
|
|
|
@ -38,13 +38,13 @@ import org.apache.solr.client.solrj.SolrClient;
|
||||||
import org.apache.solr.client.solrj.SolrServerException;
|
import org.apache.solr.client.solrj.SolrServerException;
|
||||||
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||||
|
import org.apache.solr.cloud.api.collections.TimeRoutedAlias;
|
||||||
import org.apache.solr.common.cloud.Aliases;
|
import org.apache.solr.common.cloud.Aliases;
|
||||||
import org.apache.solr.common.cloud.CompositeIdRouter;
|
import org.apache.solr.common.cloud.CompositeIdRouter;
|
||||||
import org.apache.solr.common.cloud.DocCollection;
|
import org.apache.solr.common.cloud.DocCollection;
|
||||||
import org.apache.solr.common.cloud.ImplicitDocRouter;
|
import org.apache.solr.common.cloud.ImplicitDocRouter;
|
||||||
import org.apache.solr.common.cloud.Replica;
|
import org.apache.solr.common.cloud.Replica;
|
||||||
import org.apache.solr.common.cloud.ZkStateReader;
|
import org.apache.solr.common.cloud.ZkStateReader;
|
||||||
import org.apache.solr.update.processor.TimeRoutedAliasUpdateProcessor;
|
|
||||||
import org.apache.solr.util.DateMathParser;
|
import org.apache.solr.util.DateMathParser;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
@ -139,9 +139,7 @@ public class CreateRoutedAliasTest extends SolrCloudTestCase {
|
||||||
assertSuccess(post);
|
assertSuccess(post);
|
||||||
|
|
||||||
Date startDate = DateMathParser.parseMath(new Date(), "NOW/DAY");
|
Date startDate = DateMathParser.parseMath(new Date(), "NOW/DAY");
|
||||||
String initialCollectionName = TimeRoutedAliasUpdateProcessor
|
String initialCollectionName = TimeRoutedAlias.formatCollectionNameFromInstant(aliasName, startDate.toInstant());
|
||||||
.formatCollectionNameFromInstant(aliasName, startDate.toInstant()
|
|
||||||
);
|
|
||||||
// small chance could fail due to "NOW"; see above
|
// small chance could fail due to "NOW"; see above
|
||||||
assertCollectionExists(initialCollectionName);
|
assertCollectionExists(initialCollectionName);
|
||||||
|
|
||||||
|
@ -198,9 +196,7 @@ public class CreateRoutedAliasTest extends SolrCloudTestCase {
|
||||||
"&create-collection.replicationFactor=2");
|
"&create-collection.replicationFactor=2");
|
||||||
assertSuccess(get);
|
assertSuccess(get);
|
||||||
|
|
||||||
String initialCollectionName = TimeRoutedAliasUpdateProcessor
|
String initialCollectionName = TimeRoutedAlias.formatCollectionNameFromInstant(aliasName, start);
|
||||||
.formatCollectionNameFromInstant(aliasName, start
|
|
||||||
);
|
|
||||||
assertCollectionExists(initialCollectionName);
|
assertCollectionExists(initialCollectionName);
|
||||||
|
|
||||||
// Test created collection:
|
// Test created collection:
|
||||||
|
@ -271,7 +267,7 @@ public class CreateRoutedAliasTest extends SolrCloudTestCase {
|
||||||
"&router.interval=%2B30MINUTE" +
|
"&router.interval=%2B30MINUTE" +
|
||||||
"&create-collection.collection.configName=_default" +
|
"&create-collection.collection.configName=_default" +
|
||||||
"&create-collection.numShards=1");
|
"&create-collection.numShards=1");
|
||||||
assertFailure(get, "Only time based routing is supported");
|
assertFailure(get, "Only 'time' routed aliases is supported right now");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -320,7 +316,7 @@ public class CreateRoutedAliasTest extends SolrCloudTestCase {
|
||||||
"&router.max-future-ms=-60000" + // bad: negative
|
"&router.max-future-ms=-60000" + // bad: negative
|
||||||
"&create-collection.collection.configName=_default" +
|
"&create-collection.collection.configName=_default" +
|
||||||
"&create-collection.numShards=1");
|
"&create-collection.numShards=1");
|
||||||
assertFailure(get, "router.max-future-ms must be a valid long integer");
|
assertFailure(get, "must be >= 0");
|
||||||
}
|
}
|
||||||
@Test
|
@Test
|
||||||
public void testUnParseableFutureFails() throws Exception {
|
public void testUnParseableFutureFails() throws Exception {
|
||||||
|
@ -333,10 +329,10 @@ public class CreateRoutedAliasTest extends SolrCloudTestCase {
|
||||||
"&router.name=time" +
|
"&router.name=time" +
|
||||||
"&router.start=2018-01-15T00:00:00Z" +
|
"&router.start=2018-01-15T00:00:00Z" +
|
||||||
"&router.interval=%2B30MINUTE" +
|
"&router.interval=%2B30MINUTE" +
|
||||||
"&router.max-future-ms=SixtyThousandMiliseconds" + // bad
|
"&router.max-future-ms=SixtyThousandMilliseconds" + // bad
|
||||||
"&create-collection.collection.configName=_default" +
|
"&create-collection.collection.configName=_default" +
|
||||||
"&create-collection.numShards=1");
|
"&create-collection.numShards=1");
|
||||||
assertFailure(get, "router.max-future-ms must be a valid long integer");
|
assertFailure(get, "SixtyThousandMilliseconds"); //TODO improve SolrParams.getLong
|
||||||
}
|
}
|
||||||
|
|
||||||
private void assertSuccess(HttpUriRequest msg) throws IOException {
|
private void assertSuccess(HttpUriRequest msg) throws IOException {
|
||||||
|
|
|
@ -41,10 +41,10 @@ import org.apache.solr.client.solrj.response.FieldStatsInfo;
|
||||||
import org.apache.solr.client.solrj.response.QueryResponse;
|
import org.apache.solr.client.solrj.response.QueryResponse;
|
||||||
import org.apache.solr.client.solrj.response.UpdateResponse;
|
import org.apache.solr.client.solrj.response.UpdateResponse;
|
||||||
import org.apache.solr.cloud.SolrCloudTestCase;
|
import org.apache.solr.cloud.SolrCloudTestCase;
|
||||||
|
import org.apache.solr.cloud.api.collections.TimeRoutedAlias;
|
||||||
import org.apache.solr.common.SolrDocumentList;
|
import org.apache.solr.common.SolrDocumentList;
|
||||||
import org.apache.solr.common.SolrException;
|
import org.apache.solr.common.SolrException;
|
||||||
import org.apache.solr.common.SolrInputDocument;
|
import org.apache.solr.common.SolrInputDocument;
|
||||||
import org.apache.solr.common.cloud.ZkStateReader;
|
|
||||||
import org.apache.solr.common.util.ExecutorUtil;
|
import org.apache.solr.common.util.ExecutorUtil;
|
||||||
import org.apache.solr.common.util.NamedList;
|
import org.apache.solr.common.util.NamedList;
|
||||||
import org.apache.solr.request.SolrQueryRequest;
|
import org.apache.solr.request.SolrQueryRequest;
|
||||||
|
@ -94,7 +94,6 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
|
||||||
// manipulate the config...
|
// manipulate the config...
|
||||||
|
|
||||||
String conf = "{" +
|
String conf = "{" +
|
||||||
" 'set-user-property' : {'timePartitionAliasName':'" + alias + "'}," + // no data driven
|
|
||||||
" 'set-user-property' : {'update.autoCreateFields':false}," + // no data driven
|
" 'set-user-property' : {'update.autoCreateFields':false}," + // no data driven
|
||||||
" 'add-updateprocessor' : {" +
|
" 'add-updateprocessor' : {" +
|
||||||
" 'name':'tolerant', 'class':'solr.TolerantUpdateProcessorFactory'" +
|
" 'name':'tolerant', 'class':'solr.TolerantUpdateProcessorFactory'" +
|
||||||
|
@ -122,11 +121,12 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
|
||||||
.contains(configName)
|
.contains(configName)
|
||||||
);
|
);
|
||||||
|
|
||||||
// start with one collection and an alias for it
|
// Start with one collection manually created (and use higher numShards & replicas than we'll use for others)
|
||||||
|
// This tests we may pre-create the collection and it's acceptable.
|
||||||
final String col23rd = alias + "_2017-10-23";
|
final String col23rd = alias + "_2017-10-23";
|
||||||
CollectionAdminRequest.createCollection(col23rd, configName, 2, 2)
|
CollectionAdminRequest.createCollection(col23rd, configName, 2, 2)
|
||||||
.setMaxShardsPerNode(2)
|
.setMaxShardsPerNode(2)
|
||||||
.withProperty(TimeRoutedAliasUpdateProcessor.TIME_PARTITION_ALIAS_NAME_CORE_PROP, alias)
|
.withProperty(TimeRoutedAlias.ROUTED_ALIAS_NAME_CORE_PROP, alias)
|
||||||
.process(solrClient);
|
.process(solrClient);
|
||||||
|
|
||||||
List<String> retrievedConfigSetNames = new ConfigSetAdminRequest.List().process(solrClient).getConfigSets();
|
List<String> retrievedConfigSetNames = new ConfigSetAdminRequest.List().process(solrClient).getConfigSets();
|
||||||
|
@ -135,16 +135,10 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
|
||||||
expectedConfigSetNames.size() == retrievedConfigSetNames.size());
|
expectedConfigSetNames.size() == retrievedConfigSetNames.size());
|
||||||
assertTrue("ConfigNames should be :" + expectedConfigSetNames, expectedConfigSetNames.containsAll(retrievedConfigSetNames) && retrievedConfigSetNames.containsAll(expectedConfigSetNames));
|
assertTrue("ConfigNames should be :" + expectedConfigSetNames, expectedConfigSetNames.containsAll(retrievedConfigSetNames) && retrievedConfigSetNames.containsAll(expectedConfigSetNames));
|
||||||
|
|
||||||
CollectionAdminRequest.createAlias(alias, col23rd).process(solrClient);
|
CollectionAdminRequest.createTimeRoutedAlias(alias, "2017-10-23T00:00:00Z", "+1DAY", timeField,
|
||||||
//TODO use SOLR-11617 client API to set alias metadata
|
CollectionAdminRequest.createCollection("_unused_", configName, 1, 1)
|
||||||
final ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader();
|
.setMaxShardsPerNode(2))
|
||||||
|
.process(solrClient);
|
||||||
zkStateReader.aliasesHolder.applyModificationAndExportToZk(a ->
|
|
||||||
a.cloneWithCollectionAliasMetadata(alias, TimeRoutedAliasUpdateProcessor.ROUTER_FIELD_METADATA, timeField)
|
|
||||||
.cloneWithCollectionAliasMetadata(alias, "create-collection.collection.configName", configName)
|
|
||||||
.cloneWithCollectionAliasMetadata(alias, "create-collection.numShards", "1")
|
|
||||||
.cloneWithCollectionAliasMetadata(alias, "create-collection.replicationFactor", "1")
|
|
||||||
.cloneWithCollectionAliasMetadata(alias, "router.interval", "+1DAY"));
|
|
||||||
|
|
||||||
// now we index a document
|
// now we index a document
|
||||||
assertUpdateResponse(solrClient.add(alias, newDoc(Instant.parse("2017-10-23T00:00:00Z"))));
|
assertUpdateResponse(solrClient.add(alias, newDoc(Instant.parse("2017-10-23T00:00:00Z"))));
|
||||||
|
@ -158,12 +152,12 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
|
||||||
// a document which is too far into the future
|
// a document which is too far into the future
|
||||||
testFailedDocument(Instant.now().plus(30, ChronoUnit.MINUTES), "too far in the future");
|
testFailedDocument(Instant.now().plus(30, ChronoUnit.MINUTES), "too far in the future");
|
||||||
|
|
||||||
// add another collection, add to alias (soonest comes first)
|
// add another collection with the precise name we expect, but don't add to alias explicitly. When we add a document
|
||||||
|
// destined for this collection, Solr will see it already exists and add it to the alias.
|
||||||
final String col24th = alias + "_2017-10-24";
|
final String col24th = alias + "_2017-10-24";
|
||||||
CollectionAdminRequest.createCollection(col24th, configName, 1, 1) // more shards and replicas now
|
CollectionAdminRequest.createCollection(col24th, configName, 1, 1) // more shards and replicas now
|
||||||
.withProperty("timePartitionAliasName", alias)
|
.withProperty(TimeRoutedAlias.ROUTED_ALIAS_NAME_CORE_PROP, alias)
|
||||||
.process(solrClient);
|
.process(solrClient);
|
||||||
CollectionAdminRequest.createAlias(alias, col24th + "," + col23rd).process(solrClient);
|
|
||||||
|
|
||||||
// index 3 documents in a random fashion
|
// index 3 documents in a random fashion
|
||||||
addDocsAndCommit(
|
addDocsAndCommit(
|
||||||
|
@ -305,7 +299,7 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
|
||||||
int totalNumFound = 0;
|
int totalNumFound = 0;
|
||||||
Instant colEndInstant = null; // exclusive end
|
Instant colEndInstant = null; // exclusive end
|
||||||
for (String col : cols) { // ASSUMPTION: reverse sorted order
|
for (String col : cols) { // ASSUMPTION: reverse sorted order
|
||||||
final Instant colStartInstant = TimeRoutedAliasUpdateProcessor.parseInstantFromCollectionName(alias, col);
|
final Instant colStartInstant = TimeRoutedAlias.parseInstantFromCollectionName(alias, col);
|
||||||
final QueryResponse colStatsResp = solrClient.query(col, params(
|
final QueryResponse colStatsResp = solrClient.query(col, params(
|
||||||
"q", "*:*",
|
"q", "*:*",
|
||||||
"rows", "0",
|
"rows", "0",
|
||||||
|
@ -336,13 +330,13 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
|
||||||
@Test
|
@Test
|
||||||
public void testParse() {
|
public void testParse() {
|
||||||
assertEquals(Instant.parse("2017-10-02T03:04:05Z"),
|
assertEquals(Instant.parse("2017-10-02T03:04:05Z"),
|
||||||
TimeRoutedAliasUpdateProcessor.parseInstantFromCollectionName(alias, alias + "_2017-10-02_03_04_05"));
|
TimeRoutedAlias.parseInstantFromCollectionName(alias, alias + "_2017-10-02_03_04_05"));
|
||||||
assertEquals(Instant.parse("2017-10-02T03:04:00Z"),
|
assertEquals(Instant.parse("2017-10-02T03:04:00Z"),
|
||||||
TimeRoutedAliasUpdateProcessor.parseInstantFromCollectionName(alias, alias + "_2017-10-02_03_04"));
|
TimeRoutedAlias.parseInstantFromCollectionName(alias, alias + "_2017-10-02_03_04"));
|
||||||
assertEquals(Instant.parse("2017-10-02T03:00:00Z"),
|
assertEquals(Instant.parse("2017-10-02T03:00:00Z"),
|
||||||
TimeRoutedAliasUpdateProcessor.parseInstantFromCollectionName(alias, alias + "_2017-10-02_03"));
|
TimeRoutedAlias.parseInstantFromCollectionName(alias, alias + "_2017-10-02_03"));
|
||||||
assertEquals(Instant.parse("2017-10-02T00:00:00Z"),
|
assertEquals(Instant.parse("2017-10-02T00:00:00Z"),
|
||||||
TimeRoutedAliasUpdateProcessor.parseInstantFromCollectionName(alias, alias + "_2017-10-02"));
|
TimeRoutedAlias.parseInstantFromCollectionName(alias, alias + "_2017-10-02"));
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class IncrementURPFactory extends FieldMutatingUpdateProcessorFactory {
|
public static class IncrementURPFactory extends FieldMutatingUpdateProcessorFactory {
|
||||||
|
|
|
@ -1381,11 +1381,11 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
|
||||||
// to allow this stuff not to be duplicated. (this is pasted from CreateAliasCmd.java), however I think
|
// to allow this stuff not to be duplicated. (this is pasted from CreateAliasCmd.java), however I think
|
||||||
// a comprehensive cleanup of this for all the requests in this class should be done as a separate ticket.
|
// a comprehensive cleanup of this for all the requests in this class should be done as a separate ticket.
|
||||||
|
|
||||||
public static final String ROUTING_TYPE = "router.name";
|
public static final String ROUTER_TYPE_NAME = "router.name";
|
||||||
public static final String ROUTING_FIELD = "router.field";
|
public static final String ROUTER_FIELD = "router.field";
|
||||||
public static final String ROUTING_START = "router.start";
|
public static final String ROUTER_START = "router.start";
|
||||||
public static final String ROUTING_INCREMENT = "router.interval";
|
public static final String ROUTER_INTERVAL = "router.interval";
|
||||||
public static final String ROUTING_MAX_FUTURE = "router.max-future-ms";
|
public static final String ROUTER_MAX_FUTURE = "router.max-future-ms";
|
||||||
|
|
||||||
private final String aliasName;
|
private final String aliasName;
|
||||||
private final String routerField;
|
private final String routerField;
|
||||||
|
@ -1422,15 +1422,15 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
|
||||||
public SolrParams getParams() {
|
public SolrParams getParams() {
|
||||||
ModifiableSolrParams params = (ModifiableSolrParams) super.getParams();
|
ModifiableSolrParams params = (ModifiableSolrParams) super.getParams();
|
||||||
params.add(CommonParams.NAME, aliasName);
|
params.add(CommonParams.NAME, aliasName);
|
||||||
params.add(ROUTING_TYPE, "time");
|
params.add(ROUTER_TYPE_NAME, "time");
|
||||||
params.add(ROUTING_FIELD, routerField);
|
params.add(ROUTER_FIELD, routerField);
|
||||||
params.add(ROUTING_START, start);
|
params.add(ROUTER_START, start);
|
||||||
params.add(ROUTING_INCREMENT, interval);
|
params.add(ROUTER_INTERVAL, interval);
|
||||||
if (tz != null) {
|
if (tz != null) {
|
||||||
params.add(CommonParams.TZ, tz.getID());
|
params.add(CommonParams.TZ, tz.getID());
|
||||||
}
|
}
|
||||||
if (maxFutureMs != null) {
|
if (maxFutureMs != null) {
|
||||||
params.add(ROUTING_MAX_FUTURE, ""+maxFutureMs);
|
params.add(ROUTER_MAX_FUTURE, ""+maxFutureMs);
|
||||||
}
|
}
|
||||||
|
|
||||||
// merge the above with collectionParams. Above takes precedence.
|
// merge the above with collectionParams. Above takes precedence.
|
||||||
|
|
Loading…
Reference in New Issue