From df5a5f949b1a28feff0cc25fe13c95b502feac31 Mon Sep 17 00:00:00 2001
From: David Smiley
Date: Thu, 16 Nov 2017 15:08:23 -0500
Subject: [PATCH] SOLR-11542: TimePartitionedUpdateProcessor URP

---
 solr/CHANGES.txt                            |   4 +
 .../DistributedUpdateProcessorFactory.java  |  26 +-
 .../TimePartitionedUpdateProcessor.java     | 294 ++++++++++++++++++
 .../TimePartitionedUpdateProcessorTest.java | 275 ++++++++++++++++
 .../UpdateRequestProcessorFactoryTest.java  |   6 +-
 5 files changed, 585 insertions(+), 20 deletions(-)
 create mode 100644 solr/core/src/java/org/apache/solr/update/processor/TimePartitionedUpdateProcessor.java
 create mode 100644 solr/core/src/test/org/apache/solr/update/processor/TimePartitionedUpdateProcessorTest.java

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 5615427a124..e43aeb6866f 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -72,6 +72,10 @@ New Features
 * SOLR-11487: Collection Aliases may now have metadata (currently an internal feature).
   (Gus Heck, David Smiley)
 
+* SOLR-11542: New TimePartitionedUpdateProcessor URP that routes documents to another collection
+  in the same alias-defined set of time-partitioned collections, based on a time field (currently an internal feature).
+  (David Smiley)
+
 Bug Fixes
 ----------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java
index 9e1e86c8f68..c706e0c064a 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java
@@ -16,10 +16,10 @@
  */
 package org.apache.solr.update.processor;
 
+import java.util.Collections;
 import java.util.Set;
 import java.util.TreeSet;
 
-import org.apache.solr.common.util.NamedList;
 import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.response.SolrQueryResponse;
 
@@ -39,27 +39,19 @@ public class DistributedUpdateProcessorFactory
    * used by any {@link UpdateRequestProcessorFactory#getInstance} call to annotate a
    * SolrQueryRequest with the names of parameters that should also be forwarded.
    */
+  @SuppressWarnings("unchecked")
   public static void addParamToDistributedRequestWhitelist(final SolrQueryRequest req, final String... paramNames) {
-    Set<String> whitelist = (Set<String>) req.getContext().get(DistributedUpdateProcessor.PARAM_WHITELIST_CTX_KEY);
-    if (null == whitelist) {
-      whitelist = new TreeSet<String>();
-      req.getContext().put(DistributedUpdateProcessor.PARAM_WHITELIST_CTX_KEY, whitelist);
-    }
-    for (String p : paramNames) {
-      whitelist.add(p);
-    }
+    Set<String> whitelist = (Set<String>) req.getContext()
+        .computeIfAbsent(DistributedUpdateProcessor.PARAM_WHITELIST_CTX_KEY, key -> new TreeSet<>());
+    Collections.addAll(whitelist, paramNames);
   }
 
   @Override
-  public void init(NamedList args) {
-
-  }
-
-  @Override
-  public DistributedUpdateProcessor getInstance(SolrQueryRequest req,
+  public UpdateRequestProcessor getInstance(SolrQueryRequest req,
       SolrQueryResponse rsp, UpdateRequestProcessor next) {
-
-    return new DistributedUpdateProcessor(req, rsp, next);
+    // note: will sometimes return DURP (no overhead) instead of wrapping
+    return TimePartitionedUpdateProcessor.wrap(req, rsp,
+        new DistributedUpdateProcessor(req, rsp, next));
   }
 }
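The refactored whitelist helper above leans on Map.computeIfAbsent to create the set once per request context. A minimal standalone sketch of that idiom, using only JDK types (the "paramWhitelist" key and class name are invented stand-ins for the Solr context key):

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;
    import java.util.TreeSet;

    public class WhitelistIdiomDemo {
      public static void main(String[] args) {
        Map<Object, Object> context = new HashMap<>(); // stands in for req.getContext()
        // The first call creates the set; subsequent calls return the same instance,
        // so repeated registrations accumulate into one sorted whitelist.
        @SuppressWarnings("unchecked")
        Set<String> whitelist = (Set<String>) context.computeIfAbsent("paramWhitelist", key -> new TreeSet<>());
        Collections.addAll(whitelist, "commitWithin", "overwrite");
        System.out.println(context.get("paramWhitelist")); // [commitWithin, overwrite]
      }
    }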
diff --git a/solr/core/src/java/org/apache/solr/update/processor/TimePartitionedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/TimePartitionedUpdateProcessor.java
new file mode 100644
index 00000000000..e485a3dc620
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/update/processor/TimePartitionedUpdateProcessor.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.update.processor;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+import java.time.temporal.ChronoField;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.solr.cloud.ZkController;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.Aliases;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.params.UpdateParams;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.update.AddUpdateCommand;
+import org.apache.solr.update.CommitUpdateCommand;
+import org.apache.solr.update.DeleteUpdateCommand;
+import org.apache.solr.update.SolrCmdDistributor;
+import org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.update.processor.DistributedUpdateProcessor.DISTRIB_FROM;
+import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
+
+/**
+ * Distributes update requests to a rolling series of collections partitioned by a timestamp field.
+ *
+ * Depends on this core having a special core property that points to the alias name that this collection
+ * is a part of, and further requires certain metadata on the alias.
+ *
+ * @since 7.2.0
+ */
+public class TimePartitionedUpdateProcessor extends UpdateRequestProcessor {
+  //TODO do we make this more generic to others who want to partition collections using something else?
+
+  // TODO auto add new collection partitions when we cross a timestamp boundary. That needs to be coordinated to avoid
+  //   race conditions, remembering that even the lead collection might have multiple instances of this URP
+  //   (multiple shards, or perhaps just multiple streams and thus multiple instances of this URP)
+
+  public static final String ALIAS_DISTRIB_UPDATE_PARAM = "alias." + DISTRIB_UPDATE_PARAM; // param
+  public static final String TIME_PARTITION_ALIAS_NAME_CORE_PROP = "timePartitionAliasName"; // core prop
+  public static final String ROUTER_FIELD_METADATA = "router.field"; // alias metadata
+
+  // This format must be compatible with collection name limitations
+  private static final DateTimeFormatter DATE_TIME_FORMATTER = new DateTimeFormatterBuilder()
+      .append(DateTimeFormatter.ISO_LOCAL_DATE).appendPattern("[_HH[_mm[_ss]]]") // brackets mean optional
+      .parseDefaulting(ChronoField.HOUR_OF_DAY, 0)
+      .parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0)
+      .parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0)
+      .toFormatter(Locale.ROOT).withZone(ZoneOffset.UTC);
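Since this formatter carries the routing semantics of the collection-name suffixes, it may help to see the same DateTimeFormatterBuilder construction exercised in isolation. This is a standalone sketch, not patch code (the class name PartitionSuffixDemo is invented); it shows how each optional section defaults to zero and resolves to a UTC Instant:

    import java.time.Instant;
    import java.time.ZoneOffset;
    import java.time.format.DateTimeFormatter;
    import java.time.format.DateTimeFormatterBuilder;
    import java.time.temporal.ChronoField;
    import java.util.Locale;

    public class PartitionSuffixDemo {
      // Same construction as the patch: ISO date plus optional _HH, _mm, _ss sections,
      // with unspecified time fields defaulting to 0, interpreted as UTC.
      private static final DateTimeFormatter FMT = new DateTimeFormatterBuilder()
          .append(DateTimeFormatter.ISO_LOCAL_DATE).appendPattern("[_HH[_mm[_ss]]]")
          .parseDefaulting(ChronoField.HOUR_OF_DAY, 0)
          .parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0)
          .parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0)
          .toFormatter(Locale.ROOT).withZone(ZoneOffset.UTC);

      public static void main(String[] args) {
        // Each suffix granularity parses to the start instant of that partition:
        System.out.println(FMT.parse("2017-10-23", Instant::from));          // 2017-10-23T00:00:00Z
        System.out.println(FMT.parse("2017-10-23_05", Instant::from));       // 2017-10-23T05:00:00Z
        System.out.println(FMT.parse("2017-10-23_05_30", Instant::from));    // 2017-10-23T05:30:00Z
        System.out.println(FMT.parse("2017-10-23_05_30_59", Instant::from)); // 2017-10-23T05:30:59Z
      }
    }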
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private final String thisCollection;
+  private final String aliasName;
+  private final String routeField;
+
+  private final SolrCmdDistributor cmdDistrib;
+  private final ZkController zkController;
+  private final SolrParams outParamsToLeader;
+
+  private List<Map.Entry<Instant, String>> parsedCollectionsDesc; // k=timestamp (start), v=collection.  Sorted descending
+  private Aliases parsedCollectionsAliases; // a cached reference to the source of what we parse into parsedCollectionsDesc
+
+  public static UpdateRequestProcessor wrap(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
+    final String timePartitionAliasName = req.getCore().getCoreDescriptor()
+        .getCoreProperty(TIME_PARTITION_ALIAS_NAME_CORE_PROP, null);
+    final DistribPhase shardDistribPhase =
+        DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));
+    final DistribPhase aliasDistribPhase =
+        DistribPhase.parseParam(req.getParams().get(ALIAS_DISTRIB_UPDATE_PARAM));
+    if (timePartitionAliasName == null || aliasDistribPhase != DistribPhase.NONE || shardDistribPhase != DistribPhase.NONE) {
+      // if aliasDistribPhase is not NONE, then there is no further collection routing to be done here.
+      //    TODO this may eventually not be true but at the moment it is
+      // if shardDistribPhase is not NONE, then the phase is after the scope of this URP
+      return next;
+    } else {
+      return new TimePartitionedUpdateProcessor(req, rsp, next, timePartitionAliasName, aliasDistribPhase);
+    }
+  }
+
+  protected TimePartitionedUpdateProcessor(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next,
+                                           String aliasName,
+                                           DistribPhase aliasDistribPhase) {
+    super(next);
+    assert aliasDistribPhase == DistribPhase.NONE;
+    final SolrCore core = req.getCore();
+    this.thisCollection = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
+    this.aliasName = aliasName;
+    CoreContainer cc = core.getCoreContainer();
+    zkController = cc.getZkController();
+    cmdDistrib = new SolrCmdDistributor(cc.getUpdateShardHandler());
+
+    final Map<String, String> aliasMetadata = zkController.getZkStateReader().getAliases().getCollectionAliasMetadata(aliasName);
+    if (aliasMetadata == null) {
+      throw newAliasMustExistException(); // if it did exist, we'd have a non-null map
+    }
+    routeField = aliasMetadata.get(ROUTER_FIELD_METADATA);
+
+    ModifiableSolrParams outParams = new ModifiableSolrParams(req.getParams());
+    // Don't distribute these params; they will be distributed from the local processCommit separately.
+    // (See RequestHandlerUtils.handleCommit, from which this list was retrieved)
+    outParams.remove(UpdateParams.OPTIMIZE);
+    outParams.remove(UpdateParams.COMMIT);
+    outParams.remove(UpdateParams.SOFT_COMMIT);
+    outParams.remove(UpdateParams.PREPARE_COMMIT);
+    outParams.remove(UpdateParams.ROLLBACK);
+    // Add these...
+    //  Ensures we skip over URPs prior to DistributedURP (see UpdateRequestProcessorChain)
+    outParams.set(DISTRIB_UPDATE_PARAM, DistribPhase.NONE.toString());
+    //  Signal this is a distributed update from this URP (see #wrap())
+    outParams.set(ALIAS_DISTRIB_UPDATE_PARAM, DistribPhase.TOLEADER.toString());
+    outParams.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(zkController.getBaseUrl(), core.getName()));
+    outParamsToLeader = outParams;
+  }
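wrap() only installs this URP when two prerequisites are met: the core carries the timePartitionAliasName core property, and the alias carries router.field metadata. A sketch of the setup from a client's perspective, mirroring the test further below (TimePartitionSetupSketch is an invented name; "timeConfig" is the configSet name the test happens to use):

    import org.apache.solr.client.solrj.SolrClient;
    import org.apache.solr.client.solrj.request.CollectionAdminRequest;
    import org.apache.solr.update.processor.TimePartitionedUpdateProcessor;

    public class TimePartitionSetupSketch {
      // Sketch of the two prerequisites wrap() checks for.
      static void setUp(SolrClient solrClient, String alias) throws Exception {
        // 1) each partition collection carries a core property naming the alias it belongs to
        CollectionAdminRequest.createCollection(alias + "_2017-10-23", "timeConfig", 1, 1)
            .withProperty(TimePartitionedUpdateProcessor.TIME_PARTITION_ALIAS_NAME_CORE_PROP, alias)
            .process(solrClient);
        CollectionAdminRequest.createAlias(alias, alias + "_2017-10-23").process(solrClient);
        // 2) the alias itself needs "router.field" metadata naming the timestamp field;
        //    as of this patch that is set through an internal ZkStateReader API (see the test below).
      }
    }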
+  @Override
+  public void processAdd(AddUpdateCommand cmd) throws IOException {
+    final Object routeValue = cmd.getSolrInputDocument().getFieldValue(routeField);
+    final String targetCollection = findTargetCollectionGivenRouteKey(routeValue);
+    if (targetCollection == null) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+          "Doc " + cmd.getPrintableId() + " couldn't be routed with " + routeField + "=" + routeValue);
+    }
+    if (thisCollection.equals(targetCollection)) {
+      // pass on through; we've reached the right collection
+      super.processAdd(cmd);
+    } else {
+      // send to the right collection
+      SolrCmdDistributor.Node targetLeaderNode = lookupShardLeaderOfCollection(targetCollection);
+      cmdDistrib.distribAdd(cmd, Collections.singletonList(targetLeaderNode), new ModifiableSolrParams(outParamsToLeader));
+    }
+  }
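The routing decision behind findTargetCollectionGivenRouteKey (defined next) is just "first partition whose start time is not after the doc timestamp wins" over a newest-first list. A standalone sketch of that lookup, outside the patch (RouteLookupDemo and the collection names are invented):

    import java.time.Instant;
    import java.util.AbstractMap;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;

    public class RouteLookupDemo {
      // First entry whose start time is <= the doc timestamp wins; null means "too old to route".
      static String route(List<Map.Entry<Instant, String>> desc, Instant docTimestamp) {
        for (Map.Entry<Instant, String> entry : desc) {
          if (!docTimestamp.isBefore(entry.getKey())) {
            return entry.getValue();
          }
        }
        return null;
      }

      public static void main(String[] args) {
        // Partition start times -> collection names, newest first, mirroring parsedCollectionsDesc.
        List<Map.Entry<Instant, String>> desc = Arrays.asList(
            new AbstractMap.SimpleImmutableEntry<Instant, String>(Instant.parse("2017-10-24T00:00:00Z"), "myalias_2017-10-24"),
            new AbstractMap.SimpleImmutableEntry<Instant, String>(Instant.parse("2017-10-23T00:00:00Z"), "myalias_2017-10-23"));

        System.out.println(route(desc, Instant.parse("2017-10-24T05:00:00Z"))); // myalias_2017-10-24
        System.out.println(route(desc, Instant.parse("2017-10-23T23:59:59Z"))); // myalias_2017-10-23
        System.out.println(route(desc, Instant.parse("2017-10-01T00:00:00Z"))); // null (too old)
      }
    }

Note there is no upper bound: a timestamp later than the newest partition's start still routes to the newest collection, which is exactly what the loop in the patch does too.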
+  protected String findTargetCollectionGivenRouteKey(Object routeKey) {
+    final Instant docTimestamp;
+    if (routeKey instanceof Instant) {
+      docTimestamp = (Instant) routeKey;
+    } else if (routeKey instanceof Date) {
+      docTimestamp = ((Date) routeKey).toInstant();
+    } else if (routeKey instanceof CharSequence) {
+      docTimestamp = Instant.parse((CharSequence) routeKey);
+    } else {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unexpected type of routeKey: " + routeKey);
+    }
+    final Aliases aliases = zkController.getZkStateReader().getAliases(); // note: might be different from last request
+    if (this.parsedCollectionsAliases != aliases) {
+      if (this.parsedCollectionsAliases != null) {
+        log.info("Observing possibly updated alias {}", aliasName);
+      }
+      this.parsedCollectionsDesc = doParseCollections(aliases);
+      this.parsedCollectionsAliases = aliases;
+    }
+    // iterates in reverse chronological order
+    // We're O(N) here but N should be small, the loop is fast, and usually we're looking for the 1st entry.
+    for (Map.Entry<Instant, String> entry : parsedCollectionsDesc) {
+      Instant colStartTime = entry.getKey();
+      if (!docTimestamp.isBefore(colStartTime)) { // i.e. docTimestamp is >= the colStartTime
+        return entry.getValue(); // found it
+      }
+    }
+    return null;
+  }
+
+  /** Parses the timestamps from the collection list and returns them in reverse sorted order (newest 1st) */
+  private List<Map.Entry<Instant, String>> doParseCollections(Aliases aliases) {
+    final List<String> collections = aliases.getCollectionAliasListMap().get(aliasName);
+    if (collections == null) {
+      throw newAliasMustExistException();
+    }
+    // note: I considered TreeMap but didn't like the log(N) just to grab the head when we use it later
+    List<Map.Entry<Instant, String>> result = new ArrayList<>(collections.size());
+    for (String collection : collections) {
+      Instant colStartTime = parseInstantFromCollectionName(aliasName, collection);
+      result.add(new AbstractMap.SimpleImmutableEntry<>(colStartTime, collection));
+    }
+    result.sort((e1, e2) -> e2.getKey().compareTo(e1.getKey())); // reverse sort by key
+    return result;
+  }
+
+  private SolrException newAliasMustExistException() {
+    throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
+        "Collection " + thisCollection + " created for use with alias " + aliasName + " which doesn't exist anymore."
+            + " You cannot write to this unless the alias exists.");
+  }
+
+  static Instant parseInstantFromCollectionName(String aliasName, String collection) {
+    final String dateTimePart = collection.substring(aliasName.length() + 1);
+    return DATE_TIME_FORMATTER.parse(dateTimePart, Instant::from);
+  }
+
+  @Override
+  public void processDelete(DeleteUpdateCommand cmd) throws IOException {
+    final List<SolrCmdDistributor.Node> nodes = lookupShardLeadersOfCollections();
+    cmdDistrib.distribDelete(cmd, nodes, new ModifiableSolrParams(outParamsToLeader));
+  }
+
+  @Override
+  public void processCommit(CommitUpdateCommand cmd) throws IOException {
+    final List<SolrCmdDistributor.Node> nodes = lookupShardLeadersOfCollections();
+    cmdDistrib.distribCommit(cmd, nodes, new ModifiableSolrParams(outParamsToLeader));
+    cmdDistrib.blockAndDoRetries(); //TODO shouldn't distribCommit do this implicitly?  It doesn't.
+  }
+
+//  Not supported by SolrCmdDistributor, and it's sketchy anyway
+//  @Override
+//  public void processRollback(RollbackUpdateCommand cmd) throws IOException {
+//  }
+
+  @Override
+  public void finish() throws IOException {
+    try {
+      cmdDistrib.finish();
+      final List<SolrCmdDistributor.Error> errors = cmdDistrib.getErrors();
+      if (!errors.isEmpty()) {
+        throw new DistributedUpdateProcessor.DistributedUpdatesAsyncException(errors);
+      }
+    } finally {
+      super.finish();
+    }
+  }
+
+  @Override
+  protected void doClose() {
+    try {
+      cmdDistrib.close();
+    } finally {
+      super.doClose();
+    }
+  }
+
+  private List<SolrCmdDistributor.Node> lookupShardLeadersOfCollections() {
+    final Aliases aliases = zkController.getZkStateReader().getAliases();
+    List<String> collections = aliases.getCollectionAliasListMap().get(aliasName);
+    if (collections == null) {
+      throw newAliasMustExistException();
+    }
+    return collections.stream().map(this::lookupShardLeaderOfCollection).collect(Collectors.toList());
+  }
+
+  private SolrCmdDistributor.Node lookupShardLeaderOfCollection(String collection) {
+    //TODO consider the router to get the right slice.  Refactor common code in CloudSolrClient & DistributedUrp
+    final Collection<Slice> activeSlices = zkController.getClusterState().getCollection(collection).getActiveSlices();
+    if (activeSlices.isEmpty()) {
+      throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "Cannot route to collection " + collection);
+    }
+    final Slice slice = activeSlices.iterator().next();
+    //TODO when should we do StdNode vs RetryNode?
+ final Replica leader = slice.getLeader(); + if (leader == null) { + throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, + "No 'leader' replica available for shard " + slice.getName() + " of collection " + collection); + } + return new SolrCmdDistributor.RetryNode(new ZkCoreNodeProps(leader), zkController.getZkStateReader(), + collection, null); + } + +} diff --git a/solr/core/src/test/org/apache/solr/update/processor/TimePartitionedUpdateProcessorTest.java b/solr/core/src/test/org/apache/solr/update/processor/TimePartitionedUpdateProcessorTest.java new file mode 100644 index 00000000000..eca6fbbb10f --- /dev/null +++ b/solr/core/src/test/org/apache/solr/update/processor/TimePartitionedUpdateProcessorTest.java @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.update.processor; + +import java.io.IOException; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.List; +import java.util.function.UnaryOperator; + +import org.apache.lucene.util.IOUtils; +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.SolrRequest; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.client.solrj.request.ConfigSetAdminRequest; +import org.apache.solr.client.solrj.request.V2Request; +import org.apache.solr.client.solrj.response.FieldStatsInfo; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.client.solrj.response.UpdateResponse; +import org.apache.solr.cloud.SolrCloudTestCase; +import org.apache.solr.common.SolrDocumentList; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.cloud.Aliases; +import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.util.NamedList; +import org.apache.solr.request.SolrQueryRequest; +import org.apache.solr.response.SolrQueryResponse; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TimePartitionedUpdateProcessorTest extends SolrCloudTestCase { + + static final String configName = "timeConfig"; + static final String alias = "myalias"; + static final String timeField = "timestamp"; + static final String intField = "integer_i"; + + static SolrClient solrClient; + + private int lastDocId = 0; + private int numDocsDeletedOrFailed = 0; + + @BeforeClass + public static void setupCluster() throws Exception { + configureCluster(2).configure(); + solrClient = getCloudSolrClient(cluster); + } + + @AfterClass + public static void finish() throws Exception { + IOUtils.close(solrClient); + } + + @Test + 
public void test() throws Exception {
+    // First create a config using the REST API. To do this, we create a collection with the name of the eventual
+    // config. We configure it, and ultimately delete the collection, leaving behind a config of the same name.
+    // Then we create the "real" collections referencing this config.
+    CollectionAdminRequest.createCollection(configName, 1, 1).process(solrClient);
+    // manipulate the config...
+    checkNoError(solrClient.request(new V2Request.Builder("/collections/" + configName + "/config")
+        .withMethod(SolrRequest.METHOD.POST)
+        .withPayload("{" +
+            "  'set-user-property' : {'timePartitionAliasName':'" + alias + "'}," +
+            "  'set-user-property' : {'update.autoCreateFields':false}," + // no data driven
+            "  'add-updateprocessor' : {" +
+            "    'name':'tolerant', 'class':'solr.TolerantUpdateProcessorFactory'" +
+            "  }," +
+            "  'add-updateprocessor' : {" + // for testing
+            "    'name':'inc', 'class':'" + IncrementURPFactory.class.getName() + "'," +
+            "    'fieldName':'" + intField + "'" +
+            "  }," +
+            "}").build()));
+    checkNoError(solrClient.request(new V2Request.Builder("/collections/" + configName + "/config/params")
+        .withMethod(SolrRequest.METHOD.POST)
+        .withPayload("{" +
+            "  'set' : {" +
+            "    '_UPDATE' : {'processor':'inc,tolerant'}" +
+            "  }" +
+            "}").build()));
+    CollectionAdminRequest.deleteCollection(configName).process(solrClient);
+
+    // start with one collection and an alias for it
+    final String col23rd = alias + "_2017-10-23";
+    CollectionAdminRequest.createCollection(col23rd, configName, 1, 1)
+        .withProperty(TimePartitionedUpdateProcessor.TIME_PARTITION_ALIAS_NAME_CORE_PROP, alias)
+        .process(solrClient);
+
+    assertEquals("We only expect 2 configSets",
+        Arrays.asList("_default", configName), new ConfigSetAdminRequest.List().process(solrClient).getConfigSets());
+
+    CollectionAdminRequest.createAlias(alias, col23rd).process(solrClient);
+    //TODO use SOLR-11617 client API to set alias metadata
+    final ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader();
+    UnaryOperator<Aliases> op = a -> a.cloneWithCollectionAliasMetadata(alias, TimePartitionedUpdateProcessor.ROUTER_FIELD_METADATA, timeField);
+    zkStateReader.aliasesHolder.applyModificationAndExportToZk(op);
+
+    // now we index a document
+    solrClient.add(alias, newDoc(Instant.parse("2017-10-23T00:00:00Z")));
+    solrClient.commit(alias);
+    //assertDocRoutedToCol(lastDocId, col23rd);
+    assertInvariants();
+    // a document that is too old (throws an exception... if we have a TolerantUpdateProcessor then we see it there)
+    try {
+      final UpdateResponse resp = solrClient.add(alias, newDoc(Instant.parse("2017-10-01T00:00:00Z")));
+      final Object errors = resp.getResponseHeader().get("errors");
+      assertTrue(errors != null && errors.toString().contains("couldn't be routed"));
+    } catch (SolrException e) {
+      assertTrue(e.getMessage().contains("couldn't be routed"));
+    }
+    numDocsDeletedOrFailed++;
+
+    // add another collection, and add it to the alias (soonest comes first)
+    final String col24th = alias + "_2017-10-24";
+    CollectionAdminRequest.createCollection(col24th, configName, 2, 2) // more shards and replicas now
+        .setMaxShardsPerNode(2)
+        .withProperty("timePartitionAliasName", alias)
+        .process(solrClient);
+    CollectionAdminRequest.createAlias(alias, col24th + "," + col23rd).process(solrClient);
+
+    // index 3 documents in a random fashion
+    addDocsAndCommit(
+        newDoc(Instant.parse("2017-10-23T00:00:00Z")),
+        newDoc(Instant.parse("2017-10-24T01:00:00Z")),
+        newDoc(Instant.parse("2017-10-24T02:00:00Z"))
+    );
+    assertInvariants();
+
+    // assert that the IncrementURP has updated all '0' to '1'
+    final SolrDocumentList checkIncResults = solrClient.query(alias, params("q", "NOT " + intField + ":1")).getResults();
+    assertEquals(checkIncResults.toString(), 0, checkIncResults.getNumFound());
+
+    // delete a random document id; ensure we don't find it
+    int idToDelete = 1 + random().nextInt(lastDocId);
+    if (idToDelete == 2) { // #2 didn't make it (it failed to route)
+      idToDelete++;
+    }
+    solrClient.deleteById(alias, Integer.toString(idToDelete));
+    solrClient.commit(alias);
+    numDocsDeletedOrFailed++;
+    assertInvariants();
+  }
+
+  private void checkNoError(NamedList<Object> response) {
+    Object errors = response.get("errorMessages");
+    assertNull("" + errors, errors);
+  }
+
+  /** Adds these documents and commits, returning when they are committed.
+   *  We randomly go about this in different ways. */
+  private void addDocsAndCommit(SolrInputDocument... solrInputDocuments) throws Exception {
+    // we assume these are not old docs!
+
+    // this is a list of the collections & the alias name. Used to pick randomly where to send the docs.
+    // (it doesn't matter where we send them since the alias is honored at the URP level)
+    List<String> collections = new ArrayList<>();
+    collections.add(alias);
+    collections.addAll(new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias));
+
+    int commitWithin = random().nextBoolean() ? -1 : 500; // if -1, we commit explicitly instead
+    int numDocsBefore = queryNumDocs();
+    if (random().nextBoolean()) {
+      // send in separate requests
+      for (SolrInputDocument solrInputDocument : solrInputDocuments) {
+        String col = collections.get(random().nextInt(collections.size()));
+        solrClient.add(col, solrInputDocument, commitWithin);
+      }
+    } else {
+      // send in a batch.
+      String col = collections.get(random().nextInt(collections.size()));
+      solrClient.add(col, Arrays.asList(solrInputDocuments), commitWithin);
+    }
+    String col = collections.get(random().nextInt(collections.size()));
+    if (commitWithin == -1) {
+      solrClient.commit(col);
+    } else {
+      // check that it all got committed eventually
+      int numDocs = queryNumDocs();
+      if (numDocs == numDocsBefore + solrInputDocuments.length) {
+        System.err.println("Docs committed sooner than expected.  Bug or slow test env?");
+        return;
+      }
+      // wait until it's committed, plus some play time for the commit to become visible
+      Thread.sleep(commitWithin + 200);
+      numDocs = queryNumDocs();
+      assertEquals("not committed.  Bug or a slow test?",
+          numDocsBefore + solrInputDocuments.length, numDocs);
+    }
+  }
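The fixed Thread.sleep(commitWithin + 200) above can be flaky on slow machines. A sketch of a polling alternative in the context of this test class (waitForNumDocs is an invented helper, not part of the patch):

    // Hypothetical alternative to the fixed sleep: poll until the expected
    // count is visible or a timeout elapses. Not part of the patch.
    private int waitForNumDocs(int expected, long timeoutMs) throws Exception {
      long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
      int numDocs = queryNumDocs();
      while (numDocs != expected && System.nanoTime() < deadline) {
        Thread.sleep(100); // re-check periodically; commitWithin makes changes visible eventually
        numDocs = queryNumDocs();
      }
      return numDocs; // caller asserts equality for a clear failure message
    }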
Bug or a slow test?", + numDocsBefore + solrInputDocuments.length, numDocs); + } + } + + private int queryNumDocs() throws SolrServerException, IOException { + return (int) solrClient.query(alias, params("q", "*:*", "rows", "0")).getResults().getNumFound(); + } + + private void assertInvariants() throws IOException, SolrServerException { + final int expectNumFound = lastDocId - numDocsDeletedOrFailed; //lastDocId is effectively # generated docs + + final List cols = new CollectionAdminRequest.ListAliases().process(solrClient).getAliasesAsLists().get(alias); + assert !cols.isEmpty(); + + int totalNumFound = 0; + Instant colEndInstant = null; // exclusive end + for (String col : cols) { + final Instant colStartInstant = TimePartitionedUpdateProcessor.parseInstantFromCollectionName(alias, col); + //TODO do this in parallel threads + final QueryResponse colStatsResp = solrClient.query(col, params( + "q", "*:*", + "rows", "0", + "stats", "true", + "stats.field", timeField)); + long numFound = colStatsResp.getResults().getNumFound(); + if (numFound > 0) { + totalNumFound += numFound; + final FieldStatsInfo timestampStats = colStatsResp.getFieldStatsInfo().get(timeField); + assertTrue(colStartInstant.toEpochMilli() <= ((Date)timestampStats.getMin()).getTime()); + if (colEndInstant != null) { + assertTrue(colEndInstant.toEpochMilli() > ((Date)timestampStats.getMax()).getTime()); + } + } + + colEndInstant = colStartInstant; // next older segment will max out at our current start time + } + assertEquals(expectNumFound, totalNumFound); + } + + private SolrInputDocument newDoc(Instant timestamp) { + return sdoc("id", Integer.toString(++lastDocId), + timeField, timestamp.toString(), + intField, "0"); // always 0 + } + + @Test + public void testParse() { + assertEquals(Instant.parse("2017-10-02T03:04:05Z"), + TimePartitionedUpdateProcessor.parseInstantFromCollectionName(alias, alias + "_2017-10-02_03_04_05")); + assertEquals(Instant.parse("2017-10-02T03:04:00Z"), + TimePartitionedUpdateProcessor.parseInstantFromCollectionName(alias, alias + "_2017-10-02_03_04")); + assertEquals(Instant.parse("2017-10-02T03:00:00Z"), + TimePartitionedUpdateProcessor.parseInstantFromCollectionName(alias, alias + "_2017-10-02_03")); + assertEquals(Instant.parse("2017-10-02T00:00:00Z"), + TimePartitionedUpdateProcessor.parseInstantFromCollectionName(alias, alias + "_2017-10-02")); + } + + public static class IncrementURPFactory extends FieldMutatingUpdateProcessorFactory { + + @Override + public UpdateRequestProcessor getInstance(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) { + return FieldValueMutatingUpdateProcessor.valueMutator( getSelector(), next, + (src) -> Integer.valueOf(src.toString()) + 1); + } + } + +} diff --git a/solr/core/src/test/org/apache/solr/update/processor/UpdateRequestProcessorFactoryTest.java b/solr/core/src/test/org/apache/solr/update/processor/UpdateRequestProcessorFactoryTest.java index 228273f6cbb..dc7aba1bbb6 100644 --- a/solr/core/src/test/org/apache/solr/update/processor/UpdateRequestProcessorFactoryTest.java +++ b/solr/core/src/test/org/apache/solr/update/processor/UpdateRequestProcessorFactoryTest.java @@ -141,14 +141,14 @@ public class UpdateRequestProcessorFactoryTest extends AbstractSolrTestCase { && procs.get(1) instanceof LogUpdateProcessorFactory.LogUpdateProcessor)); // fetch the distributed version of this chain - proc = chain.createProcessor(req(DISTRIB_UPDATE_PARAM, "non_blank_value"), + proc = chain.createProcessor(req(DISTRIB_UPDATE_PARAM, "NONE"), // 
diff --git a/solr/core/src/test/org/apache/solr/update/processor/UpdateRequestProcessorFactoryTest.java b/solr/core/src/test/org/apache/solr/update/processor/UpdateRequestProcessorFactoryTest.java
index 228273f6cbb..dc7aba1bbb6 100644
--- a/solr/core/src/test/org/apache/solr/update/processor/UpdateRequestProcessorFactoryTest.java
+++ b/solr/core/src/test/org/apache/solr/update/processor/UpdateRequestProcessorFactoryTest.java
@@ -141,14 +141,14 @@ public class UpdateRequestProcessorFactoryTest extends AbstractSolrTestCase {
            && procs.get(1) instanceof LogUpdateProcessorFactory.LogUpdateProcessor));
 
     // fetch the distributed version of this chain
-    proc = chain.createProcessor(req(DISTRIB_UPDATE_PARAM, "non_blank_value"),
+    proc = chain.createProcessor(req(DISTRIB_UPDATE_PARAM, "NONE"), // just some non-blank value
                                  new SolrQueryResponse());
     procs = procToList(proc);
     assertNotNull(name + " (distrib) chain produced null proc", proc);
     assertFalse(name + " (distrib) procs is empty", procs.isEmpty());
 
     // for these 3 (distrib) chains, the first proc should always be LogUpdateProcessor
-    assertTrue(name + " (distrib) first proc should be LogUpdateProcessor because of @RunAllways: "
+    assertTrue(name + " (distrib) first proc should be LogUpdateProcessor because of @RunAlways: "
                + procs.toString(),
                (
                 // compare them both just because i'm going insane and the more checks the better
                 proc instanceof LogUpdateProcessorFactory.LogUpdateProcessor
                 && procs.get(0) instanceof LogUpdateProcessorFactory.LogUpdateProcessor));
@@ -173,7 +173,7 @@ public class UpdateRequestProcessorFactoryTest extends AbstractSolrTestCase {
    * walks the "next" values of the proc building up a List of the procs for easier testing
    */
   public static List<UpdateRequestProcessor> procToList(UpdateRequestProcessor proc) {
-    List<UpdateRequestProcessor> result = new ArrayList<UpdateRequestProcessor>(7);
+    List<UpdateRequestProcessor> result = new ArrayList<>(7);
     while (null != proc) {
       result.add(proc);
       proc = proc.next;
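Taken together, the intended write path looks roughly like this from a client's perspective. This is a sketch assembled from the test above, not an excerpt from the patch (ClientViewSketch is an invented name; the field and collection names mirror the test):

    import java.time.Instant;
    import org.apache.solr.client.solrj.SolrClient;
    import org.apache.solr.common.SolrInputDocument;

    public class ClientViewSketch {
      // Clients only ever address the alias; the URP picks the concrete partition.
      static void indexOne(SolrClient solrClient, String alias) throws Exception {
        SolrInputDocument doc = new SolrInputDocument();
        doc.addField("id", "1");
        doc.addField("timestamp", Instant.parse("2017-10-24T01:00:00Z").toString());
        solrClient.add(alias, doc); // routed to myalias_2017-10-24, the partition covering this time
        solrClient.commit(alias);   // commits are fanned out to every collection in the alias
      }
    }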