diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 4bd24df5885..fcdfad47dbc 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -49,6 +49,13 @@ New Features * SOLR-8859: AbstractSpatialFieldType will now convert Shapes to/from Strings using the SpatialContext. (ryan) +* SOLR-445: new ToleranteUpdateProcessorFactory to support skipping update commands that cause + failures when sending multiple updates in a single request. + (Erick Erickson, Tomás Fernández Löbbe, Anshum Gupta, hossman) + +* SOLR-8890: New static method in DistributedUpdateProcessorFactory to allow UpdateProcessorFactories + to indicate request params that should be forwarded when DUP distributes updates. (hossman) + Bug Fixes ---------------------- diff --git a/solr/core/src/java/org/apache/solr/response/SolrQueryResponse.java b/solr/core/src/java/org/apache/solr/response/SolrQueryResponse.java index f1ccd085f86..378dee82e62 100644 --- a/solr/core/src/java/org/apache/solr/response/SolrQueryResponse.java +++ b/solr/core/src/java/org/apache/solr/response/SolrQueryResponse.java @@ -161,6 +161,10 @@ public class SolrQueryResponse { /** * Causes an error to be returned instead of the results. + * + * In general, new calls to this method should not be added. In most cases + * you should simply throw an exception and let it bubble out to + * RequestHandlerBase, which will set the exception thrown. */ public void setException(Exception e) { err=e; diff --git a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java index d9b6478119f..a99952dbd57 100644 --- a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java +++ b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java @@ -21,6 +21,7 @@ import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.client.solrj.impl.BinaryResponseParser; import org.apache.solr.client.solrj.impl.HttpSolrClient; +import org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrClient; // jdoc import org.apache.solr.client.solrj.request.AbstractUpdateRequest; import org.apache.solr.client.solrj.request.UpdateRequest; import org.apache.solr.common.SolrException; @@ -138,7 +139,7 @@ public class SolrCmdDistributor { SolrException.log(SolrCmdDistributor.log, "forwarding update to " + oldNodeUrl + " failed - retrying ... 
retries: " - + err.req.retries + " " + err.req.cmdString + " params:" + + err.req.retries + " " + err.req.cmd.toString() + " params:" + err.req.uReq.getParams() + " rsp:" + rspCode, err.e); try { Thread.sleep(retryPause); @@ -187,7 +188,7 @@ public class SolrCmdDistributor { uReq.deleteByQuery(cmd.query); } - submit(new Req(cmd.toString(), node, uReq, sync), false); + submit(new Req(cmd, node, uReq, sync), false); } } @@ -200,14 +201,13 @@ public class SolrCmdDistributor { } public void distribAdd(AddUpdateCommand cmd, List nodes, ModifiableSolrParams params, boolean synchronous, RequestReplicationTracker rrt) throws IOException { - String cmdStr = cmd.toString(); for (Node node : nodes) { UpdateRequest uReq = new UpdateRequest(); if (cmd.isLastDocInBatch) uReq.lastDocInBatch(); uReq.setParams(params); uReq.add(cmd.solrDoc, cmd.commitWithin, cmd.overwrite); - submit(new Req(cmdStr, node, uReq, synchronous, rrt, cmd.pollQueueTime), false); + submit(new Req(cmd, node, uReq, synchronous, rrt, cmd.pollQueueTime), false); } } @@ -226,7 +226,7 @@ public class SolrCmdDistributor { log.debug("Distrib commit to: {} params: {}", nodes, params); for (Node node : nodes) { - submit(new Req(cmd.toString(), node, uReq, false), true); + submit(new Req(cmd, node, uReq, false), true); } } @@ -272,7 +272,7 @@ public class SolrCmdDistributor { if (log.isDebugEnabled()) { log.debug("sending update to " + req.node.getUrl() + " retry:" - + req.retries + " " + req.cmdString + " params:" + req.uReq.getParams()); + + req.retries + " " + req.cmd + " params:" + req.uReq.getParams()); } if (isCommit) { @@ -314,26 +314,26 @@ public class SolrCmdDistributor { public UpdateRequest uReq; public int retries; public boolean synchronous; - public String cmdString; + public UpdateCommand cmd; public RequestReplicationTracker rfTracker; public int pollQueueTime; - public Req(String cmdString, Node node, UpdateRequest uReq, boolean synchronous) { - this(cmdString, node, uReq, synchronous, null, 0); + public Req(UpdateCommand cmd, Node node, UpdateRequest uReq, boolean synchronous) { + this(cmd, node, uReq, synchronous, null, 0); } - public Req(String cmdString, Node node, UpdateRequest uReq, boolean synchronous, RequestReplicationTracker rfTracker, int pollQueueTime) { + public Req(UpdateCommand cmd, Node node, UpdateRequest uReq, boolean synchronous, RequestReplicationTracker rfTracker, int pollQueueTime) { this.node = node; this.uReq = uReq; this.synchronous = synchronous; - this.cmdString = cmdString; + this.cmd = cmd; this.rfTracker = rfTracker; this.pollQueueTime = pollQueueTime; } public String toString() { StringBuilder sb = new StringBuilder(); - sb.append("SolrCmdDistributor$Req: cmd=").append(String.valueOf(cmdString)); + sb.append("SolrCmdDistributor$Req: cmd=").append(cmd.toString()); sb.append("; node=").append(String.valueOf(node)); return sb.toString(); } @@ -382,6 +382,13 @@ public class SolrCmdDistributor { public static class Error { public Exception e; public int statusCode = -1; + + /** + * NOTE: This is the request that happened to be executed when this error was triggered the error, + * but because of how {@link StreamingSolrClients} uses {@link ConcurrentUpdateSolrClient} it might not + * actaully be the request that caused the error -- multiple requests are merged & processed as + * a sequential batch. 
+ */ public Req req; public String toString() { diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java index 365de6cbd8f..5f4e4f1dd66 100644 --- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java +++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java @@ -92,7 +92,8 @@ import org.slf4j.LoggerFactory; // NOT mt-safe... create a new processor for each add thread // TODO: we really should not wait for distrib after local? unless a certain replication factor is asked for public class DistributedUpdateProcessor extends UpdateRequestProcessor { - + + final static String PARAM_WHITELIST_CTX_KEY = DistributedUpdateProcessor.class + "PARAM_WHITELIST_CTX_KEY"; public static final String DISTRIB_FROM_SHARD = "distrib.from.shard"; public static final String DISTRIB_FROM_COLLECTION = "distrib.from.collection"; public static final String DISTRIB_FROM_PARENT = "distrib.from.parent"; @@ -292,6 +293,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { this.req = req; + // this should always be used - see filterParams + DistributedUpdateProcessorFactory.addParamToDistributedRequestWhitelist + (this.req, UpdateParams.UPDATE_CHAIN, TEST_DISTRIB_SKIP_SERVERS); + CoreDescriptor coreDesc = req.getCore().getCoreDescriptor(); this.zkEnabled = coreDesc.getCoreContainer().isZooKeeperAware(); @@ -790,38 +795,30 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { cmdDistrib.finish(); List errors = cmdDistrib.getErrors(); // TODO - we may need to tell about more than one error... - - // if it's a forward, any fail is a problem - - // otherwise we assume things are fine if we got it locally - // until we start allowing min replication param - if (errors.size() > 0) { - // if one node is a RetryNode, this was a forward request - if (errors.get(0).req.node instanceof RetryNode) { - rsp.setException(errors.get(0).e); - } else { - if (log.isWarnEnabled()) { - for (Error error : errors) { - log.warn("Error sending update to " + error.req.node.getBaseUrl(), error.e); - } - } - } - // else - // for now we don't error - we assume if it was added locally, we - // succeeded - } - - - // if it is not a forward request, for each fail, try to tell them to - // recover - the doc was already added locally, so it should have been - // legit + List errorsForClient = new ArrayList<>(errors.size()); + for (final SolrCmdDistributor.Error error : errors) { if (error.req.node instanceof RetryNode) { - // we don't try to force a leader to recover - // when we cannot forward to it + // if it's a forward, any fail is a problem - + // otherwise we assume things are fine if we got it locally + // until we start allowing min replication param + errorsForClient.add(error); continue; } + + // else... + + // for now we don't error - we assume if it was added locally, we + // succeeded + if (log.isWarnEnabled()) { + log.warn("Error sending update to " + error.req.node.getBaseUrl(), error.e); + } + + // Since it is not a forward request, for each fail, try to tell them to + // recover - the doc was already added locally, so it should have been + // legit DistribPhase phase = DistribPhase.parseParam(error.req.uReq.getParams().get(DISTRIB_UPDATE_PARAM)); @@ -841,8 +838,8 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { // let's just fail this request and let the client retry? 
or just call processAdd again? log.error("On "+cloudDesc.getCoreNodeName()+", replica "+replicaUrl+ " now thinks it is the leader! Failing the request to let the client retry! "+error.e); - rsp.setException(error.e); - break; + errorsForClient.add(error); + continue; } String collection = null; @@ -927,7 +924,12 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { rsp.getResponseHeader().add(UpdateRequest.REPFACT, replicationTracker.getAchievedRf()); rsp.getResponseHeader().add(UpdateRequest.MIN_REPFACT, replicationTracker.minRf); replicationTracker = null; - } + } + + + if (0 < errorsForClient.size()) { + throw new DistributedUpdatesAsyncException(errorsForClient); + } } @@ -1210,10 +1212,16 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { } } + /** @see DistributedUpdateProcessorFactory#addParamToDistributedRequestWhitelist */ protected ModifiableSolrParams filterParams(SolrParams params) { ModifiableSolrParams fparams = new ModifiableSolrParams(); - passParam(params, fparams, UpdateParams.UPDATE_CHAIN); - passParam(params, fparams, TEST_DISTRIB_SKIP_SERVERS); + + Set whitelist = (Set) this.req.getContext().get(PARAM_WHITELIST_CTX_KEY); + assert null != whitelist : "whitelist can't be null, constructor adds to it"; + + for (String p : whitelist) { + passParam(params, fparams, p); + } return fparams; } @@ -1698,4 +1706,67 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { // definitely not the leader. Otherwise assume we are. return DistribPhase.FROMLEADER != phase; } + + public static final class DistributedUpdatesAsyncException extends SolrException { + public final List errors; + public DistributedUpdatesAsyncException(List errors) { + super(buildCode(errors), buildMsg(errors), null); + this.errors = errors; + + // create a merged copy of the metadata from all wrapped exceptions + NamedList metadata = new NamedList(); + for (Error error : errors) { + if (error.e instanceof SolrException) { + SolrException e = (SolrException) error.e; + NamedList eMeta = e.getMetadata(); + if (null != eMeta) { + metadata.addAll(eMeta); + } + } + } + if (0 < metadata.size()) { + this.setMetadata(metadata); + } + } + + /** Helper method for constructor */ + private static final int buildCode(List errors) { + assert null != errors; + assert 0 < errors.size(); + + int minCode = Integer.MAX_VALUE; + int maxCode = Integer.MIN_VALUE; + for (Error error : errors) { + log.trace("REMOTE ERROR: {}", error); + minCode = Math.min(error.statusCode, minCode); + maxCode = Math.max(error.statusCode, maxCode); + } + if (minCode == maxCode) { + // all codes are consistent, use that... 
+ return minCode; + } else if (400 <= minCode && maxCode < 500) { + // all codes are 4xx, use 400 + return ErrorCode.BAD_REQUEST.code; + } + // ...otherwise use sensible default + return ErrorCode.SERVER_ERROR.code; + } + + /** Helper method for constructor */ + private static final String buildMsg(List errors) { + assert null != errors; + assert 0 < errors.size(); + + if (1 == errors.size()) { + return "Async exception during distributed update: " + errors.get(0).e.getMessage(); + } else { + StringBuilder buf = new StringBuilder(errors.size() + " Async exceptions during distributed update: "); + for (Error error : errors) { + buf.append("\n"); + buf.append(error.e.getMessage()); + } + return buf.toString(); + } + } + } } diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java index 4b64dec77e9..6446b1ac576 100644 --- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java +++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java @@ -16,6 +16,9 @@ */ package org.apache.solr.update.processor; +import java.util.Set; +import java.util.TreeSet; + import org.apache.solr.common.util.NamedList; import org.apache.solr.request.SolrQueryRequest; import org.apache.solr.response.SolrQueryResponse; @@ -28,6 +31,23 @@ import org.apache.solr.response.SolrQueryResponse; public class DistributedUpdateProcessorFactory extends UpdateRequestProcessorFactory implements DistributingUpdateProcessorFactory { + + /** + * By default, the {@link DistributedUpdateProcessor} is extremely conservative in the list of request + * params that will be copied/included when updates are forwarded to other nodes. This method may be + * used by any {@link UpdateRequestProcessorFactory#getInstance} call to annotate a + * SolrQueryRequest with the names of parameters that should also be forwarded. + */ + public static void addParamToDistributedRequestWhitelist(final SolrQueryRequest req, final String... paramNames) { + Set whitelist = (Set) req.getContext().get(DistributedUpdateProcessor.PARAM_WHITELIST_CTX_KEY); + if (null == whitelist) { + whitelist = new TreeSet(); + req.getContext().put(DistributedUpdateProcessor.PARAM_WHITELIST_CTX_KEY, whitelist); + } + for (String p : paramNames) { + whitelist.add(p); + } + } @Override public void init(NamedList args) { diff --git a/solr/core/src/java/org/apache/solr/update/processor/TolerantUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/TolerantUpdateProcessor.java new file mode 100644 index 00000000000..f9437f58e58 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/update/processor/TolerantUpdateProcessor.java @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.update.processor; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Set; +import java.lang.invoke.MethodHandles; + +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.CharsRefBuilder; +import org.apache.solr.cloud.ZkController; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.SolrException.ErrorCode; +import org.apache.solr.common.ToleratedUpdateError; +import org.apache.solr.common.ToleratedUpdateError.CmdType; +import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.Slice; +import org.apache.solr.common.params.ShardParams; +import org.apache.solr.common.params.SolrParams; +import org.apache.solr.common.util.NamedList; +import org.apache.solr.common.util.SimpleOrderedMap; +import org.apache.solr.request.SolrQueryRequest; +import org.apache.solr.response.SolrQueryResponse; +import org.apache.solr.schema.SchemaField; +import org.apache.solr.update.AddUpdateCommand; +import org.apache.solr.update.CommitUpdateCommand; +import org.apache.solr.update.DeleteUpdateCommand; +import org.apache.solr.update.MergeIndexesCommand; +import org.apache.solr.update.RollbackUpdateCommand; +import org.apache.solr.update.SolrCmdDistributor.Error; +import org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + *

+ * Suppresses errors for individual add/delete commands within a single request. + * Instead of failing on the first error, at most maxErrors errors (or unlimited + * if -1==maxErrors) are logged and recorded, and the batch continues. + * The client will receive a status==200 response, which includes a list of errors + * that were tolerated. + *
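+ *
+ * For example (a sketch only -- the updateRequest and client objects are assumed to already exist,
+ * and the exact shape of each entry is whatever ToleratedUpdateError.formatForResponseHeader
+ * produces), a SolrJ client can read the tolerated errors back from the response header of such a
+ * status==200 response:
+ *
+ *   UpdateResponse rsp = updateRequest.process(client);  // status==200 even if some commands failed
+ *   NamedList<?> header = rsp.getResponseHeader();
+ *   Object tolerated = header.get("errors");       // list of tolerated errors (empty if none)
+ *   Object effectiveMax = header.get("maxErrors"); // effective limit; -1 means unlimited
+ *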

+ *

+ * If more than maxErrors occur, the first exception recorded will be re-thrown; + * Solr will respond with status==5xx or status==4xx + * (depending on the underlying exceptions) and it won't finish processing any more updates in the request + * (i.e. subsequent update commands in the request will not be processed even if they are valid). + *

+ * + *

+ * NOTE: In cloud-based collections, this processor expects to NOT be used on {@link DistribPhase#FROMLEADER} + * requests (because any successes that occur locally on the leader are considered successes even if there is some + * subsequent error on a replica). {@link TolerantUpdateProcessorFactory} will short-circuit it away in those + * requests. + *

+ * + * @see TolerantUpdateProcessorFactory + */ +public class TolerantUpdateProcessor extends UpdateRequestProcessor { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + /** + * String to be used as document key for errors when a real uniqueKey can't be determined + */ + private static final String UNKNOWN_ID = "(unknown)"; + + /** + * Response Header + */ + private final NamedList header; + + /** + * Number of errors this UpdateRequestProcessor will tolerate. If more then this occur, + * the original exception will be thrown, interrupting the processing of the document + * batch + */ + private final int maxErrors; + + /** The uniqueKey field */ + private SchemaField uniqueKeyField; + + private final SolrQueryRequest req; + private ZkController zkController; + + /** + * Known errors that occurred in this batch, in order encountered (may not be the same as the + * order the commands were originally executed in due to the async distributed updates). + */ + private final List knownErrors = new ArrayList(); + + // Kludge: Because deleteByQuery updates are forwarded to every leader, we can get identical + // errors reported by every leader for the same underlying problem. + // + // It would be nice if we could cleanly handle the unlikely (but possible) situation of an + // update stream that includes multiple identical DBQs, with identical failures, and + // to report each one once, for example... + // add: id#1 + // dbq: foo:bar + // add: id#2 + // add: id#3 + // dbq: foo:bar + // + // ...but i can't figure out a way to accurately identify & return duplicate + // ToleratedUpdateErrors from duplicate identical underlying requests w/o erroneously returning identical + // ToleratedUpdateErrors for the *same* underlying request but from diff shards. + // + // So as a kludge, we keep track of them for deduping against identical remote failures + // + private Set knownDBQErrors = new HashSet<>(); + + private final FirstErrTracker firstErrTracker = new FirstErrTracker(); + private final DistribPhase distribPhase; + + public TolerantUpdateProcessor(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next, int maxErrors, DistribPhase distribPhase) { + super(next); + assert maxErrors >= -1; + + header = rsp.getResponseHeader(); + this.maxErrors = ToleratedUpdateError.getEffectiveMaxErrors(maxErrors); + this.req = req; + this.distribPhase = distribPhase; + assert ! DistribPhase.FROMLEADER.equals(distribPhase); + + this.zkController = this.req.getCore().getCoreDescriptor().getCoreContainer().getZkController(); + this.uniqueKeyField = this.req.getCore().getLatestSchema().getUniqueKeyField(); + assert null != uniqueKeyField : "Factory didn't enforce uniqueKey field?"; + } + + @Override + public void processAdd(AddUpdateCommand cmd) throws IOException { + BytesRef id = null; + + try { + // force AddUpdateCommand to validate+cache the id before proceeding + id = cmd.getIndexedId(); + + super.processAdd(cmd); + + } catch (Throwable t) { + firstErrTracker.caught(t); + knownErrors.add(new ToleratedUpdateError + (CmdType.ADD, + getPrintableId(id), + t.getMessage())); + + if (knownErrors.size() > maxErrors) { + firstErrTracker.throwFirst(); + } + } + } + + @Override + public void processDelete(DeleteUpdateCommand cmd) throws IOException { + + try { + + super.processDelete(cmd); + + } catch (Throwable t) { + firstErrTracker.caught(t); + + ToleratedUpdateError err = new ToleratedUpdateError(cmd.isDeleteById() ? 
CmdType.DELID : CmdType.DELQ, + cmd.isDeleteById() ? cmd.id : cmd.query, + t.getMessage()); + knownErrors.add(err); + + // NOTE: we're not using this to dedup before adding to knownErrors. + // if we're lucky enough to get an immediate local failure (ie: we're a leader, or some other processor + // failed) then recording the multiple failures is a good thing -- helps us with an accurate fail + // fast if we exceed maxErrors + if (CmdType.DELQ.equals(err.getType())) { + knownDBQErrors.add(err); + } + + if (knownErrors.size() > maxErrors) { + firstErrTracker.throwFirst(); + } + } + } + + @Override + public void processMergeIndexes(MergeIndexesCommand cmd) throws IOException { + try { + super.processMergeIndexes(cmd); + } catch (Throwable t) { + // we're not tolerante of errors from this type of command, but we + // do need to track it so we can annotate it with any other errors we were allready tolerant of + firstErrTracker.caught(t); + throw t; + } + } + + @Override + public void processCommit(CommitUpdateCommand cmd) throws IOException { + try { + super.processCommit(cmd); + } catch (Throwable t) { + // we're not tolerante of errors from this type of command, but we + // do need to track it so we can annotate it with any other errors we were allready tolerant of + firstErrTracker.caught(t); + throw t; + } + } + + @Override + public void processRollback(RollbackUpdateCommand cmd) throws IOException { + try { + super.processRollback(cmd); + } catch (Throwable t) { + // we're not tolerante of errors from this type of command, but we + // do need to track it so we can annotate it with any other errors we were allready tolerant of + firstErrTracker.caught(t); + throw t; + } + } + + @Override + public void finish() throws IOException { + + // even if processAdd threw an error, this.finish() is still called and we might have additional + // errors from other remote leaders that we need to check for from the finish method of downstream processors + // (like DUP) + + try { + super.finish(); + } catch (DistributedUpdateProcessor.DistributedUpdatesAsyncException duae) { + firstErrTracker.caught(duae); + + + // adjust our stats based on each of the distributed errors + for (Error error : duae.errors) { + // we can't trust the req info from the Error, because multiple original requests might have been + // lumped together + // + // instead we trust the metadata that the TolerantUpdateProcessor running on the remote node added + // to the exception when it failed. + if ( ! 
(error.e instanceof SolrException) ) { + log.error("async update exception is not SolrException, no metadata to process", error.e); + continue; + } + SolrException remoteErr = (SolrException) error.e; + NamedList remoteErrMetadata = remoteErr.getMetadata(); + + if (null == remoteErrMetadata) { + log.warn("remote error has no metadata to aggregate: " + remoteErr.getMessage(), remoteErr); + continue; + } + + for (int i = 0; i < remoteErrMetadata.size(); i++) { + ToleratedUpdateError err = + ToleratedUpdateError.parseMetadataIfToleratedUpdateError(remoteErrMetadata.getName(i), + remoteErrMetadata.getVal(i)); + if (null == err) { + // some metadata unrelated to this update processor + continue; + } + + if (CmdType.DELQ.equals(err.getType())) { + if (knownDBQErrors.contains(err)) { + // we've already seen this identical error, probably a dup from another shard + continue; + } else { + knownDBQErrors.add(err); + } + } + + knownErrors.add(err); + } + } + } + + header.add("errors", ToleratedUpdateError.formatForResponseHeader(knownErrors)); + // include in response so client knows what effective value was (may have been server side config) + header.add("maxErrors", ToleratedUpdateError.getUserFriendlyMaxErrors(maxErrors)); + + // annotate any error that might be thrown (or was already thrown) + firstErrTracker.annotate(knownErrors); + + // decide if we have hit a situation where we know an error needs to be thrown. + + if ((DistribPhase.TOLEADER.equals(distribPhase) ? 0 : maxErrors) < knownErrors.size()) { + // NOTE: even if maxErrors wasn't exceeeded, we need to throw an error when we have any errors if we're + // a leader that was forwarded to by another node so that the forwarding node knows we encountered some + // problems and can aggregate the results + + firstErrTracker.throwFirst(); + } + } + + /** + * Returns the output of {@link org.apache.solr.schema.FieldType# + * indexedToReadable(BytesRef, CharsRefBuilder)} of the field + * type of the uniqueKey on the {@link BytesRef} passed as parameter. + * ref should be the indexed representation of the id -- if null + * (possibly because it's missing in the update) this method will return {@link #UNKNOWN_ID} + */ + private String getPrintableId(BytesRef ref) { + if (ref == null) { + return UNKNOWN_ID; + } + return uniqueKeyField.getType().indexedToReadable(ref, new CharsRefBuilder()).toString(); + } + + /** + * Simple helper class for "tracking" any exceptions encountered. + * + * Only remembers the "first" exception encountered, and wraps it in a SolrException if needed, so that + * it can later be annotated with the metadata our users expect and re-thrown. + * + * NOTE: NOT THREAD SAFE + */ + private static final class FirstErrTracker { + + + SolrException first = null; + boolean thrown = false; + + public FirstErrTracker() { + /* NOOP */ + } + + /** + * Call this method immediately anytime an exception is caught from a down stream method -- + * even if you are going to ignore it (for now). If you plan to rethrow the Exception, use + * {@link #throwFirst} instead. + */ + public void caught(Throwable t) { + assert null != t; + if (null == first) { + if (t instanceof SolrException) { + first = (SolrException)t; + } else { + first = new SolrException(ErrorCode.SERVER_ERROR, "Tolerantly Caught Exception: " + t.getMessage(), t); + } + } + } + + /** + * Call this method in place of any situation where you would normally (re)throw an exception + * (already passed to the {@link #caught} method because maxErrors was exceeded + * is exceed. 
+ * + * This method will keep a record that this update processor has already thrown the exception, and do + * nothing on future calls, so subsequent update processor methods can update the metadata but won't + * inadvertantly re-throw this (or any other) cascading exception by mistake. + */ + public void throwFirst() throws SolrException { + assert null != first : "caught was never called?"; + if (! thrown) { + thrown = true; + throw first; + } + } + + /** + * Annotates the first exception (which may already have been thrown, or be thrown in the future) with + * the metadata from this update processor. For use in {@link TolerantUpdateProcessor#finish} + */ + public void annotate(List errors) { + + if (null == first) { + return; // no exception to annotate + } + + assert null != errors : "how do we have an exception to annotate w/o any errors?"; + + NamedList firstErrMetadata = first.getMetadata(); + if (null == firstErrMetadata) { // obnoxious + firstErrMetadata = new NamedList(); + first.setMetadata(firstErrMetadata); + } else { + // any existing metadata representing ToleratedUpdateErrors in this single exception needs removed + // so we can add *all* of the known ToleratedUpdateErrors (from this and other exceptions) + for (int i = 0; i < firstErrMetadata.size(); i++) { + if (null != ToleratedUpdateError.parseMetadataIfToleratedUpdateError + (firstErrMetadata.getName(i), firstErrMetadata.getVal(i))) { + + firstErrMetadata.remove(i); + // NOTE: post decrementing index so we don't miss anything as we remove items + i--; + } + } + } + + for (ToleratedUpdateError te : errors) { + firstErrMetadata.add(te.getMetadataKey(), te.getMetadataValue()); + } + } + + + /** The first exception that was thrown (or may be thrown) whose metadata can be annotated. */ + public SolrException getFirst() { + return first; + } + } +} diff --git a/solr/core/src/java/org/apache/solr/update/processor/TolerantUpdateProcessorFactory.java b/solr/core/src/java/org/apache/solr/update/processor/TolerantUpdateProcessorFactory.java new file mode 100644 index 00000000000..8cd35007f9f --- /dev/null +++ b/solr/core/src/java/org/apache/solr/update/processor/TolerantUpdateProcessorFactory.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.solr.update.processor; + +import org.apache.solr.common.SolrException; +import org.apache.solr.common.SolrException.ErrorCode; +import org.apache.solr.common.util.NamedList; +import org.apache.solr.core.SolrCore; +import org.apache.solr.request.SolrQueryRequest; +import org.apache.solr.response.SolrQueryResponse; +import org.apache.solr.schema.SchemaField; +import org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase; +import org.apache.solr.util.plugin.SolrCoreAware; + +import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM; + +/** + *

+ * Suppresses errors for individual add/delete commands within a single request. + * Instead of failing on the first error, at most maxErrors errors (or unlimited + * if -1==maxErrors) are logged and recorded, and the batch continues. + * The client will receive a status==200 response, which includes a list of errors + * that were tolerated. + *

+ *

+ * If more than maxErrors occur, the first exception recorded will be re-thrown; + * Solr will respond with status==5xx or status==4xx + * (depending on the underlying exceptions) and it won't finish processing any more updates in the request + * (i.e. subsequent update commands in the request will not be processed even if they are valid). + *
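+ *
+ * As a rough sketch of how a client might inspect such a failure (mirroring what the cloud test for
+ * this processor does; the updateRequest and client objects are assumed to already exist), the
+ * per-command details are attached to the thrown SolrException as metadata and can be decoded with
+ * ToleratedUpdateError.parseMetadataIfToleratedUpdateError:
+ *
+ *   try {
+ *     updateRequest.process(client);
+ *   } catch (SolrException e) {
+ *     NamedList<String> meta = e.getMetadata();
+ *     for (int i = 0; null != meta && i < meta.size(); i++) {
+ *       ToleratedUpdateError err =
+ *           ToleratedUpdateError.parseMetadataIfToleratedUpdateError(meta.getName(i), meta.getVal(i));
+ *       if (null != err) {
+ *         // err.getType() and err.getMessage() describe one update command that failed
+ *       }
+ *     }
+ *   }
+ *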

+ * + *

+ * maxErrors is an int value that can be specified in the configuration and/or overridden + * per request. If unset, it will default to {@link Integer#MAX_VALUE}. Specifying an explicit value + * of -1 is supported as shorthand for {@link Integer#MAX_VALUE}; all other negative + * integer values are not supported. + *

+ *

+ * An example configuration would be: + *

+ *
+ * <updateRequestProcessorChain name="tolerant-chain">
+ *   <processor class="solr.TolerantUpdateProcessorFactory">
+ *     <int name="maxErrors">10</int>
+ *   </processor>
+ *   <processor class="solr.RunUpdateProcessorFactory" />
+ * </updateRequestProcessorChain>
+ * 
+ * 
+ * + *

+ * The maxErrors parameter in the above chain could be overridden per request, for example: + *

+ *
+ * curl "http://localhost:8983/update?update.chain=tolerant-chain&maxErrors=100" -H "Content-Type: text/xml" -d @myfile.xml
+ * 
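+ *
+ * A rough SolrJ equivalent of the request above (a sketch only; the URL, chain name, and documents
+ * are illustrative values):
+ *
+ *   SolrClient client = new HttpSolrClient("http://localhost:8983/solr/collection1");
+ *   UpdateRequest req = new UpdateRequest();
+ *   req.setParam("update.chain", "tolerant-chain");
+ *   req.setParam("maxErrors", "100");
+ *   req.add(doc1);
+ *   req.add(doc2);
+ *   UpdateResponse rsp = req.process(client); // rsp.getResponseHeader() lists any tolerated errors
+ *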
+ * + *

+ * NOTE: The behavior of this UpdateProcessorFactory in conjunction with indexing operations + * while a Shard Split is actively in progress is not well defined (or sufficiently tested). Users + * of this update processor are encouraged to either disable it, or pause updates, while any shard + * splitting is in progress (see SOLR-8881 + * for more details). + *

+ */ +public class TolerantUpdateProcessorFactory extends UpdateRequestProcessorFactory + implements SolrCoreAware, UpdateRequestProcessorFactory.RunAlways { + + /** + * Parameter that defines how many errors the UpdateRequestProcessor will tolerate + */ + private final static String MAX_ERRORS_PARAM = "maxErrors"; + + /** + * Default maxErrors value that will be use if the value is not set in configuration + * or in the request + */ + private int defaultMaxErrors = Integer.MAX_VALUE; + + private boolean informed = false; + + @SuppressWarnings("rawtypes") + @Override + public void init( NamedList args ) { + + Object maxErrorsObj = args.get(MAX_ERRORS_PARAM); + if (maxErrorsObj != null) { + try { + defaultMaxErrors = Integer.valueOf(maxErrorsObj.toString()); + } catch (Exception e) { + throw new SolrException(ErrorCode.SERVER_ERROR, "Unnable to parse maxErrors parameter: " + maxErrorsObj, e); + } + if (defaultMaxErrors < -1) { + throw new SolrException(ErrorCode.SERVER_ERROR, "Config option '"+MAX_ERRORS_PARAM + "' must either be non-negative, or -1 to indicate 'unlimiited': " + maxErrorsObj.toString()); + } + } + } + + @Override + public void inform(SolrCore core) { + informed = true; + if (null == core.getLatestSchema().getUniqueKeyField()) { + throw new SolrException(ErrorCode.SERVER_ERROR, this.getClass().getName() + + " requires a schema that includes a uniqueKey field."); + } + } + + @Override + public UpdateRequestProcessor getInstance(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) { + + assert informed : "inform(SolrCore) never called?"; + + // short circut if we're a replica processing commands from our leader + DistribPhase distribPhase = DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM)); + if (DistribPhase.FROMLEADER.equals(distribPhase)) { + return next; + } + + DistributedUpdateProcessorFactory.addParamToDistributedRequestWhitelist(req, MAX_ERRORS_PARAM); + int maxErrors = req.getParams().getInt(MAX_ERRORS_PARAM, defaultMaxErrors); + if (maxErrors < -1) { + throw new SolrException(ErrorCode.BAD_REQUEST, "'"+MAX_ERRORS_PARAM + "' must either be non-negative, or -1 to indicate 'unlimiited': " + maxErrors); + } + + // NOTE: even if 0==maxErrors, we still inject processor into chain so respones has expected header info + return new TolerantUpdateProcessor(req, rsp, next, maxErrors, distribPhase); + } +} diff --git a/solr/core/src/test-files/solr/collection1/conf/solrconfig-distrib-update-processor-chains.xml b/solr/core/src/test-files/solr/collection1/conf/solrconfig-distrib-update-processor-chains.xml new file mode 100644 index 00000000000..97ed18b53e0 --- /dev/null +++ b/solr/core/src/test-files/solr/collection1/conf/solrconfig-distrib-update-processor-chains.xml @@ -0,0 +1,85 @@ + + + + + + + + ${tests.luceneMatchVersion:LUCENE_CURRENT} + + + + ${solr.hdfs.blockcache.enabled:true} + ${solr.hdfs.blockcache.blocksperbank:1024} + ${solr.hdfs.home:} + ${solr.hdfs.confdir:} + ${solr.hdfs.blockcache.global:false} + + + ${solr.data.dir:} + + + + + + + + + + + + + + + true + + + + + + + + + + ${solr.ulog.dir:} + + + + + + + 10 + + + + + + + + + + + + + + + + + diff --git a/solr/core/src/test-files/solr/collection1/conf/solrconfig-tolerant-update-minimal.xml b/solr/core/src/test-files/solr/collection1/conf/solrconfig-tolerant-update-minimal.xml new file mode 100644 index 00000000000..d3b90db50f6 --- /dev/null +++ b/solr/core/src/test-files/solr/collection1/conf/solrconfig-tolerant-update-minimal.xml @@ -0,0 +1,40 @@ + + + + + + + 
${tests.luceneMatchVersion:LATEST} + + + + + + explicit + true + text + + + + + + + + + + diff --git a/solr/core/src/test-files/solr/collection1/conf/solrconfig-update-processor-chains.xml b/solr/core/src/test-files/solr/collection1/conf/solrconfig-update-processor-chains.xml index d0b547241c3..bb1cbcfc613 100644 --- a/solr/core/src/test-files/solr/collection1/conf/solrconfig-update-processor-chains.xml +++ b/solr/core/src/test-files/solr/collection1/conf/solrconfig-update-processor-chains.xml @@ -26,6 +26,7 @@ ${tests.luceneMatchVersion:LATEST} + @@ -628,4 +629,20 @@ + + + 10 + + + + + + + + + + + + + diff --git a/solr/core/src/test/org/apache/solr/cloud/TestTolerantUpdateProcessorCloud.java b/solr/core/src/test/org/apache/solr/cloud/TestTolerantUpdateProcessorCloud.java new file mode 100644 index 00000000000..054c0745fd4 --- /dev/null +++ b/solr/core/src/test/org/apache/solr/cloud/TestTolerantUpdateProcessorCloud.java @@ -0,0 +1,1065 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.cloud; + +import java.io.File; +import java.lang.invoke.MethodHandles; +import java.net.URL; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.solr.cloud.SolrCloudTestCase; +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.embedded.JettySolrRunner; +import org.apache.solr.client.solrj.impl.HttpSolrClient; +import org.apache.solr.client.solrj.impl.CloudSolrClient; +import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.client.solrj.response.UpdateResponse; +import org.apache.solr.common.SolrDocument; +import org.apache.solr.common.SolrDocumentList; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.SolrInputField; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.ToleratedUpdateError; +import org.apache.solr.common.ToleratedUpdateError.CmdType; +import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.Slice; +import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.params.SolrParams; +import org.apache.solr.common.util.NamedList; +import org.apache.solr.common.util.SimpleOrderedMap; +import org.apache.solr.util.RevertDefaultThreadHandlerRule; + +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test of TolerantUpdateProcessor using a MiniSolrCloud. 
Updates (that include failures which + * should be tolerated) are explicitly tested against various initial nodes to confirm correct + * behavior regardless of routing. + * + *

+ * NOTE: This test sets up a static instance of MiniSolrCloud with a single collection + * and several clients pointed at specific nodes. These are all re-used across multiple test methods, + * and assumes that the state of the cluster is healthy. + *

+ * + */ +public class TestTolerantUpdateProcessorCloud extends SolrCloudTestCase { + + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private static final int NUM_SHARDS = 2; + private static final int REPLICATION_FACTOR = 2; + private static final int NUM_SERVERS = 5; + + private static final String COLLECTION_NAME = "test_col"; + + /** A basic client for operations at the cloud level, default collection will be set */ + private static CloudSolrClient CLOUD_CLIENT; + + /** A client for talking directly to the leader of shard1 */ + private static HttpSolrClient S_ONE_LEADER_CLIENT; + + /** A client for talking directly to the leader of shard2 */ + private static HttpSolrClient S_TWO_LEADER_CLIENT; + + /** A client for talking directly to a passive replica of shard1 */ + private static HttpSolrClient S_ONE_NON_LEADER_CLIENT; + + /** A client for talking directly to a passive replica of shard2 */ + private static HttpSolrClient S_TWO_NON_LEADER_CLIENT; + + /** A client for talking directly to a node that has no piece of the collection */ + private static HttpSolrClient NO_COLLECTION_CLIENT; + + /** id field doc routing prefix for shard1 */ + private static final String S_ONE_PRE = "abc!"; + + /** id field doc routing prefix for shard2 */ + private static final String S_TWO_PRE = "XYZ!"; + + @BeforeClass + private static void createMiniSolrCloudCluster() throws Exception { + + final String configName = "solrCloudCollectionConfig"; + final File configDir = new File(TEST_HOME() + File.separator + "collection1" + File.separator + "conf"); + + configureCluster(NUM_SERVERS) + .addConfig(configName, configDir.toPath()) + .configure(); + assertSpinLoopAllJettyAreRunning(cluster); + + Map collectionProperties = new HashMap<>(); + collectionProperties.put("config", "solrconfig-distrib-update-processor-chains.xml"); + collectionProperties.put("schema", "schema15.xml"); // string id for doc routing prefix + + assertNotNull(cluster.createCollection(COLLECTION_NAME, NUM_SHARDS, REPLICATION_FACTOR, + configName, null, null, collectionProperties)); + + CLOUD_CLIENT = cluster.getSolrClient(); + CLOUD_CLIENT.setDefaultCollection(COLLECTION_NAME); + + ZkStateReader zkStateReader = CLOUD_CLIENT.getZkStateReader(); + AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION_NAME, zkStateReader, true, true, 330); + + + // really hackish way to get a URL for specific nodes based on shard/replica hosting + // inspired by TestMiniSolrCloudCluster + HashMap urlMap = new HashMap<>(); + for (JettySolrRunner jetty : cluster.getJettySolrRunners()) { + URL jettyURL = jetty.getBaseUrl(); + String nodeKey = jettyURL.getHost() + ":" + jettyURL.getPort() + jettyURL.getPath().replace("/","_"); + urlMap.put(nodeKey, jettyURL.toString()); + } + zkStateReader.updateClusterState(); + ClusterState clusterState = zkStateReader.getClusterState(); + for (Slice slice : clusterState.getSlices(COLLECTION_NAME)) { + String shardName = slice.getName(); + Replica leader = slice.getLeader(); + assertNotNull("slice has null leader: " + slice.toString(), leader); + assertNotNull("slice leader has null node name: " + slice.toString(), leader.getNodeName()); + String leaderUrl = urlMap.remove(leader.getNodeName()); + assertNotNull("could not find URL for " + shardName + " leader: " + leader.getNodeName(), + leaderUrl); + assertEquals("expected two total replicas for: " + slice.getName(), + 2, slice.getReplicas().size()); + + String passiveUrl = null; + + for (Replica replica : 
slice.getReplicas()) { + if ( ! replica.equals(leader)) { + passiveUrl = urlMap.remove(replica.getNodeName()); + assertNotNull("could not find URL for " + shardName + " replica: " + replica.getNodeName(), + passiveUrl); + } + } + assertNotNull("could not find URL for " + shardName + " replica", passiveUrl); + + if (shardName.equals("shard1")) { + S_ONE_LEADER_CLIENT = new HttpSolrClient(leaderUrl + "/" + COLLECTION_NAME + "/"); + S_ONE_NON_LEADER_CLIENT = new HttpSolrClient(passiveUrl + "/" + COLLECTION_NAME + "/"); + } else if (shardName.equals("shard2")) { + S_TWO_LEADER_CLIENT = new HttpSolrClient(leaderUrl + "/" + COLLECTION_NAME + "/"); + S_TWO_NON_LEADER_CLIENT = new HttpSolrClient(passiveUrl + "/" + COLLECTION_NAME + "/"); + } else { + fail("unexpected shard: " + shardName); + } + } + assertEquals("Should be exactly one server left (nost hosting either shard)", 1, urlMap.size()); + NO_COLLECTION_CLIENT = new HttpSolrClient(urlMap.values().iterator().next() + + "/" + COLLECTION_NAME + "/"); + + assertNotNull(S_ONE_LEADER_CLIENT); + assertNotNull(S_TWO_LEADER_CLIENT); + assertNotNull(S_ONE_NON_LEADER_CLIENT); + assertNotNull(S_TWO_NON_LEADER_CLIENT); + assertNotNull(NO_COLLECTION_CLIENT); + + // sanity check that our S_ONE_PRE & S_TWO_PRE really do map to shard1 & shard2 with default routing + assertEquals(0, CLOUD_CLIENT.add(doc(f("id", S_ONE_PRE + random().nextInt()), + f("expected_shard_s", "shard1"))).getStatus()); + assertEquals(0, CLOUD_CLIENT.add(doc(f("id", S_TWO_PRE + random().nextInt()), + f("expected_shard_s", "shard2"))).getStatus()); + assertEquals(0, CLOUD_CLIENT.commit().getStatus()); + SolrDocumentList docs = CLOUD_CLIENT.query(params("q", "*:*", + "fl","id,expected_shard_s,[shard]")).getResults(); + assertEquals(2, docs.getNumFound()); + assertEquals(2, docs.size()); + for (SolrDocument doc : docs) { + String expected = COLLECTION_NAME + "_" + doc.getFirstValue("expected_shard_s") + "_replica"; + String docShard = doc.getFirstValue("[shard]").toString(); + assertTrue("shard routing prefixes don't seem to be aligned anymore, " + + "did someone change the default routing rules? " + + "and/or the the default core name rules? " + + "and/or the numShards used by this test? ... " + + "couldn't find " + expected + " as substring of [shard] == '" + docShard + + "' ... 
for docId == " + doc.getFirstValue("id"), + docShard.contains(expected)); + } + } + + @Before + private void clearCollection() throws Exception { + assertEquals(0, CLOUD_CLIENT.deleteByQuery("*:*").getStatus()); + assertEquals(0, CLOUD_CLIENT.commit().getStatus()); + } + + public void testSanity() throws Exception { + + // verify some basic sanity checking of indexing & querying across the collection + // w/o using our custom update processor chain + + assertEquals(0, CLOUD_CLIENT.add(doc(f("id", S_ONE_PRE + "1"), + f("foo_i", 42))).getStatus()); + assertEquals(0, CLOUD_CLIENT.add(doc(f("id", S_TWO_PRE + "2"), + f("foo_i", 66))).getStatus()); + assertEquals(0, CLOUD_CLIENT.commit().getStatus()); + + for (SolrClient c : Arrays.asList(S_ONE_LEADER_CLIENT, S_TWO_LEADER_CLIENT, + S_ONE_NON_LEADER_CLIENT, S_TWO_NON_LEADER_CLIENT, + NO_COLLECTION_CLIENT, CLOUD_CLIENT)) { + assertQueryDocIds(c, true, S_ONE_PRE + "1", S_TWO_PRE + "2"); + assertQueryDocIds(c, false, "id_not_exists"); + + // verify adding 2 broken docs causes a clint exception + try { + UpdateResponse rsp = update(params(), + doc(f("id", S_ONE_PRE + "X"), f("foo_i", "bogus_val_X")), + doc(f("id", S_TWO_PRE + "Y"), f("foo_i", "bogus_val_Y")) + ).process(c); + fail("did not get a top level exception when more then 10 docs failed: " + rsp.toString()); + } catch (SolrException e) { + assertEquals("not the type of error we were expecting ("+e.code()+"): " + e.toString(), + 400, e.code()); + } + + // verify malformed deleteByQuerys fail + try { + UpdateResponse rsp = update(params()).deleteByQuery("foo_i:not_a_num").process(c); + fail("sanity check for malformed DBQ didn't fail: " + rsp.toString()); + } catch (SolrException e) { + assertEquals("not the expected DBQ failure: " + e.getMessage(), 400, e.code()); + } + + // verify oportunistic concurrency deletions fail as we expect when docs are / aren't present + for (UpdateRequest r : new UpdateRequest[] { + update(params("commit", "true")).deleteById(S_ONE_PRE + "1", -1L), + update(params("commit", "true")).deleteById(S_TWO_PRE + "2", -1L), + update(params("commit", "true")).deleteById("id_not_exists", 1L) }) { + try { + UpdateResponse rsp = r.process(c); + fail("sanity check for oportunistic concurrency delete didn't fail: " + + r.toString() + " => " + rsp.toString()); + } catch (SolrException e) { + assertEquals("not the expected oportunistic concurrency failure code: " + + r.toString() + " => " + e.getMessage(), 409, e.code()); + } + } + } + } + + // + public void testVariousDeletesViaCloudClient() throws Exception { + testVariousDeletes(CLOUD_CLIENT); + } + public void testVariousDeletesViaShard1LeaderClient() throws Exception { + testVariousDeletes(S_ONE_LEADER_CLIENT); + } + public void testVariousDeletesViaShard2LeaderClient() throws Exception { + testVariousDeletes(S_TWO_LEADER_CLIENT); + } + public void testVariousDeletesViaShard1NonLeaderClient() throws Exception { + testVariousDeletes(S_ONE_NON_LEADER_CLIENT); + } + public void testVariousDeletesViaShard2NonLeaderClient() throws Exception { + testVariousDeletes(S_TWO_NON_LEADER_CLIENT); + } + public void testVariousDeletesViaNoCollectionClient() throws Exception { + testVariousDeletes(NO_COLLECTION_CLIENT); + } + + protected static void testVariousDeletes(SolrClient client) throws Exception { + assertNotNull("client not initialized", client); + + // 2 docs, one on each shard + final String docId1 = S_ONE_PRE + "42"; + final String docId2 = S_TWO_PRE + "666"; + + UpdateResponse rsp = null; + + // add 1 doc to each shard + rsp = 
update(params("update.chain", "tolerant-chain-max-errors-10", + "commit", "true"), + doc(f("id", docId1), f("foo_i", "2001")), + doc(f("id", docId2), f("foo_i", "1976"))).process(client); + assertEquals(0, rsp.getStatus()); + + // attempt to delete individual doc id(s) that should fail because of oportunistic concurrency constraints + for (String id : new String[] { docId1, docId2 }) { + rsp = update(params("update.chain", "tolerant-chain-max-errors-10", + "commit", "true")).deleteById(id, -1L).process(client); + assertEquals(0, rsp.getStatus()); + assertUpdateTolerantErrors("failed oportunistic concurrent delId="+id, rsp, + delIErr(id)); + } + + // multiple failed deletes from the same shard (via oportunistic concurrent w/ bogus ids) + rsp = update(params("update.chain", "tolerant-chain-max-errors-10", + "commit", "true") + ).deleteById(S_ONE_PRE + "X", +1L).deleteById(S_ONE_PRE + "Y", +1L).process(client); + assertEquals(0, rsp.getStatus()); + assertUpdateTolerantErrors("failed oportunistic concurrent delete by id for 2 bogus docs", rsp, + delIErr(S_ONE_PRE + "X"), delIErr(S_ONE_PRE + "Y")); + assertQueryDocIds(client, true, docId1, docId2); + + // multiple failed deletes from the diff shards due to oportunistic concurrency constraints + rsp = update(params("update.chain", "tolerant-chain-max-errors-10", + "commit", "true")).deleteById(docId2, -1L).deleteById(docId1, -1L).process(client); + assertEquals(0, rsp.getStatus()); + assertUpdateTolerantErrors("failed oportunistic concurrent delete by id for 2 docs", rsp, + delIErr(docId1), delIErr(docId2)); + assertQueryDocIds(client, true, docId1, docId2); + + // deleteByQuery using malformed query (fail) + rsp = update(params("update.chain", "tolerant-chain-max-errors-10", + "commit", "true")).deleteByQuery("bogus_field:foo").process(client); + assertEquals(0, rsp.getStatus()); + assertUpdateTolerantErrors("failed oportunistic concurrent delete by query", rsp, + delQErr("bogus_field:foo")); + assertQueryDocIds(client, true, docId1, docId2); + + // mix 2 deleteByQuery, one malformed (fail), one that doesn't match anything (ok) + rsp = update(params("update.chain", "tolerant-chain-max-errors-10", + "commit", "true") + ).deleteByQuery("bogus_field:foo").deleteByQuery("foo_i:23").process(client); + assertEquals(0, rsp.getStatus()); + assertUpdateTolerantErrors("failed oportunistic concurrent delete by query", rsp, + delQErr("bogus_field:foo")); + assertQueryDocIds(client, true, docId1, docId2); + + // mix 2 deleteById using _version_=-1, one for real doc1 (fail), one for bogus id (ok) + rsp = update(params("update.chain", "tolerant-chain-max-errors-10", + "commit", "true") + ).deleteById(docId1, -1L).deleteById("bogus", -1L).process(client); + assertEquals(0, rsp.getStatus()); + assertUpdateTolerantErrors("failed oportunistic concurrent delete by id: exists", rsp, + delIErr(docId1)); + assertQueryDocIds(client, true, docId1, docId2); + + // mix 2 deleteById using _version_=1, one for real doc1 (ok, deleted), one for bogus id (fail) + rsp = update(params("update.chain", "tolerant-chain-max-errors-10", + "commit", "true") + ).deleteById(docId1, +1L).deleteById("bogusId", +1L).process(client); + assertEquals(0, rsp.getStatus()); + assertUpdateTolerantErrors("failed oportunistic concurrent delete by id: bogus", rsp, + delIErr("bogusId")); + assertQueryDocIds(client, false, docId1); + assertQueryDocIds(client, true, docId2); + + // mix 2 deleteByQuery, one malformed (fail), one that alctaully removes some docs (ok) + assertQueryDocIds(client, true, 
docId2); + rsp = update(params("update.chain", "tolerant-chain-max-errors-10", + "commit", "true") + ).deleteByQuery("bogus_field:foo").deleteByQuery("foo_i:1976").process(client); + assertEquals(0, rsp.getStatus()); + assertUpdateTolerantErrors("failed oportunistic concurrent delete by query", rsp, + delQErr("bogus_field:foo")); + assertQueryDocIds(client, false, docId2); + + } + + + // + public void testVariousAddsViaCloudClient() throws Exception { + testVariousAdds(CLOUD_CLIENT); + } + public void testVariousAddsViaShard1LeaderClient() throws Exception { + testVariousAdds(S_ONE_LEADER_CLIENT); + } + public void testVariousAddsViaShard2LeaderClient() throws Exception { + testVariousAdds(S_TWO_LEADER_CLIENT); + } + public void testVariousAddsViaShard1NonLeaderClient() throws Exception { + testVariousAdds(S_ONE_NON_LEADER_CLIENT); + } + public void testVariousAddsViaShard2NonLeaderClient() throws Exception { + testVariousAdds(S_TWO_NON_LEADER_CLIENT); + } + public void testVariousAddsViaNoCollectionClient() throws Exception { + testVariousAdds(NO_COLLECTION_CLIENT); + } + + protected static void testVariousAdds(SolrClient client) throws Exception { + assertNotNull("client not initialized", client); + + UpdateResponse rsp = null; + + // 2 docs that are both on shard1, the first one should fail + for (int maxErrors : new int[] { -1, 2, 47, 10 }) { + // regardless of which of these maxErrors values we use, behavior should be the same... + rsp = update(params("update.chain", "tolerant-chain-max-errors-10", + "maxErrors", ""+maxErrors, + "commit", "true"), + doc(f("id", S_ONE_PRE + "42"), f("foo_i", "bogus_value")), + doc(f("id", S_ONE_PRE + "666"), f("foo_i", "1976"))).process(client); + + assertEquals(0, rsp.getStatus()); + assertUpdateTolerantAddErrors("single shard, 1st doc should fail", rsp, S_ONE_PRE + "42"); + assertEquals(0, client.commit().getStatus()); + assertQueryDocIds(client, false, S_ONE_PRE + "42"); + assertQueryDocIds(client, true, S_ONE_PRE + "666"); + + // ...only diff should be that we get an accurate report of the effective maxErrors + assertEquals(maxErrors, rsp.getResponseHeader().get("maxErrors")); + } + + // 2 docs that are both on shard1, the second one should fail + + rsp = update(params("update.chain", "tolerant-chain-max-errors-not-set", + "commit", "true"), + doc(f("id", S_ONE_PRE + "55"), f("foo_i", "1976")), + doc(f("id", S_ONE_PRE + "77"), f("foo_i", "bogus_val"))).process(client); + + assertEquals(0, rsp.getStatus()); + assertUpdateTolerantAddErrors("single shard, 2nd doc should fail", rsp, S_ONE_PRE + "77"); + assertQueryDocIds(client, false, S_ONE_PRE + "77"); + assertQueryDocIds(client, true, S_ONE_PRE + "666", S_ONE_PRE + "55"); + // since maxErrors is unset, we should get an "unlimited" value back + assertEquals(-1, rsp.getResponseHeader().get("maxErrors")); + + // clean slate + assertEquals(0, client.deleteByQuery("*:*").getStatus()); + + // 2 docs on 2 diff shards, first of which should fail + + rsp = update(params("update.chain", "tolerant-chain-max-errors-10", + "commit", "true"), + doc(f("id", S_ONE_PRE + "42"), f("foo_i", "bogus_value")), + doc(f("id", S_TWO_PRE + "666"), f("foo_i", "1976"))).process(client); + + assertEquals(0, rsp.getStatus()); + assertUpdateTolerantAddErrors("two shards, 1st doc should fail", rsp, S_ONE_PRE + "42"); + assertEquals(0, client.commit().getStatus()); + assertQueryDocIds(client, false, S_ONE_PRE + "42"); + assertQueryDocIds(client, true, S_TWO_PRE + "666"); + + // 2 docs on 2 diff shards, second of which should fail + 
+ rsp = update(params("update.chain", "tolerant-chain-max-errors-10", + "commit", "true"), + doc(f("id", S_ONE_PRE + "55"), f("foo_i", "1976")), + doc(f("id", S_TWO_PRE + "77"), f("foo_i", "bogus_val"))).process(client); + + assertEquals(0, rsp.getStatus()); + assertUpdateTolerantAddErrors("two shards, 2nd doc should fail", rsp, S_TWO_PRE + "77"); + assertQueryDocIds(client, false, S_TWO_PRE + "77"); + assertQueryDocIds(client, true, S_TWO_PRE + "666", S_ONE_PRE + "55"); + + // clean slate + assertEquals(0, client.deleteByQuery("*:*").getStatus()); + + // many docs from diff shards, 1 from each shard should fail + + rsp = update(params("update.chain", "tolerant-chain-max-errors-10", + "commit", "true"), + doc(f("id", S_ONE_PRE + "11")), + doc(f("id", S_TWO_PRE + "21")), + doc(f("id", S_ONE_PRE + "12")), + doc(f("id", S_TWO_PRE + "22"), f("foo_i", "bogus_val")), + doc(f("id", S_ONE_PRE + "13")), + doc(f("id", S_TWO_PRE + "23")), + doc(f("id", S_ONE_PRE + "14")), + doc(f("id", S_TWO_PRE + "24")), + doc(f("id", S_ONE_PRE + "15"), f("foo_i", "bogus_val")), + doc(f("id", S_TWO_PRE + "25")), + doc(f("id", S_ONE_PRE + "16")), + doc(f("id", S_TWO_PRE + "26"))).process(client); + + assertEquals(0, rsp.getStatus()); + assertUpdateTolerantAddErrors("many docs, 1 from each shard should fail", rsp, + S_ONE_PRE + "15", + S_TWO_PRE + "22"); + assertQueryDocIds(client, false, S_TWO_PRE + "22", S_ONE_PRE + "15"); + assertQueryDocIds(client, true, + S_ONE_PRE + "11", S_TWO_PRE + "21", S_ONE_PRE + "12", + S_ONE_PRE + "13", S_TWO_PRE + "23", S_ONE_PRE + "14", S_TWO_PRE + "24", + S_TWO_PRE + "25", S_ONE_PRE + "16", S_TWO_PRE + "26"); + + // clean slate + assertEquals(0, client.deleteByQuery("*:*").getStatus()); + + // many docs from diff shards, 1 from each shard should fail and 1 w/o uniqueKey + + rsp = update(params("update.chain", "tolerant-chain-max-errors-10", + "commit", "true"), + doc(f("id", S_ONE_PRE + "11")), + doc(f("id", S_TWO_PRE + "21")), + doc(f("id", S_ONE_PRE + "12")), + doc(f("id", S_TWO_PRE + "22"), f("foo_i", "bogus_val")), + doc(f("id", S_ONE_PRE + "13")), + doc(f("id", S_TWO_PRE + "23")), + doc(f("foo_i", "42")), // no "id" + doc(f("id", S_ONE_PRE + "14")), + doc(f("id", S_TWO_PRE + "24")), + doc(f("id", S_ONE_PRE + "15"), f("foo_i", "bogus_val")), + doc(f("id", S_TWO_PRE + "25")), + doc(f("id", S_ONE_PRE + "16")), + doc(f("id", S_TWO_PRE + "26"))).process(client); + + assertEquals(0, rsp.getStatus()); + assertUpdateTolerantAddErrors("many docs, 1 from each shard (+ no id) should fail", rsp, + S_ONE_PRE + "15", + "(unknown)", + S_TWO_PRE + "22"); + assertQueryDocIds(client, false, S_TWO_PRE + "22", S_ONE_PRE + "15"); + assertQueryDocIds(client, true, + S_ONE_PRE + "11", S_TWO_PRE + "21", S_ONE_PRE + "12", + S_ONE_PRE + "13", S_TWO_PRE + "23", S_ONE_PRE + "14", S_TWO_PRE + "24", + S_TWO_PRE + "25", S_ONE_PRE + "16", S_TWO_PRE + "26"); + + // clean slate + assertEquals(0, client.deleteByQuery("*:*").getStatus()); + + // many docs from diff shards, more then 10 (total) should fail + + try { + rsp = update(params("update.chain", "tolerant-chain-max-errors-10", + "commit", "true"), + doc(f("id", S_ONE_PRE + "11")), + doc(f("id", S_TWO_PRE + "21"), f("foo_i", "bogus_val")), + doc(f("id", S_ONE_PRE + "12")), + doc(f("id", S_TWO_PRE + "22"), f("foo_i", "bogus_val")), + doc(f("id", S_ONE_PRE + "13")), + doc(f("id", S_TWO_PRE + "23"), f("foo_i", "bogus_val")), + doc(f("id", S_ONE_PRE + "14"), f("foo_i", "bogus_val")), + doc(f("id", S_TWO_PRE + "24")), + doc(f("id", S_ONE_PRE + "15"), f("foo_i", 
"bogus_val")), + doc(f("id", S_TWO_PRE + "25")), + doc(f("id", S_ONE_PRE + "16"), f("foo_i", "bogus_val")), + doc(f("id", S_TWO_PRE + "26"), f("foo_i", "bogus_val")), + doc(f("id", S_ONE_PRE + "17")), + doc(f("id", S_TWO_PRE + "27")), + doc(f("id", S_ONE_PRE + "18"), f("foo_i", "bogus_val")), + doc(f("id", S_TWO_PRE + "28"), f("foo_i", "bogus_val")), + doc(f("id", S_ONE_PRE + "19"), f("foo_i", "bogus_val")), + doc(f("id", S_TWO_PRE + "29"), f("foo_i", "bogus_val")), + doc(f("id", S_ONE_PRE + "10")), // may be skipped, more then 10 fails + doc(f("id", S_TWO_PRE + "20")) // may be skipped, more then 10 fails + ).process(client); + + fail("did not get a top level exception when more then 10 docs failed: " + rsp.toString()); + } catch (SolrException e) { + // we can't make any reliable assertions about the error message, because + // it varies based on how the request was routed -- see SOLR-8830 + assertEquals("not the type of error we were expecting ("+e.code()+"): " + e.toString(), + // NOTE: we always expect a 400 because we know that's what we would get from these types of errors + // on a single node setup -- a 5xx type error isn't something we should have triggered + 400, e.code()); + + // verify that the Exceptions metadata can tell us what failed. + NamedList remoteErrMetadata = e.getMetadata(); + assertNotNull("no metadata in: " + e.toString(), remoteErrMetadata); + Set actualKnownErrs + = new LinkedHashSet(remoteErrMetadata.size()); + int actualKnownErrsCount = 0; + for (int i = 0; i < remoteErrMetadata.size(); i++) { + ToleratedUpdateError err = + ToleratedUpdateError.parseMetadataIfToleratedUpdateError(remoteErrMetadata.getName(i), + remoteErrMetadata.getVal(i)); + if (null == err) { + // some metadata unrelated to this update processor + continue; + } + actualKnownErrsCount++; + actualKnownErrs.add(err); + } + assertEquals("wrong number of errors in metadata: " + remoteErrMetadata.toString(), + 11, actualKnownErrsCount); + assertEquals("at least one dup error in metadata: " + remoteErrMetadata.toString(), + actualKnownErrsCount, actualKnownErrs.size()); + for (ToleratedUpdateError err : actualKnownErrs) { + assertEquals("only expected type of error is ADD: " + err, + CmdType.ADD, err.getType()); + assertTrue("failed err msg didn't match expected value: " + err, + err.getMessage().contains("bogus_val")); + } + } + assertEquals(0, client.commit().getStatus()); // need to force since update didn't finish + assertQueryDocIds(client, false + // explicitly failed + , S_TWO_PRE + "21", S_TWO_PRE + "22", S_TWO_PRE + "23", S_ONE_PRE + "14" + , S_ONE_PRE + "15", S_ONE_PRE + "16", S_TWO_PRE + "26", S_ONE_PRE + "18" + , S_TWO_PRE + "28", S_ONE_PRE + "19", S_TWO_PRE + "29" + // + // // we can't assert for sure these docs were skipped + // // depending on shard we hit, they may have been added async before errors were exceeded + // , S_ONE_PRE + "10", S_TWO_PRE + "20" // skipped + ); + assertQueryDocIds(client, true, + S_ONE_PRE + "11", S_ONE_PRE + "12", S_ONE_PRE + "13", S_TWO_PRE + "24", + S_TWO_PRE + "25", S_ONE_PRE + "17", S_TWO_PRE + "27"); + + // clean slate + assertEquals(0, client.deleteByQuery("*:*").getStatus()); + + // many docs from diff shards, more then 10 from a single shard (two) should fail + + try { + ArrayList docs = new ArrayList(30); + docs.add(doc(f("id", S_ONE_PRE + "z"))); + docs.add(doc(f("id", S_TWO_PRE + "z"))); + docs.add(doc(f("id", S_ONE_PRE + "y"))); + docs.add(doc(f("id", S_TWO_PRE + "y"))); + for (int i = 0; i < 11; i++) { + docs.add(doc(f("id", S_ONE_PRE + 
i))); + docs.add(doc(f("id", S_TWO_PRE + i), f("foo_i", "bogus_val"))); + } + docs.add(doc(f("id", S_ONE_PRE + "x"))); // may be skipped, more then 10 fails + docs.add(doc(f("id", S_TWO_PRE + "x"))); // may be skipped, more then 10 fails + + rsp = update(params("update.chain", "tolerant-chain-max-errors-10", + "commit", "true"), + docs.toArray(new SolrInputDocument[docs.size()])).process(client); + + fail("did not get a top level exception when more then 10 docs failed: " + rsp.toString()); + } catch (SolrException e) { + // we can't make any reliable assertions about the error message, because + // it varies based on how the request was routed -- see SOLR-8830 + assertEquals("not the type of error we were expecting ("+e.code()+"): " + e.toString(), + // NOTE: we always expect a 400 because we know that's what we would get from these types of errors + // on a single node setup -- a 5xx type error isn't something we should have triggered + 400, e.code()); + + // verify that the Exceptions metadata can tell us what failed. + NamedList remoteErrMetadata = e.getMetadata(); + assertNotNull("no metadata in: " + e.toString(), remoteErrMetadata); + Set actualKnownErrs + = new LinkedHashSet(remoteErrMetadata.size()); + int actualKnownErrsCount = 0; + for (int i = 0; i < remoteErrMetadata.size(); i++) { + ToleratedUpdateError err = + ToleratedUpdateError.parseMetadataIfToleratedUpdateError(remoteErrMetadata.getName(i), + remoteErrMetadata.getVal(i)); + if (null == err) { + // some metadata unrelated to this update processor + continue; + } + actualKnownErrsCount++; + actualKnownErrs.add(err); + } + assertEquals("wrong number of errors in metadata: " + remoteErrMetadata.toString(), + 11, actualKnownErrsCount); + assertEquals("at least one dup error in metadata: " + remoteErrMetadata.toString(), + actualKnownErrsCount, actualKnownErrs.size()); + for (ToleratedUpdateError err : actualKnownErrs) { + assertEquals("only expected type of error is ADD: " + err, + CmdType.ADD, err.getType()); + assertTrue("failed id had unexpected prefix: " + err, + err.getId().startsWith(S_TWO_PRE)); + assertTrue("failed err msg didn't match expected value: " + err, + err.getMessage().contains("bogus_val")); + } + + } + assertEquals(0, client.commit().getStatus()); // need to force since update didn't finish + assertQueryDocIds(client, true + , S_ONE_PRE + "z", S_ONE_PRE + "y", S_TWO_PRE + "z", S_TWO_PRE + "y" // first + // + , S_ONE_PRE + "0", S_ONE_PRE + "1", S_ONE_PRE + "2", S_ONE_PRE + "3", S_ONE_PRE + "4" + , S_ONE_PRE + "5", S_ONE_PRE + "6", S_ONE_PRE + "7", S_ONE_PRE + "8", S_ONE_PRE + "9" + ); + assertQueryDocIds(client, false + // explicitly failed + , S_TWO_PRE + "0", S_TWO_PRE + "1", S_TWO_PRE + "2", S_TWO_PRE + "3", S_TWO_PRE + "4" + , S_TWO_PRE + "5", S_TWO_PRE + "6", S_TWO_PRE + "7", S_TWO_PRE + "8", S_TWO_PRE + "9" + // + // // we can't assert for sure these docs were skipped + // // depending on shard we hit, they may have been added async before errors were exceeded + // , S_ONE_PRE + "x", S_TWO_PRE + "x", // skipped + ); + + // clean slate + assertEquals(0, client.deleteByQuery("*:*").getStatus()); + + // many docs from diff shards, more then 10 don't have any uniqueKey specified + + try { + ArrayList docs = new ArrayList(30); + docs.add(doc(f("id", S_ONE_PRE + "z"))); + docs.add(doc(f("id", S_TWO_PRE + "z"))); + docs.add(doc(f("id", S_ONE_PRE + "y"))); + docs.add(doc(f("id", S_TWO_PRE + "y"))); + for (int i = 0; i < 11; i++) { + // no "id" field + docs.add(doc(f("foo_i", "" + i))); + } + 
docs.add(doc(f("id", S_ONE_PRE + "x"))); // may be skipped, more then 10 fails + docs.add(doc(f("id", S_TWO_PRE + "x"))); // may be skipped, more then 10 fails + + rsp = update(params("update.chain", "tolerant-chain-max-errors-10", + "commit", "true"), + docs.toArray(new SolrInputDocument[docs.size()])).process(client); + + fail("did not get a top level exception when more then 10 docs mising uniqueKey: " + rsp.toString()); + } catch (SolrException e) { + // we can't make any reliable assertions about the error message, because + // it varies based on how the request was routed -- see SOLR-8830 + assertEquals("not the type of error we were expecting ("+e.code()+"): " + e.toString(), + // NOTE: we always expect a 400 because we know that's what we would get from these types of errors + // on a single node setup -- a 5xx type error isn't something we should have triggered + 400, e.code()); + + // verify that the Exceptions metadata can tell us what failed. + NamedList remoteErrMetadata = e.getMetadata(); + assertNotNull("no metadata in: " + e.toString(), remoteErrMetadata); + int actualKnownErrsCount = 0; + for (int i = 0; i < remoteErrMetadata.size(); i++) { + ToleratedUpdateError err = + ToleratedUpdateError.parseMetadataIfToleratedUpdateError(remoteErrMetadata.getName(i), + remoteErrMetadata.getVal(i)); + if (null == err) { + // some metadata unrelated to this update processor + continue; + } + actualKnownErrsCount++; + assertEquals("only expected type of error is ADD: " + err, + CmdType.ADD, err.getType()); + assertTrue("failed id didn't match 'unknown': " + err, + err.getId().contains("unknown")); + } + assertEquals("wrong number of errors in metadata: " + remoteErrMetadata.toString(), + 11, actualKnownErrsCount); + } + assertEquals(0, client.commit().getStatus()); // need to force since update didn't finish + assertQueryDocIds(client, true + , S_ONE_PRE + "z", S_ONE_PRE + "y", S_TWO_PRE + "z", S_TWO_PRE + "y" // first + // // we can't assert for sure these docs were skipped or added + // // depending on shard we hit, they may have been added async before errors were exceeded + // , S_ONE_PRE + "x", S_TWO_PRE + "x" // skipped + ); + + // clean slate + assertEquals(0, client.deleteByQuery("*:*").getStatus()); + + // many docs from diff shards, more then 10 from a single shard (two) should fail but + // request should still succeed because of maxErrors=-1 param + + ArrayList docs = new ArrayList(30); + ArrayList expectedErrs = new ArrayList(30); + docs.add(doc(f("id", S_ONE_PRE + "z"))); + docs.add(doc(f("id", S_TWO_PRE + "z"))); + docs.add(doc(f("id", S_ONE_PRE + "y"))); + docs.add(doc(f("id", S_TWO_PRE + "y"))); + for (int i = 0; i < 11; i++) { + docs.add(doc(f("id", S_ONE_PRE + i))); + docs.add(doc(f("id", S_TWO_PRE + i), f("foo_i", "bogus_val"))); + expectedErrs.add(addErr(S_TWO_PRE + i)); + } + docs.add(doc(f("id", S_ONE_PRE + "x"))); + docs.add(doc(f("id", S_TWO_PRE + "x"))); + + rsp = update(params("update.chain", "tolerant-chain-max-errors-10", + "maxErrors", "-1", + "commit", "true"), + docs.toArray(new SolrInputDocument[docs.size()])).process(client); + assertUpdateTolerantErrors("many docs from shard2 fail, but req should succeed", rsp, + expectedErrs.toArray(new ExpectedErr[expectedErrs.size()])); + assertQueryDocIds(client, true + , S_ONE_PRE + "z", S_ONE_PRE + "y", S_TWO_PRE + "z", S_TWO_PRE + "y" // first + , S_ONE_PRE + "x", S_TWO_PRE + "x" // later + ); + + } + + // + public void testAddsMixedWithDeletesViaCloudClient() throws Exception { + 
testAddsMixedWithDeletes(CLOUD_CLIENT); + } + public void testAddsMixedWithDeletesViaShard1LeaderClient() throws Exception { + testAddsMixedWithDeletes(S_ONE_LEADER_CLIENT); + } + public void testAddsMixedWithDeletesViaShard2LeaderClient() throws Exception { + testAddsMixedWithDeletes(S_TWO_LEADER_CLIENT); + } + public void testAddsMixedWithDeletesViaShard1NonLeaderClient() throws Exception { + testAddsMixedWithDeletes(S_ONE_NON_LEADER_CLIENT); + } + public void testAddsMixedWithDeletesViaShard2NonLeaderClient() throws Exception { + testAddsMixedWithDeletes(S_TWO_NON_LEADER_CLIENT); + } + public void testAddsMixedWithDeletesViaNoCollectionClient() throws Exception { + testAddsMixedWithDeletes(NO_COLLECTION_CLIENT); + } + + protected static void testAddsMixedWithDeletes(SolrClient client) throws Exception { + assertNotNull("client not initialized", client); + + // 3 doc ids, exactly one on shard1 + final String docId1 = S_ONE_PRE + "42"; + final String docId21 = S_TWO_PRE + "42"; + final String docId22 = S_TWO_PRE + "666"; + + UpdateResponse rsp = null; + + // add 2 docs, one to each shard + rsp = update(params("update.chain", "tolerant-chain-max-errors-10", + "commit", "true"), + doc(f("id", docId1), f("foo_i", "2001")), + doc(f("id", docId21), f("foo_i", "1976"))).process(client); + assertEquals(0, rsp.getStatus()); + + // add failure on shard2, delete failure on shard1 + rsp = update(params("update.chain", "tolerant-chain-max-errors-10", + "commit", "true"), + doc(f("id", docId22), f("foo_i", "not_a_num"))) + .deleteById(docId1, -1L) + .process(client); + assertEquals(0, rsp.getStatus()); + assertUpdateTolerantErrors("shard2 add fail, shard1 delI fail", rsp, + delIErr(docId1, "version conflict"), + addErr(docId22,"not_a_num")); + + // attempt a request containing 4 errors of various types (add, delI, delQ) + for (String maxErrors : new String[] {"4", "-1", "100"}) { + // for all of these maxErrors values, the overall request should still succeed + rsp = update(params("update.chain", "tolerant-chain-max-errors-10", + "maxErrors", maxErrors, + "commit", "true"), + doc(f("id", docId22), f("foo_i", "bogus_val"))) + .deleteById(docId1, -1L) + .deleteByQuery("malformed:[") + .deleteById(docId21, -1L) + .process(client); + + assertEquals(0, rsp.getStatus()); + assertUpdateTolerantErrors("failed variety of updates", rsp, + delIErr(docId1, "version conflict"), + delQErr("malformed:[", "SyntaxError"), + delIErr(docId21,"version conflict"), + addErr(docId22,"bogus_val")); + } + + // attempt a request containing 4 errors of various types (add, delI, delQ) .. 1 too many + try { + rsp = update(params("update.chain", "tolerant-chain-max-errors-10", + "maxErrors", "3", + "commit", "true"), + doc(f("id", docId22), f("foo_i", "bogus_val"))) + .deleteById(docId1, -1L) + .deleteByQuery("malformed:[") + .deleteById(docId21, -1L) + .process(client); + fail("did not get a top level exception when more then 4 updates failed: " + rsp.toString()); + } catch (SolrException e) { + // we can't make any reliable assertions about the error message, because + // it varies based on how the request was routed -- see SOLR-8830 + + // likewise, we can't make a firm(er) assertion about the response code... + assertTrue("not the type of error we were expecting ("+e.code()+"): " + e.toString(), + // should be one these 2 depending on order that the async errors were hit... 
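+ // (the version-constrained deleteById failures surface as 409/CONFLICT, the bogus field value as 400/BAD_REQUEST)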
+ // on a single node setup -- a 5xx type error isn't something we should have triggered + 400 == e.code() || 409 == e.code()); + + // verify that the Exceptions metadata can tell us what failed. + NamedList remoteErrMetadata = e.getMetadata(); + assertNotNull("no metadata in: " + e.toString(), remoteErrMetadata); + Set actualKnownErrs + = new LinkedHashSet(remoteErrMetadata.size()); + int actualKnownErrsCount = 0; + for (int i = 0; i < remoteErrMetadata.size(); i++) { + ToleratedUpdateError err = + ToleratedUpdateError.parseMetadataIfToleratedUpdateError(remoteErrMetadata.getName(i), + remoteErrMetadata.getVal(i)); + if (null == err) { + // some metadata unrelated to this update processor + continue; + } + actualKnownErrsCount++; + actualKnownErrs.add(err); + } + assertEquals("wrong number of errors in metadata: " + remoteErrMetadata.toString(), + 4, actualKnownErrsCount); + assertEquals("at least one dup error in metadata: " + remoteErrMetadata.toString(), + actualKnownErrsCount, actualKnownErrs.size()); + } + + // sanity check our 2 existing docs are still here + assertQueryDocIds(client, true, docId1, docId21); + assertQueryDocIds(client, false, docId22); + + // tolerate some failures along with a DELQ that should succeed + rsp = update(params("update.chain", "tolerant-chain-max-errors-10", + "commit", "true"), + doc(f("id", docId22), f("foo_i", "not_a_num"))) + .deleteById(docId1, -1L) + .deleteByQuery("zot_i:[42 to gibberish...") + .deleteByQuery("foo_i:[50 TO 2000}") + .process(client); + assertEquals(0, rsp.getStatus()); + assertUpdateTolerantErrors("mix fails with one valid DELQ", rsp, + delIErr(docId1, "version conflict"), + delQErr("zot_i:[42 to gibberish..."), + addErr(docId22,"not_a_num")); + // one of our previous docs should have been deleted now + assertQueryDocIds(client, true, docId1); + assertQueryDocIds(client, false, docId21, docId22); + + } + + /** + * HACK: Loops over every Jetty instance in the specified MiniSolrCloudCluster to see if they are running, + * and sleeps small increments until they all report that they are, or a max num iters is reached + * + * (workaround for SOLR-8862. Maybe something like this should be promoted into MiniSolrCloudCluster's + * start() method? or SolrCloudTestCase's configureCluster?) + */ + public static void assertSpinLoopAllJettyAreRunning(MiniSolrCloudCluster cluster) throws InterruptedException { + // NOTE: ideally we could use an ExecutorService that tried to open Sockets (with a long timeout) + // to each of the jetty instances in parallel w/o any sleeping -- but since they pick their ports + // dynamically and don't report them until/unless the server is up, that won't necessarily do us + // any good. + final int numServers = cluster.getJettySolrRunners().size(); + int numRunning = 0; + for (int i = 5; 0 <= i; i--) { + numRunning = 0; + for (JettySolrRunner jetty : cluster.getJettySolrRunners()) { + if (jetty.isRunning()) { + numRunning++; + } + } + if (numServers == numRunning) { + return; + } else if (0 == i) { + // give up + break; + } + // the more nodes we're waiting on, the longer we should try to sleep (within reason) + Thread.sleep(Math.min((numServers - numRunning) * 100, 1000)); + } + assertEquals("giving up waiting for all jetty instances to be running", + numServers, numRunning); + } + + /** Asserts that the UpdateResponse contains the specified expectedErrs and no others */ + public static void assertUpdateTolerantErrors(String assertionMsgPrefix, + UpdateResponse response, + ExpectedErr... 
expectedErrs) { + @SuppressWarnings("unchecked") + List> errors = (List>) + response.getResponseHeader().get("errors"); + + assertNotNull(assertionMsgPrefix + ": Null errors: " + response.toString(), errors); + assertEquals(assertionMsgPrefix + ": Num error ids: " + errors.toString(), + expectedErrs.length, errors.size()); + + for (SimpleOrderedMap err : errors) { + String assertErrPre = assertionMsgPrefix + ": " + err.toString(); + + String id = err.get("id"); + assertNotNull(assertErrPre + " ... null id", id); + String type = err.get("type"); + assertNotNull(assertErrPre + " ... null type", type); + String message = err.get("message"); + assertNotNull(assertErrPre + " ... null message", message); + + // inefficient scan, but good enough for the size of sets we're dealing with + boolean found = false; + for (ExpectedErr expected : expectedErrs) { + if (expected.type.equals(type) && expected.id.equals(id) + && (null == expected.msgSubStr || message.contains(expected.msgSubStr))) { + found = true; + break; + } + } + assertTrue(assertErrPre + " ... unexpected err in: " + response.toString(), found); + + } + } + + /** convenience method when the only type of errors you expect are 'add' errors */ + public static void assertUpdateTolerantAddErrors(String assertionMsgPrefix, + UpdateResponse response, + String... errorIdsExpected) { + ExpectedErr[] expected = new ExpectedErr[errorIdsExpected.length]; + for (int i = 0; i < expected.length; i++) { + expected[i] = addErr(errorIdsExpected[i]); + } + assertUpdateTolerantErrors(assertionMsgPrefix, response, expected); + } + + /** + * Asserts that the specified document ids do/do-not exist in the index, using both the specified client, + * and the CLOUD_CLIENT + */ + public static void assertQueryDocIds(SolrClient client, boolean shouldExist, String... ids) throws Exception { + for (String id : ids) { + assertEquals(client.toString() + " should " + (shouldExist ? "" : "not ") + "find id: " + id, + (shouldExist ? 1 : 0), + CLOUD_CLIENT.query(params("q", "{!term f=id}" + id)).getResults().getNumFound()); + } + if (! CLOUD_CLIENT.equals(client) ) { + assertQueryDocIds(CLOUD_CLIENT, shouldExist, ids); + } + } + + public static UpdateRequest update(SolrParams params, SolrInputDocument... docs) { + UpdateRequest r = new UpdateRequest(); + r.setParams(new ModifiableSolrParams(params)); + r.add(Arrays.asList(docs)); + return r; + } + + public static SolrInputDocument doc(SolrInputField... fields) { + SolrInputDocument doc = new SolrInputDocument(); + for (SolrInputField f : fields) { + doc.put(f.getName(), f); + } + return doc; + } + + public static SolrInputField f(String fieldName, Object... 
values) { + SolrInputField f = new SolrInputField(fieldName); + f.setValue(values, 1.0F); + return f; + } + + /** simple helper struct */ + public static final class ExpectedErr { + final String type; + final String id; + final String msgSubStr; // ignored if null + + public ExpectedErr(String type, String id, String msgSubStr) { + this.type = type; + this.id = id; + this.msgSubStr = msgSubStr; + } + public String toString() { + return "type=<"+type+">,id=<"+id+">,msgSubStr=<"+msgSubStr+">"; + } + } + public static ExpectedErr addErr(String id, String msgSubStr) { + return new ExpectedErr("ADD", id, msgSubStr); + } + public static ExpectedErr delIErr(String id, String msgSubStr) { + return new ExpectedErr("DELID", id, msgSubStr); + } + public static ExpectedErr delQErr(String id, String msgSubStr) { + return new ExpectedErr("DELQ", id, msgSubStr); + } + public static ExpectedErr addErr(String id) { + return addErr(id, null); + } + public static ExpectedErr delIErr(String id) { + return delIErr(id, null); + } + public static ExpectedErr delQErr(String id) { + return delQErr(id, null); + } +} diff --git a/solr/core/src/test/org/apache/solr/cloud/TestTolerantUpdateProcessorRandomCloud.java b/solr/core/src/test/org/apache/solr/cloud/TestTolerantUpdateProcessorRandomCloud.java new file mode 100644 index 00000000000..b3f0423c999 --- /dev/null +++ b/solr/core/src/test/org/apache/solr/cloud/TestTolerantUpdateProcessorRandomCloud.java @@ -0,0 +1,389 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.solr.cloud; + +import java.io.File; +import java.lang.invoke.MethodHandles; +import java.net.URL; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.BitSet; +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; + +import org.apache.lucene.util.TestUtil; +import org.apache.solr.cloud.SolrCloudTestCase; +import static org.apache.solr.cloud.TestTolerantUpdateProcessorCloud.assertUpdateTolerantErrors; +import static org.apache.solr.cloud.TestTolerantUpdateProcessorCloud.addErr; +import static org.apache.solr.cloud.TestTolerantUpdateProcessorCloud.delIErr; +import static org.apache.solr.cloud.TestTolerantUpdateProcessorCloud.delQErr; +import static org.apache.solr.cloud.TestTolerantUpdateProcessorCloud.f; +import static org.apache.solr.cloud.TestTolerantUpdateProcessorCloud.update; +import static org.apache.solr.cloud.TestTolerantUpdateProcessorCloud.ExpectedErr; +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.embedded.JettySolrRunner; +import org.apache.solr.client.solrj.impl.HttpSolrClient; +import org.apache.solr.client.solrj.impl.CloudSolrClient; +import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.client.solrj.response.UpdateResponse; +import static org.apache.solr.common.params.CursorMarkParams.CURSOR_MARK_PARAM; +import static org.apache.solr.common.params.CursorMarkParams.CURSOR_MARK_NEXT; +import static org.apache.solr.common.params.CursorMarkParams.CURSOR_MARK_START; +import org.apache.solr.common.SolrDocument; +import org.apache.solr.common.SolrDocumentList; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.SolrInputField; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.ToleratedUpdateError; +import org.apache.solr.common.ToleratedUpdateError.CmdType; +import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.Slice; +import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.params.SolrParams; +import org.apache.solr.common.util.NamedList; +import org.apache.solr.common.util.SimpleOrderedMap; +import org.apache.solr.util.RevertDefaultThreadHandlerRule; + +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test of TolerantUpdateProcessor using a randomized MiniSolrCloud. + * Reuses some utility methods in {@link TestTolerantUpdateProcessorCloud} + * + *

+ * NOTE: This test sets up a static instance of MiniSolrCloud with a single collection + * and several clients pointed at specific nodes. These are all re-used across multiple test methods, + * and assumes that the state of the cluster is healthy between tests. + *

+ * + */ +public class TestTolerantUpdateProcessorRandomCloud extends SolrCloudTestCase { + + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private static final String COLLECTION_NAME = "test_col"; + + /** A basic client for operations at the cloud level, default collection will be set */ + private static CloudSolrClient CLOUD_CLIENT; + /** one HttpSolrClient for each server */ + private static List NODE_CLIENTS; + + @BeforeClass + private static void createMiniSolrCloudCluster() throws Exception { + + final String configName = "solrCloudCollectionConfig"; + final File configDir = new File(TEST_HOME() + File.separator + "collection1" + File.separator + "conf"); + + final int numShards = TestUtil.nextInt(random(), 2, TEST_NIGHTLY ? 5 : 3); + final int repFactor = TestUtil.nextInt(random(), 2, TEST_NIGHTLY ? 5 : 3); + // at least one server won't have any replicas + final int numServers = 1 + (numShards * repFactor); + + log.info("Configuring cluster: servers={}, shards={}, repfactor={}", numServers, numShards, repFactor); + configureCluster(numServers) + .addConfig(configName, configDir.toPath()) + .configure(); + + TestTolerantUpdateProcessorCloud.assertSpinLoopAllJettyAreRunning(cluster); + + Map collectionProperties = new HashMap<>(); + collectionProperties.put("config", "solrconfig-distrib-update-processor-chains.xml"); + collectionProperties.put("schema", "schema15.xml"); // string id + + + assertNotNull(cluster.createCollection(COLLECTION_NAME, numShards, repFactor, + configName, null, null, collectionProperties)); + + CLOUD_CLIENT = cluster.getSolrClient(); + CLOUD_CLIENT.setDefaultCollection(COLLECTION_NAME); + + NODE_CLIENTS = new ArrayList(numServers); + + for (JettySolrRunner jetty : cluster.getJettySolrRunners()) { + URL jettyURL = jetty.getBaseUrl(); + NODE_CLIENTS.add(new HttpSolrClient(jettyURL.toString() + "/" + COLLECTION_NAME + "/")); + } + assertEquals(numServers, NODE_CLIENTS.size()); + + ZkStateReader zkStateReader = CLOUD_CLIENT.getZkStateReader(); + AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION_NAME, zkStateReader, true, true, 330); + + } + + @Before + private void deleteAllDocs() throws Exception { + assertEquals(0, update(params("commit","true")).deleteByQuery("*:*").process(CLOUD_CLIENT).getStatus()); + assertEquals("index should be empty", 0L, countDocs(CLOUD_CLIENT)); + } + + public void testRandomUpdates() throws Exception { + final int maxDocId = atLeast(10000); + final BitSet expectedDocIds = new BitSet(maxDocId+1); + + final int numIters = atLeast(50); + for (int i = 0; i < numIters; i++) { + + log.info("BEGIN ITER #{}", i); + + final UpdateRequest req = update(params("maxErrors","-1", + "update.chain", "tolerant-chain-max-errors-10")); + final int numCmds = TestUtil.nextInt(random(), 1, 20); + final List expectedErrors = new ArrayList(numCmds); + int expectedErrorsCount = 0; + // it's ambiguous/confusing which order mixed DELQ + ADD (or ADD and DELI for the same ID) + // in the same request will be processed by various clients, so we keep things simple + // and ensure that no single doc Id is affected by more than one command in the same request + final BitSet docsAffectedThisRequest = new BitSet(maxDocId+1); + for (int cmdIter = 0; cmdIter < numCmds; cmdIter++) { + if ((maxDocId / 2) < docsAffectedThisRequest.cardinality()) { + // we're already mucking with more than half the docs in the index + break; + } + + final boolean causeError = random().nextBoolean(); + if (causeError) { + 
expectedErrorsCount++; + } + + if (random().nextBoolean()) { + // add a doc + String id = null; + SolrInputDocument doc = null; + if (causeError && (0 == TestUtil.nextInt(random(), 0, 21))) { + doc = doc(f("foo_s","no unique key")); + expectedErrors.add(addErr("(unknown)")); + } else { + final int id_i = randomUnsetBit(random(), docsAffectedThisRequest, maxDocId); + docsAffectedThisRequest.set(id_i); + id = "id_"+id_i; + if (causeError) { + expectedErrors.add(addErr(id)); + } else { + expectedDocIds.set(id_i); + } + final String val = causeError ? "bogus_val" : (""+TestUtil.nextInt(random(), 42, 666)); + doc = doc(f("id",id), + f("id_i", id_i), + f("foo_i", val)); + } + req.add(doc); + log.info("ADD: {} = {}", id, doc); + } else { + // delete something + if (random().nextBoolean()) { + // delete by id + final int id_i = randomUnsetBit(random(), docsAffectedThisRequest, maxDocId); + final String id = "id_"+id_i; + final boolean docExists = expectedDocIds.get(id_i); + docsAffectedThisRequest.set(id_i); + long versionConstraint = docExists ? 1 : -1; + if (causeError) { + versionConstraint = -1 * versionConstraint; + expectedErrors.add(delIErr(id)); + } else { + // if doc exists it will legitimately be deleted + expectedDocIds.clear(id_i); + } + req.deleteById(id, versionConstraint); + log.info("DEL: {} = {}", id, causeError ? "ERR" : "OK" ); + } else { + // delete by query + final String q; + if (causeError) { + // even though our DBQ is gibberish that's going to fail, record a docId as affected + // so that we don't generate the same random DBQ and get redundent errors + // (problematic because of how DUP forwarded DBQs have to have their errors deduped by TUP) + final int id_i = randomUnsetBit(random(), docsAffectedThisRequest, maxDocId); + docsAffectedThisRequest.set(id_i); + q = "foo_i:["+id_i+" TO ....giberish"; + expectedErrors.add(delQErr(q)); + } else { + // ensure our DBQ is only over a range of docs not already affected + // by any other cmds in this request + final int rangeAxis = randomUnsetBit(random(), docsAffectedThisRequest, maxDocId); + final int loBound = docsAffectedThisRequest.previousSetBit(rangeAxis); + final int hiBound = docsAffectedThisRequest.nextSetBit(rangeAxis); + final int lo = TestUtil.nextInt(random(), loBound+1, rangeAxis); + final int hi = TestUtil.nextInt(random(), rangeAxis, + // bound might be negative if no set bits above axis + (hiBound < 0) ? maxDocId : hiBound-1); + + if (lo != hi) { + assert lo < hi : "lo="+lo+" hi="+hi; + // NOTE: clear & set are exclusive of hi, so we use "}" in range query accordingly + q = "id_i:[" + lo + " TO " + hi + "}"; + expectedDocIds.clear(lo, hi); + docsAffectedThisRequest.set(lo, hi); + } else { + // edge case: special case DBQ of one doc + assert (lo == rangeAxis && hi == rangeAxis) : "lo="+lo+" axis="+rangeAxis+" hi="+hi; + q = "id_i:[" + lo + " TO " + lo + "]"; // have to be inclusive of both ends + expectedDocIds.clear(lo); + docsAffectedThisRequest.set(lo); + } + } + req.deleteByQuery(q); + log.info("DEL: {}", q); + } + } + } + assertEquals("expected error count sanity check: " + req.toString(), + expectedErrorsCount, expectedErrors.size()); + + final SolrClient client = random().nextBoolean() ? 
CLOUD_CLIENT + : NODE_CLIENTS.get(TestUtil.nextInt(random(), 0, NODE_CLIENTS.size()-1)); + + final UpdateResponse rsp = req.process(client); + assertUpdateTolerantErrors(client.toString() + " => " + expectedErrors.toString(), rsp, + expectedErrors.toArray(new ExpectedErr[expectedErrors.size()])); + + log.info("END ITER #{}, expecting #docs: {}", i, expectedDocIds.cardinality()); + + assertEquals("post update commit failed?", 0, CLOUD_CLIENT.commit().getStatus()); + + for (int j = 0; j < 5; j++) { + if (expectedDocIds.cardinality() == countDocs(CLOUD_CLIENT)) { + break; + } + log.info("sleeping to give searchers a chance to re-open #" + j); + Thread.sleep(200); + } + + // check the index contents against our expectations + final BitSet actualDocIds = allDocs(CLOUD_CLIENT, maxDocId); + if ( expectedDocIds.cardinality() != actualDocIds.cardinality() ) { + log.error("cardinality mismatch: expected {} BUT actual {}", + expectedDocIds.cardinality(), + actualDocIds.cardinality()); + } + final BitSet x = (BitSet) actualDocIds.clone(); + x.xor(expectedDocIds); + for (int b = x.nextSetBit(0); 0 <= b; b = x.nextSetBit(b+1)) { + final boolean expectedBit = expectedDocIds.get(b); + final boolean actualBit = actualDocIds.get(b); + log.error("bit #"+b+" mismatch: expected {} BUT actual {}", expectedBit, actualBit); + } + assertEquals(x.cardinality() + " mismatched bits", + expectedDocIds.cardinality(), actualDocIds.cardinality()); + } + } + + /** sanity check that randomUnsetBit works as expected + * @see #randomUnsetBit + */ + public void testSanityRandomUnsetBit() { + final int max = atLeast(100); + BitSet bits = new BitSet(max+1); + for (int i = 0; i <= max; i++) { + assertFalse("how is bitset already full? iter="+i+" card="+bits.cardinality()+"/max="+max, + bits.cardinality() == max+1); + final int nextBit = randomUnsetBit(random(), bits, max); + assertTrue("nextBit shouldn't be negative yet: " + nextBit, + 0 <= nextBit); + assertTrue("nextBit can't exceed max: " + nextBit, + nextBit <= max); + assertFalse("expect unset: " + nextBit, bits.get(nextBit)); + bits.set(nextBit); + } + + assertEquals("why isn't bitset full?", max+1, bits.cardinality()); + + final int firstClearBit = bits.nextClearBit(0); + assertTrue("why is there a clear bit? = " + firstClearBit, + max < firstClearBit); + assertEquals("why is a bit set above max?", + -1, bits.nextSetBit(max+1)); + + assertEquals("wrong nextBit at end of all iters", -1, + randomUnsetBit(random(), bits, max)); + assertEquals("wrong nextBit at redundant end of all iters", -1, + randomUnsetBit(random(), bits, max)); + } + + public static SolrInputDocument doc(SolrInputField... fields) { + // SolrTestCaseJ4 has same method name, prevents static import from working + return TestTolerantUpdateProcessorCloud.doc(fields); + } + + /** + * Given a BitSet, returns a random bit that is currently false, or -1 if all bits are true. + * NOTE: this method is not fair. + */ + public static final int randomUnsetBit(Random r, BitSet bits, final int max) { + // NOTE: don't forget, BitSet will grow automatically if not careful + if (bits.cardinality() == max+1) { + return -1; + } + final int candidate = TestUtil.nextInt(r, 0, max); + if (bits.get(candidate)) { + final int lo = bits.previousClearBit(candidate); + final int hi = bits.nextClearBit(candidate); + if (lo < 0 && max < hi) { + fail("how the hell did we not short circuit out? card="+bits.cardinality()+"/size="+bits.size()); + } else if (lo < 0) { + return hi; + } else if (max < hi) { + return lo; + } // else... 
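+ // candidate bit was already set and both neighboring clear bits are in range: return whichever is closer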
+ return ((candidate - lo) < (hi - candidate)) ? lo : hi; + } + return candidate; + } + + /** returns the numFound from a *:* query */ + public static final long countDocs(SolrClient c) throws Exception { + return c.query(params("q","*:*","rows","0")).getResults().getNumFound(); + } + + /** uses a Cursor to iterate over every doc in the index, recording the 'id_i' value in a BitSet */ + private static final BitSet allDocs(final SolrClient c, final int maxDocIdExpected) throws Exception { + BitSet docs = new BitSet(maxDocIdExpected+1); + String cursorMark = CURSOR_MARK_START; + int docsOnThisPage = Integer.MAX_VALUE; + while (0 < docsOnThisPage) { + final SolrParams p = params("q","*:*", + "rows","100", + // note: not numeric, but we don't actual care about the order + "sort", "id asc", + CURSOR_MARK_PARAM, cursorMark); + QueryResponse rsp = c.query(p); + cursorMark = rsp.getNextCursorMark(); + docsOnThisPage = 0; + for (SolrDocument doc : rsp.getResults()) { + docsOnThisPage++; + int id_i = ((Integer)doc.get("id_i")).intValue(); + assertTrue("found id_i bigger then expected "+maxDocIdExpected+": " + id_i, + id_i <= maxDocIdExpected); + docs.set(id_i); + } + cursorMark = rsp.getNextCursorMark(); + } + return docs; + } +} diff --git a/solr/core/src/test/org/apache/solr/core/TestBadConfig.java b/solr/core/src/test/org/apache/solr/core/TestBadConfig.java index 637c9751f7f..31361befd5e 100644 --- a/solr/core/src/test/org/apache/solr/core/TestBadConfig.java +++ b/solr/core/src/test/org/apache/solr/core/TestBadConfig.java @@ -96,4 +96,9 @@ public class TestBadConfig extends AbstractBadConfigTestBase { assertConfigs("bad-solrconfig-unexpected-schema-attribute.xml", "schema-minimal.xml", "Unexpected arg(s): {bogusParam=bogusValue}"); } + + public void testTolerantUpdateProcessorNoUniqueKey() throws Exception { + assertConfigs("solrconfig-tolerant-update-minimal.xml", "schema-minimal.xml", + "requires a schema that includes a uniqueKey field"); + } } diff --git a/solr/core/src/test/org/apache/solr/update/processor/TolerantUpdateProcessorTest.java b/solr/core/src/test/org/apache/solr/update/processor/TolerantUpdateProcessorTest.java new file mode 100644 index 00000000000..9bbead8eb1e --- /dev/null +++ b/solr/core/src/test/org/apache/solr/update/processor/TolerantUpdateProcessorTest.java @@ -0,0 +1,447 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.solr.update.processor; + +import java.io.IOException; +import java.io.StringWriter; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import javax.xml.xpath.XPathExpressionException; + +import org.apache.solr.client.solrj.util.ClientUtils; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.ToleratedUpdateError; +import org.apache.solr.common.ToleratedUpdateError.CmdType; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.params.SolrParams; +import org.apache.solr.common.util.SimpleOrderedMap; +import org.apache.solr.core.SolrCore; +import org.apache.solr.request.LocalSolrQueryRequest; +import org.apache.solr.request.SolrQueryRequest; +import org.apache.solr.request.SolrRequestHandler; +import org.apache.solr.response.SolrQueryResponse; +import org.apache.solr.servlet.DirectSolrConnection; +import org.apache.solr.update.AddUpdateCommand; +import org.apache.solr.util.BaseTestHarness; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.xml.sax.SAXException; + +public class TolerantUpdateProcessorTest extends UpdateProcessorTestBase { + + /** + * List of valid + invalid documents + */ + private static List docs = null; + /** + * IDs of the invalid documents in docs + */ + private static String[] badIds = null; + + @BeforeClass + public static void beforeClass() throws Exception { + initCore("solrconfig-update-processor-chains.xml", "schema12.xml"); + } + + @AfterClass + public static void tearDownClass() { + docs = null; + badIds = null; + } + + @Override + public void setUp() throws Exception { + super.setUp(); + //expected exception messages + ignoreException("Error adding field"); + ignoreException("Document is missing mandatory uniqueKey field"); + if (docs == null) { + docs = new ArrayList<>(20); + badIds = new String[10]; + for(int i = 0; i < 10;i++) { + // a valid document + docs.add(doc(field("id", 1f, String.valueOf(2*i)), field("weight", 1f, i))); + // ... and an invalid one + docs.add(doc(field("id", 1f, String.valueOf(2*i+1)), field("weight", 1f, "b"))); + badIds[i] = String.valueOf(2*i+1); + } + } + + } + + @Override + public void tearDown() throws Exception { + resetExceptionIgnores(); + assertU(delQ("*:*")); + assertU(commit()); + assertQ(req("q","*:*") + ,"//result[@numFound='0']"); + super.tearDown(); + } + + /** + * future proof TolerantUpdateProcessor against new default method impls being added to UpdateProcessor + * to ensure that every method involved in a processor chain life cycle is overridden with + * exception catching/tracking. 
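+ * (a new public method appearing on any base class will cause this test to fail until TolerantUpdateProcessor overrides it)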
+ */ + public void testReflection() { + for (Method method : TolerantUpdateProcessor.class.getMethods()) { + if (method.getDeclaringClass().equals(Object.class)) { + continue; + } + assertEquals("base class(es) has changed, TolerantUpdateProcessor needs to be updated to ensure it " + + "overrides all solr update lifecycle methods with exception tracking: " + method.toString(), + TolerantUpdateProcessor.class, method.getDeclaringClass()); + } + } + + + @Test + public void testValidAdds() throws IOException { + SolrInputDocument validDoc = doc(field("id", 1f, "1"), field("text", 1f, "the quick brown fox")); + add("tolerant-chain-max-errors-10", null, validDoc); + + validDoc = doc(field("id", 1f, "2"), field("text", 1f, "the quick brown fox")); + add("tolerant-chain-max-errors-not-set", null, validDoc); + + assertU(commit()); + assertQ(req("q","*:*") + ,"//result[@numFound='2']"); + assertQ(req("q","id:1") + ,"//result[@numFound='1']"); + assertQ(req("q","id:2") + ,"//result[@numFound='1']"); + } + + @Test + public void testInvalidAdds() throws IOException { + SolrInputDocument invalidDoc = doc(field("text", 1f, "the quick brown fox")); //no id + try { + // This doc should fail without being tolerant + add("not-tolerant", null, invalidDoc); + fail("Expecting exception"); + } catch (Exception e) { + //expected + assertTrue(e.getMessage().contains("Document is missing mandatory uniqueKey field")); + } + assertAddsSucceedWithErrors("tolerant-chain-max-errors-10", Arrays.asList(new SolrInputDocument[]{invalidDoc}), null, "(unknown)"); + + //a valid doc + SolrInputDocument validDoc = doc(field("id", 1f, "1"), field("text", 1f, "the quick brown fox")); + + try { + // This batch should fail without being tolerant + add("not-tolerant", null, Arrays.asList(new SolrInputDocument[]{invalidDoc, validDoc})); + fail("Expecting exception"); + } catch (Exception e) { + //expected + assertTrue(e.getMessage().contains("Document is missing mandatory uniqueKey field")); + } + + assertU(commit()); + assertQ(req("q","id:1") + ,"//result[@numFound='0']"); + + + assertAddsSucceedWithErrors("tolerant-chain-max-errors-10", Arrays.asList(new SolrInputDocument[]{invalidDoc, validDoc}), null, "(unknown)"); + assertU(commit()); + + // verify that the good document made it in. + assertQ(req("q","id:1") + ,"//result[@numFound='1']"); + + invalidDoc = doc(field("id", 1f, "2"), field("weight", 1f, "aaa")); + validDoc = doc(field("id", 1f, "3"), field("weight", 1f, "3")); + + try { + // This batch should fail without being tolerant + add("not-tolerant", null, Arrays.asList(new SolrInputDocument[]{invalidDoc, validDoc})); // bad 'weight' value + fail("Expecting exception"); + } catch (Exception e) { + //expected + assertTrue(e.getMessage().contains("Error adding field")); + } + + assertU(commit()); + assertQ(req("q","id:3") + ,"//result[@numFound='0']"); + + assertAddsSucceedWithErrors("tolerant-chain-max-errors-10", Arrays.asList(new SolrInputDocument[]{invalidDoc, validDoc}), null, "2"); + assertU(commit()); + + // The valid document was indexed + assertQ(req("q","id:3") + ,"//result[@numFound='1']"); + + // The invalid document was NOT indexed + assertQ(req("q","id:2") + ,"//result[@numFound='0']"); + + } + + @Test + public void testMaxErrorsDefault() throws IOException { + try { + // by default the TolerantUpdateProcessor accepts all errors, so this batch should succeed with 10 errors. 
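+ // (docs alternates good/bad entries; the bad ones are the odd-numbered ids recorded in badIds)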
+ assertAddsSucceedWithErrors("tolerant-chain-max-errors-not-set", docs, null, badIds); + } catch(Exception e) { + fail("Shouldn't get an exception for this batch: " + e.getMessage()); + } + assertU(commit()); + assertQ(req("q","*:*") + ,"//result[@numFound='10']"); + } + + public void testMaxErrorsSucceed() throws IOException { + ModifiableSolrParams requestParams = new ModifiableSolrParams(); + requestParams.add("maxErrors", "10"); + // still OK + assertAddsSucceedWithErrors("tolerant-chain-max-errors-not-set", docs, requestParams, badIds); + assertU(commit()); + assertQ(req("q","*:*") + ,"//result[@numFound='10']"); + } + + @Test + public void testMaxErrorsThrowsException() throws IOException { + ModifiableSolrParams requestParams = new ModifiableSolrParams(); + requestParams.add("maxErrors", "5"); + try { + // should fail + assertAddsSucceedWithErrors("tolerant-chain-max-errors-not-set", docs, requestParams, badIds); + fail("Expecting exception"); + } catch (SolrException e) { + assertTrue(e.getMessage(), + e.getMessage().contains("ERROR: [doc=1] Error adding field 'weight'='b' msg=For input string: \"b\"")); + } + //the first good documents made it to the index + assertU(commit()); + assertQ(req("q","*:*") + ,"//result[@numFound='6']"); + } + + @Test + public void testMaxErrorsInfinite() throws IOException { + ModifiableSolrParams requestParams = new ModifiableSolrParams(); + requestParams.add("maxErrors", "-1"); + try { + assertAddsSucceedWithErrors("tolerant-chain-max-errors-not-set", docs, null, badIds); + } catch(Exception e) { + fail("Shouldn't get an exception for this batch: " + e.getMessage()); + } + assertU(commit()); + assertQ(req("q","*:*") + ,"//result[@numFound='10']"); + } + + @Test + public void testMaxErrors0() throws IOException { + //make the TolerantUpdateProcessor intolerant + List smallBatch = docs.subList(0, 2); + ModifiableSolrParams requestParams = new ModifiableSolrParams(); + requestParams.add("maxErrors", "0"); + try { + // should fail + assertAddsSucceedWithErrors("tolerant-chain-max-errors-10", smallBatch, requestParams, "1"); + fail("Expecting exception"); + } catch (SolrException e) { + assertTrue(e.getMessage().contains("ERROR: [doc=1] Error adding field 'weight'='b' msg=For input string: \"b\"")); + } + //the first good documents made it to the index + assertU(commit()); + assertQ(req("q","*:*") + ,"//result[@numFound='1']"); + } + + @Test + public void testInvalidDelete() throws XPathExpressionException, SAXException { + ignoreException("undefined field invalidfield"); + String response = update("tolerant-chain-max-errors-10", adoc("id", "1", "text", "the quick brown fox")); + assertNull(BaseTestHarness.validateXPath(response, + "//int[@name='status']=0", + "//arr[@name='errors']", + "count(//arr[@name='errors']/lst)=0")); + + response = update("tolerant-chain-max-errors-10", delQ("invalidfield:1")); + assertNull(BaseTestHarness.validateXPath + (response, + "//int[@name='status']=0", + "count(//arr[@name='errors']/lst)=1", + "//arr[@name='errors']/lst/str[@name='type']/text()='DELQ'", + "//arr[@name='errors']/lst/str[@name='id']/text()='invalidfield:1'", + "//arr[@name='errors']/lst/str[@name='message']/text()='undefined field invalidfield'")); + } + + @Test + public void testValidDelete() throws XPathExpressionException, SAXException { + ignoreException("undefined field invalidfield"); + String response = update("tolerant-chain-max-errors-10", adoc("id", "1", "text", "the quick brown fox")); + assertNull(BaseTestHarness.validateXPath(response, + 
"//int[@name='status']=0", + "//arr[@name='errors']", + "count(//arr[@name='errors']/lst)=0")); + + assertU(commit()); + assertQ(req("q","*:*") + ,"//result[@numFound='1']"); + + response = update("tolerant-chain-max-errors-10", delQ("id:1")); + assertNull(BaseTestHarness.validateXPath(response, + "//int[@name='status']=0", + "//arr[@name='errors']", + "count(//arr[@name='errors']/lst)=0")); + assertU(commit()); + assertQ(req("q","*:*") + ,"//result[@numFound='0']"); + } + + @Test + public void testResponse() throws SAXException, XPathExpressionException, IOException { + String response = update("tolerant-chain-max-errors-10", adoc("id", "1", "text", "the quick brown fox")); + assertNull(BaseTestHarness.validateXPath(response, + "//int[@name='status']=0", + "//arr[@name='errors']", + "count(//arr[@name='errors']/lst)=0")); + response = update("tolerant-chain-max-errors-10", adoc("text", "the quick brown fox")); + assertNull(BaseTestHarness.validateXPath(response, "//int[@name='status']=0", + "//int[@name='maxErrors']/text()='10'", + "count(//arr[@name='errors']/lst)=1", + "//arr[@name='errors']/lst/str[@name='id']/text()='(unknown)'", + "//arr[@name='errors']/lst/str[@name='message']/text()='Document is missing mandatory uniqueKey field: id'")); + + response = update("tolerant-chain-max-errors-10", adoc("text", "the quick brown fox")); + StringWriter builder = new StringWriter(); + builder.append(""); + for (SolrInputDocument doc:docs) { + ClientUtils.writeXML(doc, builder); + } + builder.append(""); + response = update("tolerant-chain-max-errors-10", builder.toString()); + assertNull(BaseTestHarness.validateXPath(response, "//int[@name='status']=0", + "//int[@name='maxErrors']/text()='10'", + "count(//arr[@name='errors']/lst)=10", + "not(//arr[@name='errors']/lst/str[@name='id']/text()='0')", + "//arr[@name='errors']/lst/str[@name='id']/text()='1'", + "not(//arr[@name='errors']/lst/str[@name='id']/text()='2')", + "//arr[@name='errors']/lst/str[@name='id']/text()='3'", + "not(//arr[@name='errors']/lst/str[@name='id']/text()='4')", + "//arr[@name='errors']/lst/str[@name='id']/text()='5'", + "not(//arr[@name='errors']/lst/str[@name='id']/text()='6')", + "//arr[@name='errors']/lst/str[@name='id']/text()='7'", + "not(//arr[@name='errors']/lst/str[@name='id']/text()='8')", + "//arr[@name='errors']/lst/str[@name='id']/text()='9'", + "not(//arr[@name='errors']/lst/str[@name='id']/text()='10')", + "//arr[@name='errors']/lst/str[@name='id']/text()='11'", + "not(//arr[@name='errors']/lst/str[@name='id']/text()='12')", + "//arr[@name='errors']/lst/str[@name='id']/text()='13'", + "not(//arr[@name='errors']/lst/str[@name='id']/text()='14')", + "//arr[@name='errors']/lst/str[@name='id']/text()='15'", + "not(//arr[@name='errors']/lst/str[@name='id']/text()='16')", + "//arr[@name='errors']/lst/str[@name='id']/text()='17'", + "not(//arr[@name='errors']/lst/str[@name='id']/text()='18')", + "//arr[@name='errors']/lst/str[@name='id']/text()='19'")); + + // spot check response when effective maxErrors is unlimited + response = update("tolerant-chain-max-errors-not-set", builder.toString()); + assertNull(BaseTestHarness.validateXPath(response, "//int[@name='maxErrors']/text()='-1'")); + + } + + + + public String update(String chain, String xml) { + DirectSolrConnection connection = new DirectSolrConnection(h.getCore()); + SolrRequestHandler handler = h.getCore().getRequestHandler("/update"); + ModifiableSolrParams params = new ModifiableSolrParams(); + params.add("update.chain", chain); + try { + return 
connection.request(handler, params, xml); + } catch (SolrException e) { + throw (SolrException)e; + } catch (Exception e) { + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, e); + } + } + + private void assertAddsSucceedWithErrors(String chain, + final Collection docs, + SolrParams requestParams, + String... idsShouldFail) throws IOException { + + SolrQueryResponse response = add(chain, requestParams, docs); + + @SuppressWarnings("unchecked") + List> errors = (List>) + response.getResponseHeader().get("errors"); + assertNotNull(errors); + + assertEquals("number of errors", idsShouldFail.length, errors.size()); + + Set addErrorIdsExpected = new HashSet(Arrays.asList(idsShouldFail)); + + for (SimpleOrderedMap err : errors) { + assertEquals("this method only expects 'add' errors", "ADD", err.get("type")); + + String id = err.get("id"); + assertNotNull("null err id", id); + assertTrue("unexpected id", addErrorIdsExpected.contains(id)); + + } + } + + protected SolrQueryResponse add(final String chain, SolrParams requestParams, final SolrInputDocument doc) throws IOException { + return add(chain, requestParams, Arrays.asList(new SolrInputDocument[]{doc})); + } + + protected SolrQueryResponse add(final String chain, SolrParams requestParams, final Collection docs) throws IOException { + + SolrCore core = h.getCore(); + UpdateRequestProcessorChain pc = core.getUpdateProcessingChain(chain); + assertNotNull("No Chain named: " + chain, pc); + + SolrQueryResponse rsp = new SolrQueryResponse(); + rsp.add("responseHeader", new SimpleOrderedMap()); + + if(requestParams == null) { + requestParams = new ModifiableSolrParams(); + } + + SolrQueryRequest req = new LocalSolrQueryRequest(core, requestParams); + try { + UpdateRequestProcessor processor = pc.createProcessor(req, rsp); + for(SolrInputDocument doc:docs) { + AddUpdateCommand cmd = new AddUpdateCommand(req); + cmd.solrDoc = doc; + processor.processAdd(cmd); + } + processor.finish(); + + } finally { + req.close(); + } + return rsp; + } + +} diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java index 59b37c5cce9..edfe1c3e117 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java @@ -54,6 +54,7 @@ import org.apache.solr.client.solrj.request.UpdateRequest; import org.apache.solr.client.solrj.util.ClientUtils; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException.ErrorCode; +import org.apache.solr.common.ToleratedUpdateError; import org.apache.solr.common.cloud.Aliases; import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.DocCollection; @@ -72,6 +73,7 @@ import org.apache.solr.common.params.UpdateParams; import org.apache.solr.common.util.ExecutorUtil; import org.apache.solr.common.util.Hash; import org.apache.solr.common.util.NamedList; +import org.apache.solr.common.util.SimpleOrderedMap; import org.apache.solr.common.util.SolrjNamedThreadFactory; import org.apache.solr.common.util.StrUtils; import org.apache.zookeeper.KeeperException; @@ -726,6 +728,11 @@ public class CloudSolrClient extends SolrClient { int status = 0; Integer rf = null; Integer minRf = null; + + // TolerantUpdateProcessor + List> toleratedErrors = null; + int maxToleratedErrors = Integer.MAX_VALUE; + for(int i=0; i> shardTolerantErrors = + (List>) header.get("errors"); + if (null != 
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
index 59b37c5cce9..edfe1c3e117 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
@@ -54,6 +54,7 @@ import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.client.solrj.util.ClientUtils;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.ToleratedUpdateError;
 import org.apache.solr.common.cloud.Aliases;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
@@ -72,6 +73,7 @@ import org.apache.solr.common.params.UpdateParams;
 import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.common.util.Hash;
 import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.SimpleOrderedMap;
 import org.apache.solr.common.util.SolrjNamedThreadFactory;
 import org.apache.solr.common.util.StrUtils;
 import org.apache.zookeeper.KeeperException;
@@ -726,6 +728,11 @@ public class CloudSolrClient extends SolrClient {
     int status = 0;
     Integer rf = null;
     Integer minRf = null;
+
+    // TolerantUpdateProcessor
+    List<SimpleOrderedMap<String>> toleratedErrors = null;
+    int maxToleratedErrors = Integer.MAX_VALUE;
+
     for (int i = 0; i < response.size(); i++) {
@@ ... @@
+      @SuppressWarnings("unchecked")
+      List<SimpleOrderedMap<String>> shardTolerantErrors =
+        (List<SimpleOrderedMap<String>>) header.get("errors");
+      if (null != shardTolerantErrors) {
+        Integer shardMaxToleratedErrors = (Integer) header.get("maxErrors");
+        assert null != shardMaxToleratedErrors : "TolerantUpdateProcessor reported errors but not maxErrors";
+        // if we get into some weird state where the nodes disagree about the effective maxErrors,
+        // assume the min value seen to decide if we should fail.
+        maxToleratedErrors = Math.min(maxToleratedErrors,
+                                      ToleratedUpdateError.getEffectiveMaxErrors(shardMaxToleratedErrors.intValue()));
+
+        if (null == toleratedErrors) {
+          toleratedErrors = new ArrayList<SimpleOrderedMap<String>>(shardTolerantErrors.size());
+        }
+        for (SimpleOrderedMap<String> err : shardTolerantErrors) {
+          toleratedErrors.add(err);
+        }
+      }
     }
 
     NamedList cheader = new NamedList();
@@ -750,7 +775,31 @@
       cheader.add(UpdateRequest.REPFACT, rf);
     if (minRf != null)
       cheader.add(UpdateRequest.MIN_REPFACT, minRf);
-
+    if (null != toleratedErrors) {
+      cheader.add("maxErrors", ToleratedUpdateError.getUserFriendlyMaxErrors(maxToleratedErrors));
+      cheader.add("errors", toleratedErrors);
+      if (maxToleratedErrors < toleratedErrors.size()) {
+        // cumulative errors are too high, we need to throw a client exception w/correct metadata
+
+        // NOTE: it shouldn't be possible for 1 == toleratedErrors.size(), because if that were the case
+        // then at least one shard should have thrown a real error before this, so we don't worry
+        // about having a more "singular" exception msg for that situation
+        StringBuilder msgBuf = new StringBuilder()
+          .append(toleratedErrors.size()).append(" Async failures during distributed update: ");
+
+        NamedList<String> metadata = new NamedList<String>();
+        for (SimpleOrderedMap<String> err : toleratedErrors) {
+          ToleratedUpdateError te = ToleratedUpdateError.parseMap(err);
+          metadata.add(te.getMetadataKey(), te.getMetadataValue());
+
+          msgBuf.append("\n").append(te.getMessage());
+        }
+
+        SolrException toThrow = new SolrException(ErrorCode.BAD_REQUEST, msgBuf.toString());
+        toThrow.setMetadata(metadata);
+        throw toThrow;
+      }
+    }
     condensed.add("responseHeader", cheader);
     return condensed;
   }
@@ -786,6 +835,22 @@ public class CloudSolrClient extends SolrClient {
       super(errorCode, throwables.getVal(0).getMessage(), throwables.getVal(0));
       this.throwables = throwables;
       this.routes = routes;
+
+      // create a merged copy of the metadata from all wrapped exceptions
+      NamedList<String> metadata = new NamedList<String>();
+      for (int i = 0; i < throwables.size(); i++) {
+        Throwable t = throwables.getVal(i);
+        if (t instanceof SolrException) {
+          SolrException e = (SolrException) t;
+          NamedList<String> eMeta = e.getMetadata();
+          if (null != eMeta) {
+            metadata.addAll(eMeta);
+          }
+        }
+      }
+      if (0 < metadata.size()) {
+        this.setMetadata(metadata);
+      }
     }
 
     public NamedList<Throwable> getThrowables() {
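Once maxErrors is exceeded, the BAD_REQUEST exception built above (or a RouteException wrapping it) carries one metadata entry per tolerated failure. A minimal sketch of how a SolrJ caller might unpack that, assuming a CloudSolrClient named "client" and a hypothetical collection name, inside a method that already declares SolrServerException/IOException:

    try {
      client.add("myCollection", docs); // tolerant chain configured on the server side
    } catch (SolrException e) {
      NamedList<String> remoteMeta = e.getMetadata();
      if (null != remoteMeta) {
        for (int i = 0; i < remoteMeta.size(); i++) {
          ToleratedUpdateError err = ToleratedUpdateError.parseMetadataIfToleratedUpdateError
            (remoteMeta.getName(i), remoteMeta.getVal(i));
          if (null != err) {
            // err.getType(), err.getId(), err.getMessage() identify the skipped command
          }
        }
      }
    }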
diff --git a/solr/solrj/src/java/org/apache/solr/common/ToleratedUpdateError.java b/solr/solrj/src/java/org/apache/solr/common/ToleratedUpdateError.java
new file mode 100644
index 00000000000..fd8b8c79eea
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/common/ToleratedUpdateError.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.common;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.solr.common.util.SimpleOrderedMap;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+
+/**
+ * Models the basic information related to a single "tolerated" error that occurred during updates.
+ * This class is only useful when the TolerantUpdateProcessorFactory is used in an update
+ * processor chain.
+ */
+public final class ToleratedUpdateError {
+
+  private final static String META_PRE = ToleratedUpdateError.class.getName() + "--";
+  private final static int META_PRE_LEN = META_PRE.length();
+
+  /**
+   * Given a 'maxErrors' value such that -1 <= maxErrors <= {@link Integer#MAX_VALUE},
+   * this method returns the original input unless it is -1, in which case the effective value of
+   * {@link Integer#MAX_VALUE} is returned.
+   * Input of maxErrors < -1 will trip an assertion and otherwise has undefined behavior.
+   * @see #getUserFriendlyMaxErrors
+   */
+  public static int getEffectiveMaxErrors(int maxErrors) {
+    assert -1 <= maxErrors;
+    return -1 == maxErrors ? Integer.MAX_VALUE : maxErrors;
+  }
+
+  /**
+   * Given a 'maxErrors' value such that -1 <= maxErrors <= {@link Integer#MAX_VALUE},
+   * this method returns the original input unless it is {@link Integer#MAX_VALUE}, in which case
+   * -1 is returned for user convenience.
+   * Input of maxErrors < -1 will trip an assertion and otherwise has undefined behavior.
+   * @see #getEffectiveMaxErrors
+   */
+  public static int getUserFriendlyMaxErrors(int maxErrors) {
+    assert -1 <= maxErrors;
+    return Integer.MAX_VALUE == maxErrors ? -1 : maxErrors;
+  }
+
+  /**
+   * returns a list of maps of simple objects suitable for putting in a SolrQueryResponse header
+   * @see #getSimpleMap
+   * @see #parseMap
+   */
+  public static List<SimpleOrderedMap<String>> formatForResponseHeader(List<ToleratedUpdateError> errs) {
+    List<SimpleOrderedMap<String>> result = new ArrayList<>(errs.size());
+    for (ToleratedUpdateError e : errs) {
+      result.add(e.getSimpleMap());
+    }
+    return result;
+  }
+
+  /**
+   * returns a ToleratedUpdateError instance from the data in this Map
+   * @see #getSimpleMap
+   */
+  public static ToleratedUpdateError parseMap(SimpleOrderedMap<String> data) {
+    final String id = data.get("id");
+    final String message = data.get("message");
+    final String t = data.get("type");
+    if (null == t || null == id || null == message) {
+      throw new SolrException(ErrorCode.SERVER_ERROR, "Map does not represent a ToleratedUpdateError, must contain 'type', 'id', and 'message'");
+    }
+    try {
+      return new ToleratedUpdateError(CmdType.valueOf(t), id, message);
+    } catch (IllegalArgumentException iae) {
+      throw new SolrException(ErrorCode.SERVER_ERROR, "Invalid type for ToleratedUpdateError: " + t, iae);
+    }
+  }
+
+  /**
+   * returns a ToleratedUpdateError instance if this metadataKey is one we care about, else null
+   * @see #getMetadataKey
+   * @see #getMetadataValue
+   */
+  public static ToleratedUpdateError parseMetadataIfToleratedUpdateError(String metadataKey,
+                                                                         String metadataVal) {
+    if (! metadataKey.startsWith(META_PRE)) {
+      return null; // not a key we care about
+    }
+    final int typeEnd = metadataKey.indexOf(':', META_PRE_LEN);
+    if (typeEnd < 0) {
+      return null; // has our prefix, but not our format -- must not be a key we (actually) care about
+    }
+    return new ToleratedUpdateError(CmdType.valueOf(metadataKey.substring(META_PRE_LEN, typeEnd)),
+                                    metadataKey.substring(typeEnd+1), metadataVal);
+  }
+
+  private final CmdType type;
+  private final String id;
+  private final String message;
+
+  public ToleratedUpdateError(CmdType type, String id, String message) {
+    assert null != type;
+    this.type = type;
+
+    assert null != id;
+    this.id = id;
+
+    assert null != message;
+    this.message = message;
+  }
+
+  public CmdType getType() {
+    return type;
+  }
+  public String getId() {
+    return id;
+  }
+  public String getMessage() {
+    return message;
+  }
+
+  /**
+   * returns a string suitable for use as a key in {@link SolrException#setMetadata}
+   *
+   * @see #parseMetadataIfToleratedUpdateError
+   */
+  public String getMetadataKey() {
+    return META_PRE + type + ":" + id;
+  }
+
+  /**
+   * returns a string suitable for use as a value in {@link SolrException#setMetadata}
+   *
+   * @see #parseMetadataIfToleratedUpdateError
+   */
+  public String getMetadataValue() {
+    return message;
+  }
+
+  /**
+   * returns a map of simple objects suitable for putting in a SolrQueryResponse header
+   * @see #formatForResponseHeader
+   * @see #parseMap
+   */
+  public SimpleOrderedMap<String> getSimpleMap() {
+    SimpleOrderedMap<String> entry = new SimpleOrderedMap<String>();
+    entry.add("type", type.toString());
+    entry.add("id", id);
+    entry.add("message", message);
+    return entry;
+  }
+
+  public String toString() {
+    return getMetadataKey() + "=>" + getMetadataValue();
+  }
+
+  public int hashCode() {
+    int h = this.getClass().hashCode();
+    h = h * 31 + type.hashCode();
+    h = h * 31 + id.hashCode();
+    h = h * 31 + message.hashCode();
+    return h;
+  }
+
+  public boolean equals(Object o) {
+    if (o instanceof ToleratedUpdateError) {
+      ToleratedUpdateError that = (ToleratedUpdateError)o;
+      return that.type.equals(this.type)
+        && that.id.equals(this.id)
+        && that.message.equals(this.message);
+    }
+    return false;
+  }
+
+  /**
+   * Helper class for dealing with SolrException metadata (String) keys
+   */
+  public static enum CmdType {
+    ADD, DELID, DELQ;
+
+    // if we add support for things like commit, parsing/toString/hashCode logic
+    // needs to be smarter to account for 'id' being null ... "usesId" should be a prop of enum instances
+  }
+}
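The two serializations defined above are meant to round trip cleanly; a short sketch using only the API shown in this file (the id and message values are illustrative):

    ToleratedUpdateError err = new ToleratedUpdateError(CmdType.ADD, "doc1", "Error adding field 'weight_i'");

    // response-header form: a small ordered map of type/id/message
    SimpleOrderedMap<String> asMap = err.getSimpleMap();
    assert err.equals(ToleratedUpdateError.parseMap(asMap));

    // SolrException metadata form: a flat String key/value pair, where the key embeds
    // META_PRE, the CmdType, and the id (e.g. "org.apache.solr.common.ToleratedUpdateError--ADD:doc1")
    String key = err.getMetadataKey();
    String val = err.getMetadataValue();
    assert err.equals(ToleratedUpdateError.parseMetadataIfToleratedUpdateError(key, val));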
diff --git a/solr/solrj/src/test/org/apache/solr/common/TestToleratedUpdateError.java b/solr/solrj/src/test/org/apache/solr/common/TestToleratedUpdateError.java
new file mode 100644
index 00000000000..aba07ae6cce
--- /dev/null
+++ b/solr/solrj/src/test/org/apache/solr/common/TestToleratedUpdateError.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.common;
+
+import java.util.EnumSet;
+import org.apache.solr.common.ToleratedUpdateError;
+import org.apache.solr.common.ToleratedUpdateError.CmdType;
+import org.apache.solr.common.util.SimpleOrderedMap;
+
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.lucene.util.TestUtil;
+
+/** Basic testing of the serialization/encapsulation code in ToleratedUpdateError */
+public class TestToleratedUpdateError extends LuceneTestCase {
+
+  private final static CmdType[] ALL_TYPES = EnumSet.allOf(CmdType.class).toArray(new CmdType[0]);
+
+  public void testBasics() {
+
+    assertFalse((new ToleratedUpdateError(CmdType.ADD, "doc1", "some error")).equals
+                (new ToleratedUpdateError(CmdType.ADD, "doc2", "some error")));
+    assertFalse((new ToleratedUpdateError(CmdType.ADD, "doc1", "some error")).equals
+                (new ToleratedUpdateError(CmdType.ADD, "doc1", "some errorxx")));
+    assertFalse((new ToleratedUpdateError(CmdType.ADD, "doc1", "some error")).equals
+                (new ToleratedUpdateError(CmdType.DELID, "doc1", "some error")));
+  }
+
+  public void testParseMetadataErrorHandling() {
+
+    assertNull(ToleratedUpdateError.parseMetadataIfToleratedUpdateError("some other key", "some value"));
+
+    // see if someone tries to trick us into having an NPE...
+    ToleratedUpdateError valid = new ToleratedUpdateError(CmdType.ADD, "doc2", "some error");
+    String badKey = valid.getMetadataKey().replace(":", "X");
+    assertNull(ToleratedUpdateError.parseMetadataIfToleratedUpdateError(badKey, valid.getMetadataValue()));
+  }
+
+  public void testParseMapErrorChecking() {
+    SimpleOrderedMap<String> bogus = new SimpleOrderedMap<String>();
+    try {
+      ToleratedUpdateError.parseMap(bogus);
+      fail("map should not be parsable");
+    } catch (SolrException e) {
+      assertTrue(e.toString(), e.getMessage().contains("Map does not represent a ToleratedUpdateError"));
+    }
+
+    bogus.add("id", "some id");
+    bogus.add("message", "some message");
+    try {
+      ToleratedUpdateError.parseMap(bogus);
+      fail("map should still not be parsable");
+    } catch (SolrException e) {
+      assertTrue(e.toString(), e.getMessage().contains("Map does not represent a ToleratedUpdateError"));
+    }
+
+    bogus.add("type", "not a real type");
+    try {
+      ToleratedUpdateError.parseMap(bogus);
+      fail("invalid type should not be parsable");
+    } catch (SolrException e) {
+      assertTrue(e.toString(), e.getMessage().contains("Invalid type"));
+    }
+  }
+
+  public void testParseMap() {
+    // trivial
+    SimpleOrderedMap<String> valid = new SimpleOrderedMap<String>();
+    valid.add("type", CmdType.ADD.toString());
+    valid.add("id", "some id");
+    valid.add("message", "some message");
+
+    ToleratedUpdateError in = ToleratedUpdateError.parseMap(valid);
+    compare(in, MAP_COPPIER);
+    compare(in, METADATA_COPPIER);
+
+    // randomized
+    int numIters = atLeast(5000);
+    for (int i = 0; i < numIters; i++) {
+      valid = new SimpleOrderedMap<String>();
+      valid.add("type", ALL_TYPES[TestUtil.nextInt(random(), 0, ALL_TYPES.length-1)].toString());
+      valid.add("id", TestUtil.randomUnicodeString(random()));
+      valid.add("message", TestUtil.randomUnicodeString(random()));
+
+      in = ToleratedUpdateError.parseMap(valid);
+      compare(in, MAP_COPPIER);
+      compare(in, METADATA_COPPIER);
+    }
+  }
+
+  public void checkRoundTripComparisons(Coppier coppier) {
+
+    // some simple basics
+    for (ToleratedUpdateError in : new ToleratedUpdateError[] {
+        new ToleratedUpdateError(CmdType.ADD, "doc1", "some error"),
+        new ToleratedUpdateError(CmdType.DELID, "doc1", "some diff error"),
+        new ToleratedUpdateError(CmdType.DELQ, "-field:yakko other_field:wakko", "some other error"),
+      }) {
+
+      compare(in, coppier);
+    }
+
+    // randomized testing of non trivial keys/values
+    int numIters = atLeast(5000);
+    for (int i = 0; i < numIters; i++) {
+      ToleratedUpdateError in = new ToleratedUpdateError
+        (ALL_TYPES[TestUtil.nextInt(random(), 0, ALL_TYPES.length-1)],
+         TestUtil.randomUnicodeString(random()),
+         TestUtil.randomUnicodeString(random()));
+      compare(in, coppier);
+    }
+  }
+
+  public void testMetadataRoundTripComparisons() {
+    checkRoundTripComparisons(METADATA_COPPIER);
+  }
+
+  public void testMapRoundTripComparisons() {
+    checkRoundTripComparisons(MAP_COPPIER);
+  }
+
+  /** trivial sanity check */
+  public void testMaxErrorsValueConversion() {
+
+    assertEquals(-1, ToleratedUpdateError.getUserFriendlyMaxErrors(-1));
+    assertEquals(-1, ToleratedUpdateError.getUserFriendlyMaxErrors(Integer.MAX_VALUE));
+
+    assertEquals(Integer.MAX_VALUE, ToleratedUpdateError.getEffectiveMaxErrors(Integer.MAX_VALUE));
+    assertEquals(Integer.MAX_VALUE, ToleratedUpdateError.getEffectiveMaxErrors(-1));
+
+    for (int val : new int[] {0, 1, 10, 42, 600000 }) {
+      assertEquals(val, ToleratedUpdateError.getEffectiveMaxErrors(val));
+      assertEquals(val, ToleratedUpdateError.getUserFriendlyMaxErrors(val));
+    }
+
+  }
+
+  public void compare(ToleratedUpdateError in, Coppier coppier) {
+    ToleratedUpdateError out = coppier.copy(in);
+    assertNotNull(out);
+    compare(in, out);
+  }
+
+  public void compare(ToleratedUpdateError in, ToleratedUpdateError out) {
+    assertEquals(out.getType(), in.getType());
+    assertEquals(out.getId(), in.getId());
+    assertEquals(out.getMessage(), in.getMessage());
+
+    assertEquals(out.hashCode(), in.hashCode());
+    assertEquals(out.toString(), in.toString());
+
+    assertEquals(in.getMetadataKey(), out.getMetadataKey());
+    assertEquals(in.getMetadataValue(), out.getMetadataValue());
+
+    assertEquals(out, in);
+    assertEquals(in, out);
+  }
+
+  private static abstract class Coppier {
+    public abstract ToleratedUpdateError copy(ToleratedUpdateError in);
+  }
+
+  private static final Coppier MAP_COPPIER = new Coppier() {
+    public ToleratedUpdateError copy(ToleratedUpdateError in) {
+      return ToleratedUpdateError.parseMap(in.getSimpleMap());
+    }
+  };
+
+  private static final Coppier METADATA_COPPIER = new Coppier() {
+    public ToleratedUpdateError copy(ToleratedUpdateError in) {
+      return ToleratedUpdateError.parseMetadataIfToleratedUpdateError
+        (in.getMetadataKey(), in.getMetadataValue());
+    }
+  };
+
+}
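End to end, a SolrJ client sending a batch through a tolerant chain can read the merged report that the CloudSolrClient changes above assemble. The header keys "errors" and "maxErrors" come from this patch; the chain name, the use of the update.chain request param to select it, and the maxErrors request param are assumptions in the sketch below.

    UpdateRequest ureq = new UpdateRequest();
    ureq.add(docs);                                  // Collection<SolrInputDocument>
    ureq.setParam("update.chain", "tolerant-chain"); // hypothetical chain name
    ureq.setParam("maxErrors", "10");                // hypothetical request-level limit

    UpdateResponse rsp = ureq.process(cloudClient, "myCollection");

    NamedList header = rsp.getResponseHeader();
    Integer maxErrors = (Integer) header.get("maxErrors"); // -1 means "unlimited"

    @SuppressWarnings("unchecked")
    List<SimpleOrderedMap<String>> errors = (List<SimpleOrderedMap<String>>) header.get("errors");
    if (null != errors) {
      for (SimpleOrderedMap<String> e : errors) {
        ToleratedUpdateError err = ToleratedUpdateError.parseMap(e);
        // log or retry based on err.getType() / err.getId() / err.getMessage()
      }
    }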