mirror of https://github.com/apache/lucene.git
SOLR-11542: TimePartitionedUpdateProcessor URP
commit df5a5f949b
parent fd1820a430
solr/CHANGES.txt
@@ -72,6 +72,10 @@ New Features
 * SOLR-11487: Collection Aliases may now have metadata (currently an internal feature).
   (Gus Heck, David Smiley)
 
+* SOLR-11542: New TimePartitionedUpdateProcessor URP that routes documents to another collection
+  in the same alias-defined set, based on a time field (currently an internal feature).
+  (David Smiley)
+
 Bug Fixes
 ----------------------
 
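For context, the feature is used by pointing clients at the alias rather than at any one collection; the URP inspects the configured time field and forwards each document to the partition whose name encodes a matching start time. A minimal client-side sketch, assuming an alias named myalias whose router.field is timestamp (the same names the new test uses; this snippet is illustrative and not part of the commit):

import java.io.IOException;

import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.common.SolrInputDocument;

class TimePartitionedIndexingSketch {
  // Clients talk to the alias; the URP forwards each doc to the partition whose
  // start time covers the doc's value in the alias's router.field.
  static void indexOne(CloudSolrClient client) throws SolrServerException, IOException {
    SolrInputDocument doc = new SolrInputDocument();
    doc.addField("id", "1");
    doc.addField("timestamp", "2017-10-23T15:30:00Z"); // the alias's router.field
    client.add("myalias", doc);   // e.g. ends up in a partition named myalias_2017-10-23
    client.commit("myalias");
  }
}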
solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java
@@ -16,10 +16,10 @@
  */
 package org.apache.solr.update.processor;
 
+import java.util.Collections;
 import java.util.Set;
 import java.util.TreeSet;
 
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.response.SolrQueryResponse;
 
@@ -39,27 +39,19 @@ public class DistributedUpdateProcessorFactory
    * used by any {@link UpdateRequestProcessorFactory#getInstance} call to annotate a
    * SolrQueryRequest with the names of parameters that should also be forwarded.
    */
   @SuppressWarnings("unchecked")
   public static void addParamToDistributedRequestWhitelist(final SolrQueryRequest req, final String... paramNames) {
-    Set<String> whitelist = (Set<String>) req.getContext().get(DistributedUpdateProcessor.PARAM_WHITELIST_CTX_KEY);
-    if (null == whitelist) {
-      whitelist = new TreeSet<String>();
-      req.getContext().put(DistributedUpdateProcessor.PARAM_WHITELIST_CTX_KEY, whitelist);
-    }
-    for (String p : paramNames) {
-      whitelist.add(p);
-    }
+    Set<String> whitelist = (Set<String>) req.getContext()
+        .computeIfAbsent(DistributedUpdateProcessor.PARAM_WHITELIST_CTX_KEY, key -> new TreeSet<>());
+    Collections.addAll(whitelist, paramNames);
   }
 
   @Override
   public void init(NamedList args) {
 
   }
 
   @Override
-  public DistributedUpdateProcessor getInstance(SolrQueryRequest req,
+  public UpdateRequestProcessor getInstance(SolrQueryRequest req,
       SolrQueryResponse rsp, UpdateRequestProcessor next) {
-
-    return new DistributedUpdateProcessor(req, rsp, next);
+    // note: will sometimes return DURP (no overhead) instead of wrapping
+    return TimePartitionedUpdateProcessor.wrap(req, rsp,
+        new DistributedUpdateProcessor(req, rsp, next));
   }
 
 }
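The whitelist change above is a behavior-preserving refactor: the get/null-check/put sequence collapses into Map.computeIfAbsent plus Collections.addAll. A standalone sketch of the same pattern (the map and key below are stand-ins for illustration, not Solr's actual request context):

import java.util.*;

public class WhitelistDemo {
  public static void main(String[] args) {
    Map<Object, Object> context = new HashMap<>(); // stands in for req.getContext()
    String key = "distrib.param.whitelist";        // hypothetical context key

    // computeIfAbsent creates and registers the TreeSet only on first use
    @SuppressWarnings("unchecked")
    Set<String> whitelist = (Set<String>) context.computeIfAbsent(key, k -> new TreeSet<>());
    Collections.addAll(whitelist, "commit", "optimize");

    System.out.println(context.get(key)); // prints [commit, optimize]
  }
}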
solr/core/src/java/org/apache/solr/update/processor/TimePartitionedUpdateProcessor.java (new file)
@@ -0,0 +1,294 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.solr.update.processor;

import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.ChronoField;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.Aliases;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrCore;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.update.AddUpdateCommand;
import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.DeleteUpdateCommand;
import org.apache.solr.update.SolrCmdDistributor;
import org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.solr.update.processor.DistributedUpdateProcessor.DISTRIB_FROM;
import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;

/**
 * Distributes update requests to a rolling series of collections partitioned by a timestamp field.
 *
 * Depends on this core having a special core property that points to the alias name that this
 * collection is a part of, and further requires certain metadata on the alias.
 *
 * @since 7.2.0
 */
public class TimePartitionedUpdateProcessor extends UpdateRequestProcessor {
  //TODO do we make this more generic to others who want to partition collections using something else?

  // TODO auto add new collection partitions when we cross a timestamp boundary. That needs to be coordinated to avoid
  //  race conditions, remembering that even the lead collection might have multiple instances of this URP
  //  (multiple shards, or perhaps just multiple streams and thus instances of this URP)

  public static final String ALIAS_DISTRIB_UPDATE_PARAM = "alias." + DISTRIB_UPDATE_PARAM; // param
  public static final String TIME_PARTITION_ALIAS_NAME_CORE_PROP = "timePartitionAliasName"; // core prop
  public static final String ROUTER_FIELD_METADATA = "router.field"; // alias metadata

  // This format must be compatible with collection name limitations
  private static final DateTimeFormatter DATE_TIME_FORMATTER = new DateTimeFormatterBuilder()
      .append(DateTimeFormatter.ISO_LOCAL_DATE).appendPattern("[_HH[_mm[_ss]]]") // brackets mean optional
      .parseDefaulting(ChronoField.HOUR_OF_DAY, 0)
      .parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0)
      .parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0)
      .toFormatter(Locale.ROOT).withZone(ZoneOffset.UTC);

  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  private final String thisCollection;
  private final String aliasName;
  private final String routeField;

  private final SolrCmdDistributor cmdDistrib;
  private final ZkController zkController;
  private final SolrParams outParamsToLeader;

  private List<Map.Entry<Instant, String>> parsedCollectionsDesc; // k=timestamp (start), v=collection. Sorted descending
  private Aliases parsedCollectionsAliases; // a cached reference to the source of what we parse into parsedCollectionsDesc

  public static UpdateRequestProcessor wrap(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
    final String timePartitionAliasName = req.getCore().getCoreDescriptor()
        .getCoreProperty(TIME_PARTITION_ALIAS_NAME_CORE_PROP, null);
    final DistribPhase shardDistribPhase =
        DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));
    final DistribPhase aliasDistribPhase =
        DistribPhase.parseParam(req.getParams().get(ALIAS_DISTRIB_UPDATE_PARAM));
    if (timePartitionAliasName == null || aliasDistribPhase != DistribPhase.NONE || shardDistribPhase != DistribPhase.NONE) {
      // if aliasDistribPhase is not NONE, then there is no further collection routing to be done here.
      //   TODO this may eventually not be true but at the moment it is
      // if shardDistribPhase is not NONE, then the phase is after the scope of this URP
      return next;
    } else {
      return new TimePartitionedUpdateProcessor(req, rsp, next, timePartitionAliasName, aliasDistribPhase);
    }
  }

  protected TimePartitionedUpdateProcessor(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next,
                                           String aliasName,
                                           DistribPhase aliasDistribPhase) {
    super(next);
    assert aliasDistribPhase == DistribPhase.NONE;
    final SolrCore core = req.getCore();
    this.thisCollection = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
    this.aliasName = aliasName;
    CoreContainer cc = core.getCoreContainer();
    zkController = cc.getZkController();
    cmdDistrib = new SolrCmdDistributor(cc.getUpdateShardHandler());

    final Map<String, String> aliasMetadata = zkController.getZkStateReader().getAliases().getCollectionAliasMetadata(aliasName);
    if (aliasMetadata == null) {
      throw newAliasMustExistException(); // if it did exist, we'd have a non-null map
    }
    routeField = aliasMetadata.get(ROUTER_FIELD_METADATA);

    ModifiableSolrParams outParams = new ModifiableSolrParams(req.getParams());
    // Don't distribute these params; they will be distributed from the local processCommit separately.
    //   (See RequestHandlerUtils.handleCommit, from which this list was retrieved)
    outParams.remove(UpdateParams.OPTIMIZE);
    outParams.remove(UpdateParams.COMMIT);
    outParams.remove(UpdateParams.SOFT_COMMIT);
    outParams.remove(UpdateParams.PREPARE_COMMIT);
    outParams.remove(UpdateParams.ROLLBACK);
    // Add these...
    //  Ensures we skip over URPs prior to DistributedURP (see UpdateRequestProcessorChain)
    outParams.set(DISTRIB_UPDATE_PARAM, DistribPhase.NONE.toString());
    //  Signal this is a distributed update from this URP (see #wrap())
    outParams.set(ALIAS_DISTRIB_UPDATE_PARAM, DistribPhase.TOLEADER.toString());
    outParams.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(zkController.getBaseUrl(), core.getName()));
    outParamsToLeader = outParams;
  }

  @Override
  public void processAdd(AddUpdateCommand cmd) throws IOException {
    final Object routeValue = cmd.getSolrInputDocument().getFieldValue(routeField);
    final String targetCollection = findTargetCollectionGivenRouteKey(routeValue);
    if (targetCollection == null) {
      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
          "Doc " + cmd.getPrintableId() + " couldn't be routed with " + routeField + "=" + routeValue);
    }
    if (thisCollection.equals(targetCollection)) {
      // pass on through; we've reached the right collection
      super.processAdd(cmd);
    } else {
      // send to the right collection
      SolrCmdDistributor.Node targetLeaderNode = lookupShardLeaderOfCollection(targetCollection);
      cmdDistrib.distribAdd(cmd, Collections.singletonList(targetLeaderNode), new ModifiableSolrParams(outParamsToLeader));
    }
  }

  protected String findTargetCollectionGivenRouteKey(Object routeKey) {
    final Instant docTimestamp;
    if (routeKey instanceof Instant) {
      docTimestamp = (Instant) routeKey;
    } else if (routeKey instanceof Date) {
      docTimestamp = ((Date)routeKey).toInstant();
    } else if (routeKey instanceof CharSequence) {
      docTimestamp = Instant.parse((CharSequence)routeKey);
    } else {
      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unexpected type of routeKey: " + routeKey);
    }
    final Aliases aliases = zkController.getZkStateReader().getAliases(); // note: might be different from last request
    if (this.parsedCollectionsAliases != aliases) {
      if (this.parsedCollectionsAliases != null) {
        log.info("Observing possibly updated alias {}", aliasName);
      }
      this.parsedCollectionsDesc = doParseCollections(aliases);
      this.parsedCollectionsAliases = aliases;
    }
    // iterates in reverse chronological order
    //   We're O(N) here but N should be small, the loop is fast, and we're usually looking for the 1st.
    for (Map.Entry<Instant, String> entry : parsedCollectionsDesc) {
      Instant colStartTime = entry.getKey();
      if (!docTimestamp.isBefore(colStartTime)) { // i.e. docTimestamp is >= the colStartTime
        return entry.getValue(); //found it
      }
    }
    return null;
  }

  /** Parses the timestamps from the collection list and returns them in reverse sorted order (newest 1st) */
  private List<Map.Entry<Instant,String>> doParseCollections(Aliases aliases) {
    final List<String> collections = aliases.getCollectionAliasListMap().get(aliasName);
    if (collections == null) {
      throw newAliasMustExistException();
    }
    // note: I considered TreeMap but didn't like the log(N) just to grab the head when we use it later
    List<Map.Entry<Instant,String>> result = new ArrayList<>(collections.size());
    for (String collection : collections) {
      Instant colStartTime = parseInstantFromCollectionName(aliasName, collection);
      result.add(new AbstractMap.SimpleImmutableEntry<>(colStartTime, collection));
    }
    result.sort((e1, e2) -> e2.getKey().compareTo(e1.getKey())); // reverse sort by key
    return result;
  }

  private SolrException newAliasMustExistException() {
    throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
        "Collection " + thisCollection + " created for use with alias " + aliasName + " which doesn't exist anymore." +
            " You cannot write to this unless the alias exists.");
  }

  static Instant parseInstantFromCollectionName(String aliasName, String collection) {
    final String dateTimePart = collection.substring(aliasName.length() + 1);
    return DATE_TIME_FORMATTER.parse(dateTimePart, Instant::from);
  }

  @Override
  public void processDelete(DeleteUpdateCommand cmd) throws IOException {
    final List<SolrCmdDistributor.Node> nodes = lookupShardLeadersOfCollections();
    cmdDistrib.distribDelete(cmd, nodes, new ModifiableSolrParams(outParamsToLeader));
  }

  @Override
  public void processCommit(CommitUpdateCommand cmd) throws IOException {
    final List<SolrCmdDistributor.Node> nodes = lookupShardLeadersOfCollections();
    cmdDistrib.distribCommit(cmd, nodes, new ModifiableSolrParams(outParamsToLeader));
    cmdDistrib.blockAndDoRetries(); //TODO shouldn't distribCommit do this implicitly? It doesn't.
  }

  // Not supported by SolrCmdDistributor, and it is sketchy anyway
  // @Override
  // public void processRollback(RollbackUpdateCommand cmd) throws IOException {
  // }

  @Override
  public void finish() throws IOException {
    try {
      cmdDistrib.finish();
      final List<SolrCmdDistributor.Error> errors = cmdDistrib.getErrors();
      if (!errors.isEmpty()) {
        throw new DistributedUpdateProcessor.DistributedUpdatesAsyncException(errors);
      }
    } finally {
      super.finish();
    }
  }

  @Override
  protected void doClose() {
    try {
      cmdDistrib.close();
    } finally {
      super.doClose();
    }
  }

  private List<SolrCmdDistributor.Node> lookupShardLeadersOfCollections() {
    final Aliases aliases = zkController.getZkStateReader().getAliases();
    List<String> collections = aliases.getCollectionAliasListMap().get(aliasName);
    if (collections == null) {
      throw newAliasMustExistException();
    }
    return collections.stream().map(this::lookupShardLeaderOfCollection).collect(Collectors.toList());
  }

  private SolrCmdDistributor.Node lookupShardLeaderOfCollection(String collection) {
    //TODO consider the router to get the right slice. Refactor common code in CloudSolrClient & DistributedUrp
    final Collection<Slice> activeSlices = zkController.getClusterState().getCollection(collection).getActiveSlices();
    if (activeSlices.isEmpty()) {
      throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "Cannot route to collection " + collection);
    }
    final Slice slice = activeSlices.iterator().next();
    //TODO when should we do StdNode vs RetryNode?
    final Replica leader = slice.getLeader();
    if (leader == null) {
      throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
          "No 'leader' replica available for shard " + slice.getName() + " of collection " + collection);
    }
    return new SolrCmdDistributor.RetryNode(new ZkCoreNodeProps(leader), zkController.getZkStateReader(),
        collection, null);
  }

}
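Worth noting: the collection naming convention does double duty, since parseInstantFromCollectionName recovers each partition's start time from the name itself, and routing just scans the descending list for the first start time not after the document's timestamp. A self-contained sketch of that parse-and-route logic using the same formatter pattern as DATE_TIME_FORMATTER above (the alias and collection names are illustrative):

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.ChronoField;
import java.util.*;

public class PartitionLookupDemo {
  // same pattern as DATE_TIME_FORMATTER: ISO date, then optional _HH[_mm[_ss]], defaulted to 0, UTC
  static final DateTimeFormatter FMT = new DateTimeFormatterBuilder()
      .append(DateTimeFormatter.ISO_LOCAL_DATE).appendPattern("[_HH[_mm[_ss]]]")
      .parseDefaulting(ChronoField.HOUR_OF_DAY, 0)
      .parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0)
      .parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0)
      .toFormatter(Locale.ROOT).withZone(ZoneOffset.UTC);

  public static void main(String[] args) {
    String aliasName = "myalias"; // hypothetical alias
    List<String> collections = Arrays.asList("myalias_2017-10-24", "myalias_2017-10-23");

    // parse each name's start instant and sort newest first, as doParseCollections does
    List<Map.Entry<Instant, String>> parsed = new ArrayList<>();
    for (String collection : collections) {
      Instant start = FMT.parse(collection.substring(aliasName.length() + 1), Instant::from);
      parsed.add(new AbstractMap.SimpleImmutableEntry<>(start, collection));
    }
    parsed.sort((e1, e2) -> e2.getKey().compareTo(e1.getKey()));

    // route: the first partition whose start time is <= the doc timestamp wins
    Instant docTimestamp = Instant.parse("2017-10-23T15:30:00Z");
    for (Map.Entry<Instant, String> entry : parsed) {
      if (!docTimestamp.isBefore(entry.getKey())) {
        System.out.println("route to " + entry.getValue()); // route to myalias_2017-10-23
        break;
      }
    }
  }
}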
solr/core/src/test/org/apache/solr/update/processor/TimePartitionedUpdateProcessorTest.java (new file)
@@ -0,0 +1,275 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.solr.update.processor;

import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.function.UnaryOperator;

import org.apache.lucene.util.IOUtils;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.ConfigSetAdminRequest;
import org.apache.solr.client.solrj.request.V2Request;
import org.apache.solr.client.solrj.response.FieldStatsInfo;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.Aliases;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

public class TimePartitionedUpdateProcessorTest extends SolrCloudTestCase {

  static final String configName = "timeConfig";
  static final String alias = "myalias";
  static final String timeField = "timestamp";
  static final String intField = "integer_i";

  static SolrClient solrClient;

  private int lastDocId = 0;
  private int numDocsDeletedOrFailed = 0;

  @BeforeClass
  public static void setupCluster() throws Exception {
    configureCluster(2).configure();
    solrClient = getCloudSolrClient(cluster);
  }

  @AfterClass
  public static void finish() throws Exception {
    IOUtils.close(solrClient);
  }

  @Test
  public void test() throws Exception {
    // First create a config using the REST API. To do this, we create a collection with the name of the eventual
    //  config, configure it, and ultimately delete the collection, leaving behind a config with the same name.
    //  Then we create the "real" collections referencing this config.
    CollectionAdminRequest.createCollection(configName, 1, 1).process(solrClient);
    // manipulate the config...
    checkNoError(solrClient.request(new V2Request.Builder("/collections/" + configName + "/config")
        .withMethod(SolrRequest.METHOD.POST)
        .withPayload("{" +
            "  'set-user-property' : {'timePartitionAliasName':'" + alias + "'}," + // no data driven
            "  'set-user-property' : {'update.autoCreateFields':false}," + // no data driven
            "  'add-updateprocessor' : {" +
            "    'name':'tolerant', 'class':'solr.TolerantUpdateProcessorFactory'" +
            "  }," +
            "  'add-updateprocessor' : {" + // for testing
            "    'name':'inc', 'class':'" + IncrementURPFactory.class.getName() + "'," +
            "    'fieldName':'" + intField + "'" +
            "  }," +
            "}").build()));
    checkNoError(solrClient.request(new V2Request.Builder("/collections/" + configName + "/config/params")
        .withMethod(SolrRequest.METHOD.POST)
        .withPayload("{" +
            "  'set' : {" +
            "    '_UPDATE' : {'processor':'inc,tolerant'}" +
            "  }" +
            "}").build()));
    CollectionAdminRequest.deleteCollection(configName).process(solrClient);

    // start with one collection and an alias for it
    final String col23rd = alias + "_2017-10-23";
    CollectionAdminRequest.createCollection(col23rd, configName, 1, 1)
        .withProperty(TimePartitionedUpdateProcessor.TIME_PARTITION_ALIAS_NAME_CORE_PROP, alias)
        .process(solrClient);

    assertEquals("We only expect 2 configSets",
        Arrays.asList("_default", configName), new ConfigSetAdminRequest.List().process(solrClient).getConfigSets());

    CollectionAdminRequest.createAlias(alias, col23rd).process(solrClient);
    //TODO use SOLR-11617 client API to set alias metadata
    final ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader();
    UnaryOperator<Aliases> op = a -> a.cloneWithCollectionAliasMetadata(alias, TimePartitionedUpdateProcessor.ROUTER_FIELD_METADATA, timeField);
    zkStateReader.aliasesHolder.applyModificationAndExportToZk(op);

    // now we index a document
    solrClient.add(alias, newDoc(Instant.parse("2017-10-23T00:00:00Z")));
    solrClient.commit(alias);
    //assertDocRoutedToCol(lastDocId, col23rd);
    assertInvariants();

    // a document that is too old (throws an exception... if we have a TolerantUpdateProcessor then we see it there)
    try {
      final UpdateResponse resp = solrClient.add(alias, newDoc(Instant.parse("2017-10-01T00:00:00Z")));
      final Object errors = resp.getResponseHeader().get("errors");
      assertTrue(errors != null && errors.toString().contains("couldn't be routed"));
    } catch (SolrException e) {
      assertTrue(e.getMessage().contains("couldn't be routed"));
    }
    numDocsDeletedOrFailed++;

    // add another collection, add to alias (soonest comes first)
    final String col24th = alias + "_2017-10-24";
    CollectionAdminRequest.createCollection(col24th, configName, 2, 2) // more shards and replicas now
        .setMaxShardsPerNode(2)
        .withProperty("timePartitionAliasName", alias)
        .process(solrClient);
    CollectionAdminRequest.createAlias(alias, col24th + "," + col23rd).process(solrClient);

    // index 3 documents in a random fashion
    addDocsAndCommit(
        newDoc(Instant.parse("2017-10-23T00:00:00Z")),
        newDoc(Instant.parse("2017-10-24T01:00:00Z")),
        newDoc(Instant.parse("2017-10-24T02:00:00Z"))
    );
    assertInvariants();

    // assert that the IncrementURP has updated all '0' to '1'
    final SolrDocumentList checkIncResults = solrClient.query(alias, params("q", "NOT " + intField + ":1")).getResults();
    assertEquals(checkIncResults.toString(), 0, checkIncResults.getNumFound());

    // delete a random document id; ensure we don't find it
    int idToDelete = 1 + random().nextInt(lastDocId);
    if (idToDelete == 2) { // #2 didn't make it
      idToDelete++;
    }
    solrClient.deleteById(alias, Integer.toString(idToDelete));
    solrClient.commit(alias);
    numDocsDeletedOrFailed++;
    assertInvariants();
  }

  private void checkNoError(NamedList<Object> response) {
    Object errors = response.get("errorMessages");
    assertNull("" + errors, errors);
  }

  /** Adds these documents and commits, returning when they are committed.
   * We randomly go about this in different ways. */
  private void addDocsAndCommit(SolrInputDocument... solrInputDocuments) throws Exception {
    // we assume these are not old docs!

    // this is a list of the collections & the alias name. Use it to pick randomly where to send.
    //   (it doesn't matter where we send docs since the alias is honored at the URP level)
    List<String> collections = new ArrayList<>();
    collections.add(alias);
    collections.addAll(new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias));

    int commitWithin = random().nextBoolean() ? -1 : 500; // if -1, we commit explicitly instead
    int numDocsBefore = queryNumDocs();
    if (random().nextBoolean()) {
      // send in separate requests
      for (SolrInputDocument solrInputDocument : solrInputDocuments) {
        String col = collections.get(random().nextInt(collections.size()));
        solrClient.add(col, solrInputDocument, commitWithin);
      }
    } else {
      // send in a batch.
      String col = collections.get(random().nextInt(collections.size()));
      solrClient.add(col, Arrays.asList(solrInputDocuments), commitWithin);
    }
    String col = collections.get(random().nextInt(collections.size()));
    if (commitWithin == -1) {
      solrClient.commit(col);
    } else {
      // check that it all got committed eventually
      int numDocs = queryNumDocs();
      if (numDocs == numDocsBefore + solrInputDocuments.length) {
        System.err.println("Docs committed sooner than expected. Bug or slow test env?");
        return;
      }
      // wait until it's committed, plus some play time for the commit to become visible
      Thread.sleep(commitWithin + 200);
      numDocs = queryNumDocs();
      assertEquals("not committed. Bug or a slow test?",
          numDocsBefore + solrInputDocuments.length, numDocs);
    }
  }

  private int queryNumDocs() throws SolrServerException, IOException {
    return (int) solrClient.query(alias, params("q", "*:*", "rows", "0")).getResults().getNumFound();
  }

  private void assertInvariants() throws IOException, SolrServerException {
    final int expectNumFound = lastDocId - numDocsDeletedOrFailed; // lastDocId is effectively the # of generated docs

    final List<String> cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias);
    assert !cols.isEmpty();

    int totalNumFound = 0;
    Instant colEndInstant = null; // exclusive end
    for (String col : cols) {
      final Instant colStartInstant = TimePartitionedUpdateProcessor.parseInstantFromCollectionName(alias, col);
      //TODO do this in parallel threads
      final QueryResponse colStatsResp = solrClient.query(col, params(
          "q", "*:*",
          "rows", "0",
          "stats", "true",
          "stats.field", timeField));
      long numFound = colStatsResp.getResults().getNumFound();
      if (numFound > 0) {
        totalNumFound += numFound;
        final FieldStatsInfo timestampStats = colStatsResp.getFieldStatsInfo().get(timeField);
        assertTrue(colStartInstant.toEpochMilli() <= ((Date)timestampStats.getMin()).getTime());
        if (colEndInstant != null) {
          assertTrue(colEndInstant.toEpochMilli() > ((Date)timestampStats.getMax()).getTime());
        }
      }

      colEndInstant = colStartInstant; // the next older partition will max out at our current start time
    }
    assertEquals(expectNumFound, totalNumFound);
  }

  private SolrInputDocument newDoc(Instant timestamp) {
    return sdoc("id", Integer.toString(++lastDocId),
        timeField, timestamp.toString(),
        intField, "0"); // always 0
  }

  @Test
  public void testParse() {
    assertEquals(Instant.parse("2017-10-02T03:04:05Z"),
        TimePartitionedUpdateProcessor.parseInstantFromCollectionName(alias, alias + "_2017-10-02_03_04_05"));
    assertEquals(Instant.parse("2017-10-02T03:04:00Z"),
        TimePartitionedUpdateProcessor.parseInstantFromCollectionName(alias, alias + "_2017-10-02_03_04"));
    assertEquals(Instant.parse("2017-10-02T03:00:00Z"),
        TimePartitionedUpdateProcessor.parseInstantFromCollectionName(alias, alias + "_2017-10-02_03"));
    assertEquals(Instant.parse("2017-10-02T00:00:00Z"),
        TimePartitionedUpdateProcessor.parseInstantFromCollectionName(alias, alias + "_2017-10-02"));
  }

  public static class IncrementURPFactory extends FieldMutatingUpdateProcessorFactory {

    @Override
    public UpdateRequestProcessor getInstance(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
      return FieldValueMutatingUpdateProcessor.valueMutator(getSelector(), next,
          (src) -> Integer.valueOf(src.toString()) + 1);
    }
  }

}
solr/core/src/test/org/apache/solr/update/processor/UpdateRequestProcessorFactoryTest.java
@@ -141,14 +141,14 @@ public class UpdateRequestProcessorFactoryTest extends AbstractSolrTestCase {
                  && procs.get(1) instanceof LogUpdateProcessorFactory.LogUpdateProcessor));
 
       // fetch the distributed version of this chain
-      proc = chain.createProcessor(req(DISTRIB_UPDATE_PARAM, "non_blank_value"),
+      proc = chain.createProcessor(req(DISTRIB_UPDATE_PARAM, "NONE"), // just some non-blank value
                                    new SolrQueryResponse());
       procs = procToList(proc);
       assertNotNull(name + " (distrib) chain produced null proc", proc);
       assertFalse(name + " (distrib) procs is empty", procs.isEmpty());
 
       // for these 3 (distrib) chains, the first proc should always be LogUpdateProcessor
-      assertTrue(name + " (distrib) first proc should be LogUpdateProcessor because of @RunAllways: "
+      assertTrue(name + " (distrib) first proc should be LogUpdateProcessor because of @RunAlways: "
                  + procs.toString(),
                  ( // compare them both just because i'm going insane and the more checks the better
                    proc instanceof LogUpdateProcessorFactory.LogUpdateProcessor
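The first change above follows from the new factory behavior: TimePartitionedUpdateProcessor.wrap parses DISTRIB_UPDATE_PARAM at chain-creation time, so the test must now supply a legal DistribPhase name instead of an arbitrary placeholder. For reference, DistribPhase parsing behaves roughly like this sketch (paraphrased, not Solr's exact source; the real version throws a SolrException with BAD_REQUEST):

// Rough model of DistributedUpdateProcessor.DistribPhase.parseParam: a missing
// param means NONE, and an unrecognized value is rejected rather than ignored.
enum DistribPhase {
  NONE, TOLEADER, FROMLEADER;

  static DistribPhase parseParam(final String param) {
    if (param == null || param.trim().isEmpty()) {
      return NONE;
    }
    try {
      return valueOf(param);
    } catch (IllegalArgumentException e) {
      // Solr wraps this in a SolrException(BAD_REQUEST); plain exception in this sketch
      throw new IllegalArgumentException("Illegal value for distrib.update: " + param, e);
    }
  }
}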
@@ -173,7 +173,7 @@ public class UpdateRequestProcessorFactoryTest extends AbstractSolrTestCase {
    * walks the "next" values of the proc building up a List of the procs for easier testing
    */
   public static List<UpdateRequestProcessor> procToList(UpdateRequestProcessor proc) {
-    List<UpdateRequestProcessor> result = new ArrayList<UpdateRequestProcessor>(7);
+    List<UpdateRequestProcessor> result = new ArrayList<>(7);
     while (null != proc) {
       result.add(proc);
       proc = proc.next;