SOLR-445: new TolerantUpdateProcessorFactory to support skipping update commands that cause failures when sending multiple updates in a single request.

SOLR-8890: New static method in DistributedUpdateProcessorFactory to allow UpdateProcessorFactories to indicate request params that should be forwarded when DUP distributes updates.

This commit is a squashed merge from the jira/SOLR-445 branch (as of b08c284b26b1779d03693a45e219db89839461d0)
Chris Hostetter 2016-03-25 11:00:29 -07:00
parent e26a443194
commit 5b6eacb80b
17 changed files with 3214 additions and 45 deletions
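(For context, a rough SolrJ sketch of what the tolerant chain enables; the chain name, client, and documents below are hypothetical placeholders, while the maxErrors param and the "errors"/"maxErrors" response header entries are the ones added by this commit.)

UpdateRequest req = new UpdateRequest();
req.setParam("update.chain", "tolerant-chain"); // hypothetical chain using TolerantUpdateProcessorFactory
req.setParam("maxErrors", "10");
req.add(goodDoc);  // placeholder SolrInputDocument that indexes cleanly
req.add(badDoc);   // placeholder doc that fails (e.g. missing its uniqueKey)
UpdateResponse rsp = req.process(client);  // status==200 even though badDoc failed
// the tolerated failures are reported in the response header
Object toleratedErrors = rsp.getResponseHeader().get("errors");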

View File

@@ -49,6 +49,13 @@ New Features
* SOLR-8859: AbstractSpatialFieldType will now convert Shapes to/from Strings
using the SpatialContext. (ryan)
* SOLR-445: new TolerantUpdateProcessorFactory to support skipping update commands that cause
failures when sending multiple updates in a single request.
(Erick Erickson, Tomás Fernández Löbbe, Anshum Gupta, hossman)
* SOLR-8890: New static method in DistributedUpdateProcessorFactory to allow UpdateProcessorFactories
to indicate request params that should be forwarded when DUP distributes updates. (hossman)
Bug Fixes
----------------------

View File

@@ -161,6 +161,10 @@ public class SolrQueryResponse {
/**
* Causes an error to be returned instead of the results.
*
* In general, new calls to this method should not be added. In most cases
* you should simply throw an exception and let it bubble out to
* RequestHandlerBase, which will record the thrown exception on the response.
*/
public void setException(Exception e) {
err=e;

View File

@@ -21,6 +21,7 @@ import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.BinaryResponseParser;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrClient; // jdoc
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrException;
@@ -138,7 +139,7 @@ public class SolrCmdDistributor {
SolrException.log(SolrCmdDistributor.log, "forwarding update to "
+ oldNodeUrl + " failed - retrying ... retries: "
+ err.req.retries + " " + err.req.cmdString + " params:"
+ err.req.retries + " " + err.req.cmd.toString() + " params:"
+ err.req.uReq.getParams() + " rsp:" + rspCode, err.e);
try {
Thread.sleep(retryPause);
@@ -187,7 +188,7 @@ public class SolrCmdDistributor {
uReq.deleteByQuery(cmd.query);
}
submit(new Req(cmd.toString(), node, uReq, sync), false);
submit(new Req(cmd, node, uReq, sync), false);
}
}
@@ -200,14 +201,13 @@ public class SolrCmdDistributor {
}
public void distribAdd(AddUpdateCommand cmd, List<Node> nodes, ModifiableSolrParams params, boolean synchronous, RequestReplicationTracker rrt) throws IOException {
String cmdStr = cmd.toString();
for (Node node : nodes) {
UpdateRequest uReq = new UpdateRequest();
if (cmd.isLastDocInBatch)
uReq.lastDocInBatch();
uReq.setParams(params);
uReq.add(cmd.solrDoc, cmd.commitWithin, cmd.overwrite);
submit(new Req(cmdStr, node, uReq, synchronous, rrt, cmd.pollQueueTime), false);
submit(new Req(cmd, node, uReq, synchronous, rrt, cmd.pollQueueTime), false);
}
}
@@ -226,7 +226,7 @@ public class SolrCmdDistributor {
log.debug("Distrib commit to: {} params: {}", nodes, params);
for (Node node : nodes) {
submit(new Req(cmd.toString(), node, uReq, false), true);
submit(new Req(cmd, node, uReq, false), true);
}
}
@@ -272,7 +272,7 @@ public class SolrCmdDistributor {
if (log.isDebugEnabled()) {
log.debug("sending update to "
+ req.node.getUrl() + " retry:"
+ req.retries + " " + req.cmdString + " params:" + req.uReq.getParams());
+ req.retries + " " + req.cmd + " params:" + req.uReq.getParams());
}
if (isCommit) {
@@ -314,26 +314,26 @@ public class SolrCmdDistributor {
public UpdateRequest uReq;
public int retries;
public boolean synchronous;
public String cmdString;
public UpdateCommand cmd;
public RequestReplicationTracker rfTracker;
public int pollQueueTime;
public Req(String cmdString, Node node, UpdateRequest uReq, boolean synchronous) {
this(cmdString, node, uReq, synchronous, null, 0);
public Req(UpdateCommand cmd, Node node, UpdateRequest uReq, boolean synchronous) {
this(cmd, node, uReq, synchronous, null, 0);
}
public Req(String cmdString, Node node, UpdateRequest uReq, boolean synchronous, RequestReplicationTracker rfTracker, int pollQueueTime) {
public Req(UpdateCommand cmd, Node node, UpdateRequest uReq, boolean synchronous, RequestReplicationTracker rfTracker, int pollQueueTime) {
this.node = node;
this.uReq = uReq;
this.synchronous = synchronous;
this.cmdString = cmdString;
this.cmd = cmd;
this.rfTracker = rfTracker;
this.pollQueueTime = pollQueueTime;
}
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("SolrCmdDistributor$Req: cmd=").append(String.valueOf(cmdString));
sb.append("SolrCmdDistributor$Req: cmd=").append(cmd.toString());
sb.append("; node=").append(String.valueOf(node));
return sb.toString();
}
@@ -382,6 +382,13 @@ public class SolrCmdDistributor {
public static class Error {
public Exception e;
public int statusCode = -1;
/**
* NOTE: This is the request that happened to be executed when this error was <b>triggered</b>,
* but because of how {@link StreamingSolrClients} uses {@link ConcurrentUpdateSolrClient} it might not
* actually be the request that <b>caused</b> the error -- multiple requests are merged &amp; processed as
* a sequential batch.
*/
public Req req;
public String toString() {

View File

@@ -92,7 +92,8 @@ import org.slf4j.LoggerFactory;
// NOT mt-safe... create a new processor for each add thread
// TODO: we really should not wait for distrib after local? unless a certain replication factor is asked for
public class DistributedUpdateProcessor extends UpdateRequestProcessor {
final static String PARAM_WHITELIST_CTX_KEY = DistributedUpdateProcessor.class + "PARAM_WHITELIST_CTX_KEY";
public static final String DISTRIB_FROM_SHARD = "distrib.from.shard";
public static final String DISTRIB_FROM_COLLECTION = "distrib.from.collection";
public static final String DISTRIB_FROM_PARENT = "distrib.from.parent";
@@ -292,6 +293,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
this.req = req;
// this should always be used - see filterParams
DistributedUpdateProcessorFactory.addParamToDistributedRequestWhitelist
(this.req, UpdateParams.UPDATE_CHAIN, TEST_DISTRIB_SKIP_SERVERS);
CoreDescriptor coreDesc = req.getCore().getCoreDescriptor();
this.zkEnabled = coreDesc.getCoreContainer().isZooKeeperAware();
@@ -790,38 +795,30 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
cmdDistrib.finish();
List<Error> errors = cmdDistrib.getErrors();
// TODO - we may need to tell about more than one error...
// if it's a forward, any fail is a problem -
// otherwise we assume things are fine if we got it locally
// until we start allowing min replication param
if (errors.size() > 0) {
// if one node is a RetryNode, this was a forward request
if (errors.get(0).req.node instanceof RetryNode) {
rsp.setException(errors.get(0).e);
} else {
if (log.isWarnEnabled()) {
for (Error error : errors) {
log.warn("Error sending update to " + error.req.node.getBaseUrl(), error.e);
}
}
}
// else
// for now we don't error - we assume if it was added locally, we
// succeeded
}
// if it is not a forward request, for each fail, try to tell them to
// recover - the doc was already added locally, so it should have been
// legit
List<Error> errorsForClient = new ArrayList<>(errors.size());
for (final SolrCmdDistributor.Error error : errors) {
if (error.req.node instanceof RetryNode) {
// we don't try to force a leader to recover
// when we cannot forward to it
// if it's a forward, any fail is a problem -
// otherwise we assume things are fine if we got it locally
// until we start allowing min replication param
errorsForClient.add(error);
continue;
}
// else...
// for now we don't error - we assume if it was added locally, we
// succeeded
if (log.isWarnEnabled()) {
log.warn("Error sending update to " + error.req.node.getBaseUrl(), error.e);
}
// Since it is not a forward request, for each fail, try to tell them to
// recover - the doc was already added locally, so it should have been
// legit
DistribPhase phase =
DistribPhase.parseParam(error.req.uReq.getParams().get(DISTRIB_UPDATE_PARAM));
@@ -841,8 +838,8 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// let's just fail this request and let the client retry? or just call processAdd again?
log.error("On "+cloudDesc.getCoreNodeName()+", replica "+replicaUrl+
" now thinks it is the leader! Failing the request to let the client retry! "+error.e);
rsp.setException(error.e);
break;
errorsForClient.add(error);
continue;
}
String collection = null;
@@ -927,7 +924,12 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
rsp.getResponseHeader().add(UpdateRequest.REPFACT, replicationTracker.getAchievedRf());
rsp.getResponseHeader().add(UpdateRequest.MIN_REPFACT, replicationTracker.minRf);
replicationTracker = null;
}
}
if (0 < errorsForClient.size()) {
throw new DistributedUpdatesAsyncException(errorsForClient);
}
}
@@ -1210,10 +1212,16 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
}
}
/** @see DistributedUpdateProcessorFactory#addParamToDistributedRequestWhitelist */
protected ModifiableSolrParams filterParams(SolrParams params) {
ModifiableSolrParams fparams = new ModifiableSolrParams();
passParam(params, fparams, UpdateParams.UPDATE_CHAIN);
passParam(params, fparams, TEST_DISTRIB_SKIP_SERVERS);
Set<String> whitelist = (Set<String>) this.req.getContext().get(PARAM_WHITELIST_CTX_KEY);
assert null != whitelist : "whitelist can't be null, constructor adds to it";
for (String p : whitelist) {
passParam(params, fparams, p);
}
return fparams;
}
@@ -1698,4 +1706,67 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// definitely not the leader. Otherwise assume we are.
return DistribPhase.FROMLEADER != phase;
}
public static final class DistributedUpdatesAsyncException extends SolrException {
public final List<Error> errors;
public DistributedUpdatesAsyncException(List<Error> errors) {
super(buildCode(errors), buildMsg(errors), null);
this.errors = errors;
// create a merged copy of the metadata from all wrapped exceptions
NamedList<String> metadata = new NamedList<String>();
for (Error error : errors) {
if (error.e instanceof SolrException) {
SolrException e = (SolrException) error.e;
NamedList<String> eMeta = e.getMetadata();
if (null != eMeta) {
metadata.addAll(eMeta);
}
}
}
if (0 < metadata.size()) {
this.setMetadata(metadata);
}
}
/** Helper method for constructor */
private static final int buildCode(List<Error> errors) {
assert null != errors;
assert 0 < errors.size();
int minCode = Integer.MAX_VALUE;
int maxCode = Integer.MIN_VALUE;
for (Error error : errors) {
log.trace("REMOTE ERROR: {}", error);
minCode = Math.min(error.statusCode, minCode);
maxCode = Math.max(error.statusCode, maxCode);
}
if (minCode == maxCode) {
// all codes are consistent, use that...
return minCode;
} else if (400 <= minCode && maxCode < 500) {
// all codes are 4xx, use 400
return ErrorCode.BAD_REQUEST.code;
}
// ...otherwise use sensible default
return ErrorCode.SERVER_ERROR.code;
}
/** Helper method for constructor */
private static final String buildMsg(List<Error> errors) {
assert null != errors;
assert 0 < errors.size();
if (1 == errors.size()) {
return "Async exception during distributed update: " + errors.get(0).e.getMessage();
} else {
StringBuilder buf = new StringBuilder(errors.size() + " Async exceptions during distributed update: ");
for (Error error : errors) {
buf.append("\n");
buf.append(error.e.getMessage());
}
return buf.toString();
}
}
}
}
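A quick illustration of the status-code merging rule implemented by buildCode() above, as a minimal standalone sketch (the method name here is invented for illustration):

// {409, 409} -> 409 : all remote codes agree, so that code is used as-is
// {400, 404} -> 400 : all codes are 4xx, collapsed to BAD_REQUEST
// {400, 503} -> 500 : mixed client/server errors fall back to SERVER_ERROR
static int mergeStatusCodes(int... codes) {
  int min = Integer.MAX_VALUE, max = Integer.MIN_VALUE;
  for (int c : codes) {
    min = Math.min(c, min);
    max = Math.max(c, max);
  }
  if (min == max) return min;
  if (400 <= min && max < 500) return 400;
  return 500;
}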

View File

@@ -16,6 +16,9 @@
*/
package org.apache.solr.update.processor;
import java.util.Set;
import java.util.TreeSet;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
@@ -28,6 +31,23 @@ import org.apache.solr.response.SolrQueryResponse;
public class DistributedUpdateProcessorFactory
extends UpdateRequestProcessorFactory
implements DistributingUpdateProcessorFactory {
/**
* By default, the {@link DistributedUpdateProcessor} is extremely conservative in the list of request
* params that will be copied/included when updates are forwarded to other nodes. This method may be
* used by any {@link UpdateRequestProcessorFactory#getInstance} call to annotate a
* SolrQueryRequest with the names of parameters that should also be forwarded.
*/
public static void addParamToDistributedRequestWhitelist(final SolrQueryRequest req, final String... paramNames) {
Set<String> whitelist = (Set<String>) req.getContext().get(DistributedUpdateProcessor.PARAM_WHITELIST_CTX_KEY);
if (null == whitelist) {
whitelist = new TreeSet<String>();
req.getContext().put(DistributedUpdateProcessor.PARAM_WHITELIST_CTX_KEY, whitelist);
}
for (String p : paramNames) {
whitelist.add(p);
}
}
@Override
public void init(NamedList args) {
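(A hedged sketch of how a custom factory might use the new whitelist hook above; the class and param names are made up for illustration, while the getInstance signature matches UpdateRequestProcessorFactory.)

public class MyParamForwardingFactory extends UpdateRequestProcessorFactory {
  @Override
  public UpdateRequestProcessor getInstance(SolrQueryRequest req, SolrQueryResponse rsp,
                                            UpdateRequestProcessor next) {
    // ask DistributedUpdateProcessor to also forward "my.custom.param"
    // when it distributes this update to other nodes
    DistributedUpdateProcessorFactory.addParamToDistributedRequestWhitelist(req, "my.custom.param");
    return next; // a real factory would typically wrap 'next' in its own processor
  }
}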

View File

@@ -0,0 +1,415 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.update.processor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.lang.invoke.MethodHandles;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.CharsRefBuilder;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.ToleratedUpdateError;
import org.apache.solr.common.ToleratedUpdateError.CmdType;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.params.ShardParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.schema.SchemaField;
import org.apache.solr.update.AddUpdateCommand;
import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.DeleteUpdateCommand;
import org.apache.solr.update.MergeIndexesCommand;
import org.apache.solr.update.RollbackUpdateCommand;
import org.apache.solr.update.SolrCmdDistributor.Error;
import org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* <p>
* Suppresses errors for individual add/delete commands within a single request.
* Instead of failing on the first error, at most <code>maxErrors</code> errors (or unlimited
* if <code>-1==maxErrors</code>) are logged and recorded, and the batch continues.
* The client will receive a <code>status==200</code> response, which includes a list of errors
* that were tolerated.
* </p>
* <p>
* If more than <code>maxErrors</code> occur, the first exception recorded will be re-thrown;
* Solr will respond with <code>status==5xx</code> or <code>status==4xx</code>
* (depending on the underlying exceptions) and it won't finish processing any more updates in the request.
* (ie: subsequent update commands in the request will not be processed even if they are valid).
* </p>
*
* <p>
* NOTE: In cloud based collections, this processor expects to <b>NOT</b> be used on {@link DistribPhase#FROMLEADER}
* requests (because any successes that occur locally on the leader are considered successes even if there is some
* subsequent error on a replica). {@link TolerantUpdateProcessorFactory} will short circuit it away in those
* requests.
* </p>
*
* @see TolerantUpdateProcessorFactory
*/
public class TolerantUpdateProcessor extends UpdateRequestProcessor {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
/**
* String to be used as document key for errors when a real uniqueKey can't be determined
*/
private static final String UNKNOWN_ID = "(unknown)";
/**
* Response Header
*/
private final NamedList<Object> header;
/**
* Number of errors this UpdateRequestProcessor will tolerate. If more than this occur,
* the original exception will be thrown, interrupting the processing of the document
* batch
*/
private final int maxErrors;
/** The uniqueKey field */
private SchemaField uniqueKeyField;
private final SolrQueryRequest req;
private ZkController zkController;
/**
* Known errors that occurred in this batch, in order encountered (may not be the same as the
* order the commands were originally executed in due to the async distributed updates).
*/
private final List<ToleratedUpdateError> knownErrors = new ArrayList<ToleratedUpdateError>();
// Kludge: Because deleteByQuery updates are forwarded to every leader, we can get identical
// errors reported by every leader for the same underlying problem.
//
// It would be nice if we could cleanly handle the unlikely (but possible) situation of an
// update stream that includes multiple identical DBQs, with identical failures, and
// to report each one once, for example...
// add: id#1
// dbq: foo:bar
// add: id#2
// add: id#3
// dbq: foo:bar
//
// ...but I can't figure out a way to accurately identify & return duplicate
// ToleratedUpdateErrors from duplicate identical underlying requests w/o erroneously returning identical
// ToleratedUpdateErrors for the *same* underlying request but from diff shards.
//
// So as a kludge, we keep track of them for deduping against identical remote failures
//
private Set<ToleratedUpdateError> knownDBQErrors = new HashSet<>();
private final FirstErrTracker firstErrTracker = new FirstErrTracker();
private final DistribPhase distribPhase;
public TolerantUpdateProcessor(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next, int maxErrors, DistribPhase distribPhase) {
super(next);
assert maxErrors >= -1;
header = rsp.getResponseHeader();
this.maxErrors = ToleratedUpdateError.getEffectiveMaxErrors(maxErrors);
this.req = req;
this.distribPhase = distribPhase;
assert ! DistribPhase.FROMLEADER.equals(distribPhase);
this.zkController = this.req.getCore().getCoreDescriptor().getCoreContainer().getZkController();
this.uniqueKeyField = this.req.getCore().getLatestSchema().getUniqueKeyField();
assert null != uniqueKeyField : "Factory didn't enforce uniqueKey field?";
}
@Override
public void processAdd(AddUpdateCommand cmd) throws IOException {
BytesRef id = null;
try {
// force AddUpdateCommand to validate+cache the id before proceeding
id = cmd.getIndexedId();
super.processAdd(cmd);
} catch (Throwable t) {
firstErrTracker.caught(t);
knownErrors.add(new ToleratedUpdateError
(CmdType.ADD,
getPrintableId(id),
t.getMessage()));
if (knownErrors.size() > maxErrors) {
firstErrTracker.throwFirst();
}
}
}
@Override
public void processDelete(DeleteUpdateCommand cmd) throws IOException {
try {
super.processDelete(cmd);
} catch (Throwable t) {
firstErrTracker.caught(t);
ToleratedUpdateError err = new ToleratedUpdateError(cmd.isDeleteById() ? CmdType.DELID : CmdType.DELQ,
cmd.isDeleteById() ? cmd.id : cmd.query,
t.getMessage());
knownErrors.add(err);
// NOTE: we're not using this to dedup before adding to knownErrors.
// if we're lucky enough to get an immediate local failure (ie: we're a leader, or some other processor
// failed) then recording the multiple failures is a good thing -- helps us with an accurate fail
// fast if we exceed maxErrors
if (CmdType.DELQ.equals(err.getType())) {
knownDBQErrors.add(err);
}
if (knownErrors.size() > maxErrors) {
firstErrTracker.throwFirst();
}
}
}
@Override
public void processMergeIndexes(MergeIndexesCommand cmd) throws IOException {
try {
super.processMergeIndexes(cmd);
} catch (Throwable t) {
// we're not tolerant of errors from this type of command, but we
// do need to track it so we can annotate it with any other errors we were already tolerant of
firstErrTracker.caught(t);
throw t;
}
}
@Override
public void processCommit(CommitUpdateCommand cmd) throws IOException {
try {
super.processCommit(cmd);
} catch (Throwable t) {
// we're not tolerant of errors from this type of command, but we
// do need to track it so we can annotate it with any other errors we were already tolerant of
firstErrTracker.caught(t);
throw t;
}
}
@Override
public void processRollback(RollbackUpdateCommand cmd) throws IOException {
try {
super.processRollback(cmd);
} catch (Throwable t) {
// we're not tolerant of errors from this type of command, but we
// do need to track it so we can annotate it with any other errors we were already tolerant of
firstErrTracker.caught(t);
throw t;
}
}
@Override
public void finish() throws IOException {
// even if processAdd threw an error, this.finish() is still called and we might have additional
// errors from other remote leaders that we need to check for from the finish method of downstream processors
// (like DUP)
try {
super.finish();
} catch (DistributedUpdateProcessor.DistributedUpdatesAsyncException duae) {
firstErrTracker.caught(duae);
// adjust our stats based on each of the distributed errors
for (Error error : duae.errors) {
// we can't trust the req info from the Error, because multiple original requests might have been
// lumped together
//
// instead we trust the metadata that the TolerantUpdateProcessor running on the remote node added
// to the exception when it failed.
if ( ! (error.e instanceof SolrException) ) {
log.error("async update exception is not SolrException, no metadata to process", error.e);
continue;
}
SolrException remoteErr = (SolrException) error.e;
NamedList<String> remoteErrMetadata = remoteErr.getMetadata();
if (null == remoteErrMetadata) {
log.warn("remote error has no metadata to aggregate: " + remoteErr.getMessage(), remoteErr);
continue;
}
for (int i = 0; i < remoteErrMetadata.size(); i++) {
ToleratedUpdateError err =
ToleratedUpdateError.parseMetadataIfToleratedUpdateError(remoteErrMetadata.getName(i),
remoteErrMetadata.getVal(i));
if (null == err) {
// some metadata unrelated to this update processor
continue;
}
if (CmdType.DELQ.equals(err.getType())) {
if (knownDBQErrors.contains(err)) {
// we've already seen this identical error, probably a dup from another shard
continue;
} else {
knownDBQErrors.add(err);
}
}
knownErrors.add(err);
}
}
}
header.add("errors", ToleratedUpdateError.formatForResponseHeader(knownErrors));
// include in response so client knows what effective value was (may have been server side config)
header.add("maxErrors", ToleratedUpdateError.getUserFriendlyMaxErrors(maxErrors));
// annotate any error that might be thrown (or was already thrown)
firstErrTracker.annotate(knownErrors);
// decide if we have hit a situation where we know an error needs to be thrown.
if ((DistribPhase.TOLEADER.equals(distribPhase) ? 0 : maxErrors) < knownErrors.size()) {
// NOTE: even if maxErrors wasn't exceeded, we need to throw an error when we have any errors if we're
// a leader that was forwarded to by another node so that the forwarding node knows we encountered some
// problems and can aggregate the results
firstErrTracker.throwFirst();
}
}
/**
* Returns the output of {@link org.apache.solr.schema.FieldType#indexedToReadable(BytesRef, CharsRefBuilder)}
* of the field type of the uniqueKey on the {@link BytesRef} passed as parameter.
* <code>ref</code> should be the indexed representation of the id -- if null
* (possibly because it's missing in the update) this method will return {@link #UNKNOWN_ID}
*/
private String getPrintableId(BytesRef ref) {
if (ref == null) {
return UNKNOWN_ID;
}
return uniqueKeyField.getType().indexedToReadable(ref, new CharsRefBuilder()).toString();
}
/**
* Simple helper class for "tracking" any exceptions encountered.
*
* Only remembers the "first" exception encountered, and wraps it in a SolrException if needed, so that
* it can later be annotated with the metadata our users expect and re-thrown.
*
* NOTE: NOT THREAD SAFE
*/
private static final class FirstErrTracker {
SolrException first = null;
boolean thrown = false;
public FirstErrTracker() {
/* NOOP */
}
/**
* Call this method immediately anytime an exception is caught from a downstream method --
* even if you are going to ignore it (for now). If you plan to rethrow the Exception, use
* {@link #throwFirst} instead.
*/
public void caught(Throwable t) {
assert null != t;
if (null == first) {
if (t instanceof SolrException) {
first = (SolrException)t;
} else {
first = new SolrException(ErrorCode.SERVER_ERROR, "Tolerantly Caught Exception: " + t.getMessage(), t);
}
}
}
/**
* Call this method in place of any situation where you would normally (re)throw an exception
* (already passed to the {@link #caught} method) because maxErrors was exceeded.
*
* This method will keep a record that this update processor has already thrown the exception, and do
* nothing on future calls, so subsequent update processor methods can update the metadata but won't
* inadvertently re-throw this (or any other) cascading exception.
*/
public void throwFirst() throws SolrException {
assert null != first : "caught was never called?";
if (! thrown) {
thrown = true;
throw first;
}
}
/**
* Annotates the first exception (which may already have been thrown, or be thrown in the future) with
* the metadata from this update processor. For use in {@link TolerantUpdateProcessor#finish}
*/
public void annotate(List<ToleratedUpdateError> errors) {
if (null == first) {
return; // no exception to annotate
}
assert null != errors : "how do we have an exception to annotate w/o any errors?";
NamedList<String> firstErrMetadata = first.getMetadata();
if (null == firstErrMetadata) { // obnoxious
firstErrMetadata = new NamedList<String>();
first.setMetadata(firstErrMetadata);
} else {
// any existing metadata representing ToleratedUpdateErrors in this single exception needs to be removed
// so we can add *all* of the known ToleratedUpdateErrors (from this and other exceptions)
for (int i = 0; i < firstErrMetadata.size(); i++) {
if (null != ToleratedUpdateError.parseMetadataIfToleratedUpdateError
(firstErrMetadata.getName(i), firstErrMetadata.getVal(i))) {
firstErrMetadata.remove(i);
// NOTE: post decrementing index so we don't miss anything as we remove items
i--;
}
}
}
for (ToleratedUpdateError te : errors) {
firstErrMetadata.add(te.getMetadataKey(), te.getMetadataValue());
}
}
/** The first exception that was thrown (or may be thrown) whose metadata can be annotated. */
public SolrException getFirst() {
return first;
}
}
}

View File

@@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.update.processor;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.SolrCore;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.schema.SchemaField;
import org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase;
import org.apache.solr.util.plugin.SolrCoreAware;
import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
/**
* <p>
* Suppresses errors for individual add/delete commands within a single request.
* Instead of failing on the first error, at most <code>maxErrors</code> errors (or unlimited
* if <code>-1==maxErrors</code>) are logged and recorded, and the batch continues.
* The client will receive a <code>status==200</code> response, which includes a list of errors
* that were tolerated.
* </p>
* <p>
* If more than <code>maxErrors</code> occur, the first exception recorded will be re-thrown;
* Solr will respond with <code>status==5xx</code> or <code>status==4xx</code>
* (depending on the underlying exceptions) and it won't finish processing any more updates in the request.
* (ie: subsequent update commands in the request will not be processed even if they are valid).
* </p>
*
* <p>
* <code>maxErrors</code> is an int value that can be specified in the configuration and/or overridden
* per request. If unset, it will default to {@link Integer#MAX_VALUE}. Specifying an explicit value
* of <code>-1</code> is supported as shorthand for {@link Integer#MAX_VALUE}, all other negative
* integer values are not supported.
* </p>
* <p>
* An example configuration would be:
* </p>
* <pre class="prettyprint">
* &lt;updateRequestProcessorChain name="tolerant-chain"&gt;
* &lt;processor class="solr.TolerantUpdateProcessorFactory"&gt;
* &lt;int name="maxErrors"&gt;10&lt;/int&gt;
* &lt;/processor&gt;
* &lt;processor class="solr.RunUpdateProcessorFactory" /&gt;
* &lt;/updateRequestProcessorChain&gt;
*
* </pre>
*
* <p>
* The <code>maxErrors</code> parameter in the above chain could be overridden per request, for example:
* </p>
* <pre class="prettyprint">
* curl "http://localhost:8983/update?update.chain=tolerant-chain&amp;maxErrors=100" -H "Content-Type: text/xml" -d @myfile.xml
* </pre>
*
* <p>
* <b>NOTE:</b> The behavior of this UpdateRequestProcessorFactory in conjunction with indexing operations
* while a Shard Split is actively in progress is not well defined (or sufficiently tested). Users
* of this update processor are encouraged to either disable it, or pause updates, while any shard
* splitting is in progress (see <a href="https://issues.apache.org/jira/browse/SOLR-8881">SOLR-8881</a>
* for more details.)
* </p>
*/
public class TolerantUpdateProcessorFactory extends UpdateRequestProcessorFactory
implements SolrCoreAware, UpdateRequestProcessorFactory.RunAlways {
/**
* Parameter that defines how many errors the UpdateRequestProcessor will tolerate
*/
private final static String MAX_ERRORS_PARAM = "maxErrors";
/**
* Default maxErrors value that will be used if the value is not set in configuration
* or in the request
*/
private int defaultMaxErrors = Integer.MAX_VALUE;
private boolean informed = false;
@SuppressWarnings("rawtypes")
@Override
public void init( NamedList args ) {
Object maxErrorsObj = args.get(MAX_ERRORS_PARAM);
if (maxErrorsObj != null) {
try {
defaultMaxErrors = Integer.valueOf(maxErrorsObj.toString());
} catch (Exception e) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Unnable to parse maxErrors parameter: " + maxErrorsObj, e);
}
if (defaultMaxErrors < -1) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Config option '"+MAX_ERRORS_PARAM + "' must either be non-negative, or -1 to indicate 'unlimiited': " + maxErrorsObj.toString());
}
}
}
@Override
public void inform(SolrCore core) {
informed = true;
if (null == core.getLatestSchema().getUniqueKeyField()) {
throw new SolrException(ErrorCode.SERVER_ERROR, this.getClass().getName() +
" requires a schema that includes a uniqueKey field.");
}
}
@Override
public UpdateRequestProcessor getInstance(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
assert informed : "inform(SolrCore) never called?";
// short circuit if we're a replica processing commands from our leader
DistribPhase distribPhase = DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));
if (DistribPhase.FROMLEADER.equals(distribPhase)) {
return next;
}
DistributedUpdateProcessorFactory.addParamToDistributedRequestWhitelist(req, MAX_ERRORS_PARAM);
int maxErrors = req.getParams().getInt(MAX_ERRORS_PARAM, defaultMaxErrors);
if (maxErrors < -1) {
throw new SolrException(ErrorCode.BAD_REQUEST, "'"+MAX_ERRORS_PARAM + "' must either be non-negative, or -1 to indicate 'unlimited': " + maxErrors);
}
// NOTE: even if 0==maxErrors, we still inject the processor into the chain so the response has the expected header info
return new TolerantUpdateProcessor(req, rsp, next, maxErrors, distribPhase);
}
}

View File

@@ -0,0 +1,85 @@
<?xml version="1.0" ?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<config>
<jmx />
<luceneMatchVersion>${tests.luceneMatchVersion:LUCENE_CURRENT}</luceneMatchVersion>
<directoryFactory name="DirectoryFactory" class="${solr.directoryFactory:solr.RAMDirectoryFactory}">
<!-- used to keep RAM reqs down for HdfsDirectoryFactory -->
<bool name="solr.hdfs.blockcache.enabled">${solr.hdfs.blockcache.enabled:true}</bool>
<int name="solr.hdfs.blockcache.blocksperbank">${solr.hdfs.blockcache.blocksperbank:1024}</int>
<str name="solr.hdfs.home">${solr.hdfs.home:}</str>
<str name="solr.hdfs.confdir">${solr.hdfs.confdir:}</str>
<str name="solr.hdfs.blockcache.global">${solr.hdfs.blockcache.global:false}</str>
</directoryFactory>
<dataDir>${solr.data.dir:}</dataDir>
<xi:include href="solrconfig.snippet.randomindexconfig.xml" xmlns:xi="http://www.w3.org/2001/XInclude"/>
<!-- an update processor chain that explicitly excludes distrib to test
clean errors when people attempt atomic updates w/o it
-->
<updateRequestProcessorChain name="nodistrib" >
<processor class="solr.NoOpDistributingUpdateProcessorFactory" />
<processor class="solr.RunUpdateProcessorFactory" />
</updateRequestProcessorChain>
<requestHandler name="standard" class="solr.StandardRequestHandler">
</requestHandler>
<requestHandler name="/get" class="solr.RealTimeGetHandler">
<lst name="defaults">
<str name="omitHeader">true</str>
</lst>
</requestHandler>
<requestHandler name="/replication" class="solr.ReplicationHandler" startup="lazy" />
<requestHandler name="/update" class="solr.UpdateRequestHandler" />
<updateHandler class="solr.DirectUpdateHandler2">
<updateLog>
<str name="dir">${solr.ulog.dir:}</str>
</updateLog>
</updateHandler>
<updateRequestProcessorChain name="tolerant-chain-max-errors-10">
<processor class="solr.TolerantUpdateProcessorFactory">
<!-- explicitly testing that parsing still works if a valid int is specified as a string -->
<str name="maxErrors">10</str>
</processor>
<processor class="solr.DistributedUpdateProcessorFactory" />
<processor class="solr.RunUpdateProcessorFactory" />
</updateRequestProcessorChain>
<updateRequestProcessorChain name="tolerant-chain-max-errors-not-set">
<processor class="solr.TolerantUpdateProcessorFactory"/>
<processor class="solr.DistributedUpdateProcessorFactory" />
<processor class="solr.RunUpdateProcessorFactory" />
</updateRequestProcessorChain>
<updateRequestProcessorChain name="not-tolerant">
<processor class="solr.DistributedUpdateProcessorFactory" />
<processor class="solr.RunUpdateProcessorFactory" />
</updateRequestProcessorChain>
</config>

View File

@@ -0,0 +1,40 @@
<?xml version="1.0" ?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<config>
<luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
<schemaFactory class="ClassicIndexSchemaFactory"/>
<xi:include href="solrconfig.snippet.randomindexconfig.xml" xmlns:xi="http://www.w3.org/2001/XInclude"/>
<requestHandler name="/select" class="solr.SearchHandler">
<lst name="defaults">
<str name="echoParams">explicit</str>
<str name="indent">true</str>
<str name="df">text</str>
</lst>
</requestHandler>
<updateRequestProcessorChain name="tolerant-chain">
<processor class="solr.TolerantUpdateProcessorFactory" />
<processor class="solr.RunUpdateProcessorFactory" />
</updateRequestProcessorChain>
</config>

View File

@@ -26,6 +26,7 @@
<luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
<xi:include href="solrconfig.snippet.randomindexconfig.xml" xmlns:xi="http://www.w3.org/2001/XInclude"/>
<requestHandler name="standard" class="solr.StandardRequestHandler"></requestHandler>
<requestHandler name="/update" class="solr.UpdateRequestHandler" />
<directoryFactory name="DirectoryFactory" class="${solr.directoryFactory:solr.RAMDirectoryFactory}"/>
<schemaFactory class="ClassicIndexSchemaFactory"/>
@@ -628,4 +629,20 @@
<processor class="solr.RunUpdateProcessorFactory" />
</updateRequestProcessorChain>
<updateRequestProcessorChain name="tolerant-chain-max-errors-10">
<processor class="solr.TolerantUpdateProcessorFactory">
<int name="maxErrors">10</int>
</processor>
<processor class="solr.RunUpdateProcessorFactory" />
</updateRequestProcessorChain>
<updateRequestProcessorChain name="tolerant-chain-max-errors-not-set">
<processor class="solr.TolerantUpdateProcessorFactory"/>
<processor class="solr.RunUpdateProcessorFactory" />
</updateRequestProcessorChain>
<updateRequestProcessorChain name="not-tolerant">
<processor class="solr.RunUpdateProcessorFactory" />
</updateRequestProcessorChain>
</config>

File diff suppressed because it is too large

View File

@@ -0,0 +1,389 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import java.io.File;
import java.lang.invoke.MethodHandles;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import org.apache.lucene.util.TestUtil;
import org.apache.solr.cloud.SolrCloudTestCase;
import static org.apache.solr.cloud.TestTolerantUpdateProcessorCloud.assertUpdateTolerantErrors;
import static org.apache.solr.cloud.TestTolerantUpdateProcessorCloud.addErr;
import static org.apache.solr.cloud.TestTolerantUpdateProcessorCloud.delIErr;
import static org.apache.solr.cloud.TestTolerantUpdateProcessorCloud.delQErr;
import static org.apache.solr.cloud.TestTolerantUpdateProcessorCloud.f;
import static org.apache.solr.cloud.TestTolerantUpdateProcessorCloud.update;
import static org.apache.solr.cloud.TestTolerantUpdateProcessorCloud.ExpectedErr;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.client.solrj.response.UpdateResponse;
import static org.apache.solr.common.params.CursorMarkParams.CURSOR_MARK_PARAM;
import static org.apache.solr.common.params.CursorMarkParams.CURSOR_MARK_NEXT;
import static org.apache.solr.common.params.CursorMarkParams.CURSOR_MARK_START;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.SolrInputField;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.ToleratedUpdateError;
import org.apache.solr.common.ToleratedUpdateError.CmdType;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.util.RevertDefaultThreadHandlerRule;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test of TolerantUpdateProcessor using a randomized MiniSolrCloud.
* Reuses some utility methods in {@link TestTolerantUpdateProcessorCloud}
*
* <p>
* <b>NOTE:</b> This test sets up a static instance of MiniSolrCloud with a single collection
* and several clients pointed at specific nodes. These are all re-used across multiple test methods,
* and assume that the state of the cluster stays healthy between tests.
* </p>
*
*/
public class TestTolerantUpdateProcessorRandomCloud extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final String COLLECTION_NAME = "test_col";
/** A basic client for operations at the cloud level, default collection will be set */
private static CloudSolrClient CLOUD_CLIENT;
/** one HttpSolrClient for each server */
private static List<SolrClient> NODE_CLIENTS;
@BeforeClass
private static void createMiniSolrCloudCluster() throws Exception {
final String configName = "solrCloudCollectionConfig";
final File configDir = new File(TEST_HOME() + File.separator + "collection1" + File.separator + "conf");
final int numShards = TestUtil.nextInt(random(), 2, TEST_NIGHTLY ? 5 : 3);
final int repFactor = TestUtil.nextInt(random(), 2, TEST_NIGHTLY ? 5 : 3);
// at least one server won't have any replicas
final int numServers = 1 + (numShards * repFactor);
log.info("Configuring cluster: servers={}, shards={}, repfactor={}", numServers, numShards, repFactor);
configureCluster(numServers)
.addConfig(configName, configDir.toPath())
.configure();
TestTolerantUpdateProcessorCloud.assertSpinLoopAllJettyAreRunning(cluster);
Map<String, String> collectionProperties = new HashMap<>();
collectionProperties.put("config", "solrconfig-distrib-update-processor-chains.xml");
collectionProperties.put("schema", "schema15.xml"); // string id
assertNotNull(cluster.createCollection(COLLECTION_NAME, numShards, repFactor,
configName, null, null, collectionProperties));
CLOUD_CLIENT = cluster.getSolrClient();
CLOUD_CLIENT.setDefaultCollection(COLLECTION_NAME);
NODE_CLIENTS = new ArrayList<SolrClient>(numServers);
for (JettySolrRunner jetty : cluster.getJettySolrRunners()) {
URL jettyURL = jetty.getBaseUrl();
NODE_CLIENTS.add(new HttpSolrClient(jettyURL.toString() + "/" + COLLECTION_NAME + "/"));
}
assertEquals(numServers, NODE_CLIENTS.size());
ZkStateReader zkStateReader = CLOUD_CLIENT.getZkStateReader();
AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION_NAME, zkStateReader, true, true, 330);
}
@Before
private void deleteAllDocs() throws Exception {
assertEquals(0, update(params("commit","true")).deleteByQuery("*:*").process(CLOUD_CLIENT).getStatus());
assertEquals("index should be empty", 0L, countDocs(CLOUD_CLIENT));
}
public void testRandomUpdates() throws Exception {
final int maxDocId = atLeast(10000);
final BitSet expectedDocIds = new BitSet(maxDocId+1);
final int numIters = atLeast(50);
for (int i = 0; i < numIters; i++) {
log.info("BEGIN ITER #{}", i);
final UpdateRequest req = update(params("maxErrors","-1",
"update.chain", "tolerant-chain-max-errors-10"));
final int numCmds = TestUtil.nextInt(random(), 1, 20);
final List<ExpectedErr> expectedErrors = new ArrayList<ExpectedErr>(numCmds);
int expectedErrorsCount = 0;
// it's ambiguous/confusing which order mixed DELQ + ADD (or ADD and DELI for the same ID)
// in the same request will be processed by various clients, so we keep things simple
// and ensure that no single doc Id is affected by more than one command in the same request
final BitSet docsAffectedThisRequest = new BitSet(maxDocId+1);
for (int cmdIter = 0; cmdIter < numCmds; cmdIter++) {
if ((maxDocId / 2) < docsAffectedThisRequest.cardinality()) {
// we're already mucking with more than half the docs in the index
break;
}
final boolean causeError = random().nextBoolean();
if (causeError) {
expectedErrorsCount++;
}
if (random().nextBoolean()) {
// add a doc
String id = null;
SolrInputDocument doc = null;
if (causeError && (0 == TestUtil.nextInt(random(), 0, 21))) {
doc = doc(f("foo_s","no unique key"));
expectedErrors.add(addErr("(unknown)"));
} else {
final int id_i = randomUnsetBit(random(), docsAffectedThisRequest, maxDocId);
docsAffectedThisRequest.set(id_i);
id = "id_"+id_i;
if (causeError) {
expectedErrors.add(addErr(id));
} else {
expectedDocIds.set(id_i);
}
final String val = causeError ? "bogus_val" : (""+TestUtil.nextInt(random(), 42, 666));
doc = doc(f("id",id),
f("id_i", id_i),
f("foo_i", val));
}
req.add(doc);
log.info("ADD: {} = {}", id, doc);
} else {
// delete something
if (random().nextBoolean()) {
// delete by id
final int id_i = randomUnsetBit(random(), docsAffectedThisRequest, maxDocId);
final String id = "id_"+id_i;
final boolean docExists = expectedDocIds.get(id_i);
docsAffectedThisRequest.set(id_i);
long versionConstraint = docExists ? 1 : -1;
if (causeError) {
versionConstraint = -1 * versionConstraint;
expectedErrors.add(delIErr(id));
} else {
// if doc exists it will legitimately be deleted
expectedDocIds.clear(id_i);
}
req.deleteById(id, versionConstraint);
log.info("DEL: {} = {}", id, causeError ? "ERR" : "OK" );
} else {
// delete by query
final String q;
if (causeError) {
// even though our DBQ is gibberish that's going to fail, record a docId as affected
// so that we don't generate the same random DBQ and get redundant errors
// (problematic because of how DUP forwarded DBQs have to have their errors deduped by TUP)
final int id_i = randomUnsetBit(random(), docsAffectedThisRequest, maxDocId);
docsAffectedThisRequest.set(id_i);
q = "foo_i:["+id_i+" TO ....giberish";
expectedErrors.add(delQErr(q));
} else {
// ensure our DBQ is only over a range of docs not already affected
// by any other cmds in this request
final int rangeAxis = randomUnsetBit(random(), docsAffectedThisRequest, maxDocId);
final int loBound = docsAffectedThisRequest.previousSetBit(rangeAxis);
final int hiBound = docsAffectedThisRequest.nextSetBit(rangeAxis);
final int lo = TestUtil.nextInt(random(), loBound+1, rangeAxis);
final int hi = TestUtil.nextInt(random(), rangeAxis,
// bound might be negative if no set bits above axis
(hiBound < 0) ? maxDocId : hiBound-1);
if (lo != hi) {
assert lo < hi : "lo="+lo+" hi="+hi;
// NOTE: clear & set are exclusive of hi, so we use "}" in range query accordingly
q = "id_i:[" + lo + " TO " + hi + "}";
expectedDocIds.clear(lo, hi);
docsAffectedThisRequest.set(lo, hi);
} else {
// edge case: special case DBQ of one doc
assert (lo == rangeAxis && hi == rangeAxis) : "lo="+lo+" axis="+rangeAxis+" hi="+hi;
q = "id_i:[" + lo + " TO " + lo + "]"; // have to be inclusive of both ends
expectedDocIds.clear(lo);
docsAffectedThisRequest.set(lo);
}
}
req.deleteByQuery(q);
log.info("DEL: {}", q);
}
}
}
assertEquals("expected error count sanity check: " + req.toString(),
expectedErrorsCount, expectedErrors.size());
final SolrClient client = random().nextBoolean() ? CLOUD_CLIENT
: NODE_CLIENTS.get(TestUtil.nextInt(random(), 0, NODE_CLIENTS.size()-1));
final UpdateResponse rsp = req.process(client);
assertUpdateTolerantErrors(client.toString() + " => " + expectedErrors.toString(), rsp,
expectedErrors.toArray(new ExpectedErr[expectedErrors.size()]));
log.info("END ITER #{}, expecting #docs: {}", i, expectedDocIds.cardinality());
assertEquals("post update commit failed?", 0, CLOUD_CLIENT.commit().getStatus());
for (int j = 0; j < 5; j++) {
if (expectedDocIds.cardinality() == countDocs(CLOUD_CLIENT)) {
break;
}
log.info("sleeping to give searchers a chance to re-open #" + j);
Thread.sleep(200);
}
// check the index contents against our expectations
final BitSet actualDocIds = allDocs(CLOUD_CLIENT, maxDocId);
if ( expectedDocIds.cardinality() != actualDocIds.cardinality() ) {
log.error("cardinality missmatch: expected {} BUT actual {}",
expectedDocIds.cardinality(),
actualDocIds.cardinality());
}
final BitSet x = (BitSet) actualDocIds.clone();
x.xor(expectedDocIds);
for (int b = x.nextSetBit(0); 0 <= b; b = x.nextSetBit(b+1)) {
final boolean expectedBit = expectedDocIds.get(b);
final boolean actualBit = actualDocIds.get(b);
log.error("bit #"+b+" mismatch: expected {} BUT actual {}", expectedBit, actualBit);
}
assertEquals(x.cardinality() + " mismatched bits",
expectedDocIds.cardinality(), actualDocIds.cardinality());
}
}
/** sanity check that randomUnsetBit works as expected
* @see #randomUnsetBit
*/
public void testSanityRandomUnsetBit() {
final int max = atLeast(100);
BitSet bits = new BitSet(max+1);
for (int i = 0; i <= max; i++) {
assertFalse("how is bitset already full? iter="+i+" card="+bits.cardinality()+"/max="+max,
bits.cardinality() == max+1);
final int nextBit = randomUnsetBit(random(), bits, max);
assertTrue("nextBit shouldn't be negative yet: " + nextBit,
0 <= nextBit);
assertTrue("nextBit can't exceed max: " + nextBit,
nextBit <= max);
assertFalse("expect unset: " + nextBit, bits.get(nextBit));
bits.set(nextBit);
}
assertEquals("why isn't bitset full?", max+1, bits.cardinality());
final int firstClearBit = bits.nextClearBit(0);
assertTrue("why is there a clear bit? = " + firstClearBit,
max < firstClearBit);
assertEquals("why is a bit set above max?",
-1, bits.nextSetBit(max+1));
assertEquals("wrong nextBit at end of all iters", -1,
randomUnsetBit(random(), bits, max));
assertEquals("wrong nextBit at redundent end of all iters", -1,
randomUnsetBit(random(), bits, max));
}
public static SolrInputDocument doc(SolrInputField... fields) {
// SolrTestCaseJ4 has same method name, prevents static import from working
return TestTolerantUpdateProcessorCloud.doc(fields);
}
/**
* Given a BitSet, returns a random bit that is currently false, or -1 if all bits are true.
* NOTE: this method is not fair.
*/
public static final int randomUnsetBit(Random r, BitSet bits, final int max) {
// NOTE: don't forget, BitSet will grow automatically if not careful
if (bits.cardinality() == max+1) {
return -1;
}
final int candidate = TestUtil.nextInt(r, 0, max);
if (bits.get(candidate)) {
final int lo = bits.previousClearBit(candidate);
final int hi = bits.nextClearBit(candidate);
if (lo < 0 && max < hi) {
fail("how the hell did we not short circut out? card="+bits.cardinality()+"/size="+bits.size());
} else if (lo < 0) {
return hi;
} else if (max < hi) {
return lo;
} // else...
return ((candidate - lo) < (hi - candidate)) ? lo : hi;
}
return candidate;
}
/** returns the numFound from a *:* query */
public static final long countDocs(SolrClient c) throws Exception {
return c.query(params("q","*:*","rows","0")).getResults().getNumFound();
}
/** uses a Cursor to iterate over every doc in the index, recording the 'id_i' value in a BitSet */
private static final BitSet allDocs(final SolrClient c, final int maxDocIdExpected) throws Exception {
BitSet docs = new BitSet(maxDocIdExpected+1);
String cursorMark = CURSOR_MARK_START;
int docsOnThisPage = Integer.MAX_VALUE;
while (0 < docsOnThisPage) {
final SolrParams p = params("q","*:*",
"rows","100",
// note: not numeric, but we don't actually care about the order
"sort", "id asc",
CURSOR_MARK_PARAM, cursorMark);
QueryResponse rsp = c.query(p);
cursorMark = rsp.getNextCursorMark();
docsOnThisPage = 0;
for (SolrDocument doc : rsp.getResults()) {
docsOnThisPage++;
int id_i = ((Integer)doc.get("id_i")).intValue();
assertTrue("found id_i bigger then expected "+maxDocIdExpected+": " + id_i,
id_i <= maxDocIdExpected);
docs.set(id_i);
}
cursorMark = rsp.getNextCursorMark();
}
return docs;
}
}

View File

@@ -96,4 +96,9 @@ public class TestBadConfig extends AbstractBadConfigTestBase {
assertConfigs("bad-solrconfig-unexpected-schema-attribute.xml", "schema-minimal.xml",
"Unexpected arg(s): {bogusParam=bogusValue}");
}
public void testTolerantUpdateProcessorNoUniqueKey() throws Exception {
assertConfigs("solrconfig-tolerant-update-minimal.xml", "schema-minimal.xml",
"requires a schema that includes a uniqueKey field");
}
}

View File

@@ -0,0 +1,447 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.update.processor;
import java.io.IOException;
import java.io.StringWriter;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import javax.xml.xpath.XPathExpressionException;
import org.apache.solr.client.solrj.util.ClientUtils;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.ToleratedUpdateError;
import org.apache.solr.common.ToleratedUpdateError.CmdType;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.core.SolrCore;
import org.apache.solr.request.LocalSolrQueryRequest;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrRequestHandler;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.servlet.DirectSolrConnection;
import org.apache.solr.update.AddUpdateCommand;
import org.apache.solr.util.BaseTestHarness;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.xml.sax.SAXException;
public class TolerantUpdateProcessorTest extends UpdateProcessorTestBase {
/**
* List of valid + invalid documents
*/
private static List<SolrInputDocument> docs = null;
/**
* IDs of the invalid documents in <code>docs</code>
*/
private static String[] badIds = null;
@BeforeClass
public static void beforeClass() throws Exception {
initCore("solrconfig-update-processor-chains.xml", "schema12.xml");
}
@AfterClass
public static void tearDownClass() {
docs = null;
badIds = null;
}
@Override
public void setUp() throws Exception {
super.setUp();
//expected exception messages
ignoreException("Error adding field");
ignoreException("Document is missing mandatory uniqueKey field");
if (docs == null) {
docs = new ArrayList<>(20);
badIds = new String[10];
for(int i = 0; i < 10;i++) {
// a valid document
docs.add(doc(field("id", 1f, String.valueOf(2*i)), field("weight", 1f, i)));
// ... and an invalid one
docs.add(doc(field("id", 1f, String.valueOf(2*i+1)), field("weight", 1f, "b")));
badIds[i] = String.valueOf(2*i+1);
}
}
}
@Override
public void tearDown() throws Exception {
resetExceptionIgnores();
assertU(delQ("*:*"));
assertU(commit());
assertQ(req("q","*:*")
,"//result[@numFound='0']");
super.tearDown();
}
/**
* Future-proofs TolerantUpdateProcessor against new default method impls being added to its
* base class, ensuring that every method involved in a processor chain's life cycle is
* overridden with exception catching/tracking.
*/
@Test
public void testReflection() {
for (Method method : TolerantUpdateProcessor.class.getMethods()) {
if (method.getDeclaringClass().equals(Object.class)) {
continue;
}
assertEquals("base class(es) has changed, TolerantUpdateProcessor needs updated to ensure it " +
"overrides all solr update lifcycle methods with exception tracking: " + method.toString(),
TolerantUpdateProcessor.class, method.getDeclaringClass());
}
}
@Test
public void testValidAdds() throws IOException {
SolrInputDocument validDoc = doc(field("id", 1f, "1"), field("text", 1f, "the quick brown fox"));
add("tolerant-chain-max-errors-10", null, validDoc);
validDoc = doc(field("id", 1f, "2"), field("text", 1f, "the quick brown fox"));
add("tolerant-chain-max-errors-not-set", null, validDoc);
assertU(commit());
assertQ(req("q","*:*")
,"//result[@numFound='2']");
assertQ(req("q","id:1")
,"//result[@numFound='1']");
assertQ(req("q","id:2")
,"//result[@numFound='1']");
}
@Test
public void testInvalidAdds() throws IOException {
SolrInputDocument invalidDoc = doc(field("text", 1f, "the quick brown fox")); //no id
try {
// This doc should fail without being tolerant
add("not-tolerant", null, invalidDoc);
fail("Expecting exception");
} catch (Exception e) {
//expected
assertTrue(e.getMessage().contains("Document is missing mandatory uniqueKey field"));
}
assertAddsSucceedWithErrors("tolerant-chain-max-errors-10", Arrays.asList(new SolrInputDocument[]{invalidDoc}), null, "(unknown)");
//a valid doc
SolrInputDocument validDoc = doc(field("id", 1f, "1"), field("text", 1f, "the quick brown fox"));
try {
// This batch should fail without being tolerant
add("not-tolerant", null, Arrays.asList(new SolrInputDocument[]{invalidDoc, validDoc}));
fail("Expecting exception");
} catch (Exception e) {
//expected
assertTrue(e.getMessage().contains("Document is missing mandatory uniqueKey field"));
}
assertU(commit());
assertQ(req("q","id:1")
,"//result[@numFound='0']");
assertAddsSucceedWithErrors("tolerant-chain-max-errors-10", Arrays.asList(new SolrInputDocument[]{invalidDoc, validDoc}), null, "(unknown)");
assertU(commit());
// verify that the good document made it in.
assertQ(req("q","id:1")
,"//result[@numFound='1']");
invalidDoc = doc(field("id", 1f, "2"), field("weight", 1f, "aaa"));
validDoc = doc(field("id", 1f, "3"), field("weight", 1f, "3"));
try {
// This batch should fail without being tolerant
add("not-tolerant", null, Arrays.asList(new SolrInputDocument[]{invalidDoc, validDoc})); //no id
fail("Expecting exception");
} catch (Exception e) {
//expected
assertTrue(e.getMessage().contains("Error adding field"));
}
assertU(commit());
assertQ(req("q","id:3")
,"//result[@numFound='0']");
assertAddsSucceedWithErrors("tolerant-chain-max-errors-10", Arrays.asList(new SolrInputDocument[]{invalidDoc, validDoc}), null, "2");
assertU(commit());
// The valid document was indexed
assertQ(req("q","id:3")
,"//result[@numFound='1']");
// The invalid document was NOT indexed
assertQ(req("q","id:2")
,"//result[@numFound='0']");
}
@Test
public void testMaxErrorsDefault() throws IOException {
try {
// by default the TolerantUpdateProcessor accepts all errors, so this batch should succeed with 10 errors.
assertAddsSucceedWithErrors("tolerant-chain-max-errors-not-set", docs, null, badIds);
} catch(Exception e) {
fail("Shouldn't get an exception for this batch: " + e.getMessage());
}
assertU(commit());
assertQ(req("q","*:*")
,"//result[@numFound='10']");
}
@Test
public void testMaxErrorsSucceed() throws IOException {
ModifiableSolrParams requestParams = new ModifiableSolrParams();
requestParams.add("maxErrors", "10");
// still OK
assertAddsSucceedWithErrors("tolerant-chain-max-errors-not-set", docs, requestParams, badIds);
assertU(commit());
assertQ(req("q","*:*")
,"//result[@numFound='10']");
}
@Test
public void testMaxErrorsThrowsException() throws IOException {
ModifiableSolrParams requestParams = new ModifiableSolrParams();
requestParams.add("maxErrors", "5");
try {
// should fail
assertAddsSucceedWithErrors("tolerant-chain-max-errors-not-set", docs, requestParams, badIds);
fail("Expecting exception");
} catch (SolrException e) {
assertTrue(e.getMessage(),
e.getMessage().contains("ERROR: [doc=1] Error adding field 'weight'='b' msg=For input string: \"b\""));
}
//the first good documents made it to the index
assertU(commit());
assertQ(req("q","*:*")
,"//result[@numFound='6']");
}
@Test
public void testMaxErrorsInfinite() throws IOException {
ModifiableSolrParams requestParams = new ModifiableSolrParams();
requestParams.add("maxErrors", "-1");
try {
assertAddsSucceedWithErrors("tolerant-chain-max-errors-not-set", docs, null, badIds);
} catch(Exception e) {
fail("Shouldn't get an exception for this batch: " + e.getMessage());
}
assertU(commit());
assertQ(req("q","*:*")
,"//result[@numFound='10']");
}
@Test
public void testMaxErrors0() throws IOException {
//make the TolerantUpdateProcessor intolerant
List<SolrInputDocument> smallBatch = docs.subList(0, 2);
ModifiableSolrParams requestParams = new ModifiableSolrParams();
requestParams.add("maxErrors", "0");
try {
// should fail
assertAddsSucceedWithErrors("tolerant-chain-max-errors-10", smallBatch, requestParams, "1");
fail("Expecting exception");
} catch (SolrException e) {
assertTrue(e.getMessage().contains("ERROR: [doc=1] Error adding field 'weight'='b' msg=For input string: \"b\""));
}
//the first good documents made it to the index
assertU(commit());
assertQ(req("q","*:*")
,"//result[@numFound='1']");
}
@Test
public void testInvalidDelete() throws XPathExpressionException, SAXException {
ignoreException("undefined field invalidfield");
String response = update("tolerant-chain-max-errors-10", adoc("id", "1", "text", "the quick brown fox"));
assertNull(BaseTestHarness.validateXPath(response,
"//int[@name='status']=0",
"//arr[@name='errors']",
"count(//arr[@name='errors']/lst)=0"));
response = update("tolerant-chain-max-errors-10", delQ("invalidfield:1"));
assertNull(BaseTestHarness.validateXPath
(response,
"//int[@name='status']=0",
"count(//arr[@name='errors']/lst)=1",
"//arr[@name='errors']/lst/str[@name='type']/text()='DELQ'",
"//arr[@name='errors']/lst/str[@name='id']/text()='invalidfield:1'",
"//arr[@name='errors']/lst/str[@name='message']/text()='undefined field invalidfield'"));
}
@Test
public void testValidDelete() throws XPathExpressionException, SAXException {
ignoreException("undefined field invalidfield");
String response = update("tolerant-chain-max-errors-10", adoc("id", "1", "text", "the quick brown fox"));
assertNull(BaseTestHarness.validateXPath(response,
"//int[@name='status']=0",
"//arr[@name='errors']",
"count(//arr[@name='errors']/lst)=0"));
assertU(commit());
assertQ(req("q","*:*")
,"//result[@numFound='1']");
response = update("tolerant-chain-max-errors-10", delQ("id:1"));
assertNull(BaseTestHarness.validateXPath(response,
"//int[@name='status']=0",
"//arr[@name='errors']",
"count(//arr[@name='errors']/lst)=0"));
assertU(commit());
assertQ(req("q","*:*")
,"//result[@numFound='0']");
}
@Test
public void testResponse() throws SAXException, XPathExpressionException, IOException {
String response = update("tolerant-chain-max-errors-10", adoc("id", "1", "text", "the quick brown fox"));
assertNull(BaseTestHarness.validateXPath(response,
"//int[@name='status']=0",
"//arr[@name='errors']",
"count(//arr[@name='errors']/lst)=0"));
response = update("tolerant-chain-max-errors-10", adoc("text", "the quick brown fox"));
assertNull(BaseTestHarness.validateXPath(response, "//int[@name='status']=0",
"//int[@name='maxErrors']/text()='10'",
"count(//arr[@name='errors']/lst)=1",
"//arr[@name='errors']/lst/str[@name='id']/text()='(unknown)'",
"//arr[@name='errors']/lst/str[@name='message']/text()='Document is missing mandatory uniqueKey field: id'"));
response = update("tolerant-chain-max-errors-10", adoc("text", "the quick brown fox"));
StringWriter builder = new StringWriter();
builder.append("<add>");
for (SolrInputDocument doc:docs) {
ClientUtils.writeXML(doc, builder);
}
builder.append("</add>");
response = update("tolerant-chain-max-errors-10", builder.toString());
assertNull(BaseTestHarness.validateXPath(response, "//int[@name='status']=0",
"//int[@name='maxErrors']/text()='10'",
"count(//arr[@name='errors']/lst)=10",
"not(//arr[@name='errors']/lst/str[@name='id']/text()='0')",
"//arr[@name='errors']/lst/str[@name='id']/text()='1'",
"not(//arr[@name='errors']/lst/str[@name='id']/text()='2')",
"//arr[@name='errors']/lst/str[@name='id']/text()='3'",
"not(//arr[@name='errors']/lst/str[@name='id']/text()='4')",
"//arr[@name='errors']/lst/str[@name='id']/text()='5'",
"not(//arr[@name='errors']/lst/str[@name='id']/text()='6')",
"//arr[@name='errors']/lst/str[@name='id']/text()='7'",
"not(//arr[@name='errors']/lst/str[@name='id']/text()='8')",
"//arr[@name='errors']/lst/str[@name='id']/text()='9'",
"not(//arr[@name='errors']/lst/str[@name='id']/text()='10')",
"//arr[@name='errors']/lst/str[@name='id']/text()='11'",
"not(//arr[@name='errors']/lst/str[@name='id']/text()='12')",
"//arr[@name='errors']/lst/str[@name='id']/text()='13'",
"not(//arr[@name='errors']/lst/str[@name='id']/text()='14')",
"//arr[@name='errors']/lst/str[@name='id']/text()='15'",
"not(//arr[@name='errors']/lst/str[@name='id']/text()='16')",
"//arr[@name='errors']/lst/str[@name='id']/text()='17'",
"not(//arr[@name='errors']/lst/str[@name='id']/text()='18')",
"//arr[@name='errors']/lst/str[@name='id']/text()='19'"));
// spot check response when effective maxErrors is unlimited
response = update("tolerant-chain-max-errors-not-set", builder.toString());
assertNull(BaseTestHarness.validateXPath(response, "//int[@name='maxErrors']/text()='-1'"));
}
public String update(String chain, String xml) {
DirectSolrConnection connection = new DirectSolrConnection(h.getCore());
SolrRequestHandler handler = h.getCore().getRequestHandler("/update");
ModifiableSolrParams params = new ModifiableSolrParams();
params.add("update.chain", chain);
try {
return connection.request(handler, params, xml);
} catch (SolrException e) {
throw e;
} catch (Exception e) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, e);
}
}
private void assertAddsSucceedWithErrors(String chain,
final Collection<SolrInputDocument> docs,
SolrParams requestParams,
String... idsShouldFail) throws IOException {
SolrQueryResponse response = add(chain, requestParams, docs);
@SuppressWarnings("unchecked")
List<SimpleOrderedMap<String>> errors = (List<SimpleOrderedMap<String>>)
response.getResponseHeader().get("errors");
assertNotNull(errors);
assertEquals("number of errors", idsShouldFail.length, errors.size());
Set<String> addErrorIdsExpected = new HashSet<String>(Arrays.asList(idsShouldFail));
for (SimpleOrderedMap<String> err : errors) {
assertEquals("this method only expects 'add' errors", "ADD", err.get("type"));
String id = err.get("id");
assertNotNull("null err id", id);
assertTrue("unexpected id", addErrorIdsExpected.contains(id));
}
}
protected SolrQueryResponse add(final String chain, SolrParams requestParams, final SolrInputDocument doc) throws IOException {
return add(chain, requestParams, Arrays.asList(new SolrInputDocument[]{doc}));
}
protected SolrQueryResponse add(final String chain, SolrParams requestParams, final Collection<SolrInputDocument> docs) throws IOException {
SolrCore core = h.getCore();
UpdateRequestProcessorChain pc = core.getUpdateProcessingChain(chain);
assertNotNull("No Chain named: " + chain, pc);
SolrQueryResponse rsp = new SolrQueryResponse();
rsp.add("responseHeader", new SimpleOrderedMap<Object>());
if(requestParams == null) {
requestParams = new ModifiableSolrParams();
}
SolrQueryRequest req = new LocalSolrQueryRequest(core, requestParams);
try {
UpdateRequestProcessor processor = pc.createProcessor(req, rsp);
for(SolrInputDocument doc:docs) {
AddUpdateCommand cmd = new AddUpdateCommand(req);
cmd.solrDoc = doc;
processor.processAdd(cmd);
}
processor.finish();
} finally {
req.close();
}
return rsp;
}
}
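
For context, a minimal SolrJ sketch of how a client could exercise the chain tested above. The chain name and the maxErrors param come from these tests; the URL, core name, and field values are illustrative assumptions:

import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrInputDocument;

public class TolerantUpdateClientSketch {
  public static void main(String[] args) throws Exception {
    // assumed core URL; adjust for your install
    try (HttpSolrClient client = new HttpSolrClient("http://localhost:8983/solr/collection1")) {
      UpdateRequest req = new UpdateRequest();
      req.setParam("update.chain", "tolerant-chain-max-errors-10"); // chain name from the tests above
      req.setParam("maxErrors", "5"); // per-request override, as in testMaxErrorsThrowsException

      SolrInputDocument good = new SolrInputDocument();
      good.addField("id", "1");
      good.addField("text", "the quick brown fox");
      SolrInputDocument bad = new SolrInputDocument(); // missing the uniqueKey field
      bad.addField("text", "no id here");

      req.add(good);
      req.add(bad);
      // the bad doc is reported in the 'errors' section of the response header,
      // while the good doc is still indexed
      req.process(client);
      client.commit();
    }
  }
}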

View File

@ -54,6 +54,7 @@ import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.util.ClientUtils;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.ToleratedUpdateError;
import org.apache.solr.common.cloud.Aliases;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
@ -72,6 +73,7 @@ import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.Hash;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.common.util.SolrjNamedThreadFactory;
import org.apache.solr.common.util.StrUtils;
import org.apache.zookeeper.KeeperException;
@ -726,6 +728,11 @@ public class CloudSolrClient extends SolrClient {
int status = 0;
Integer rf = null;
Integer minRf = null;
// TolerantUpdateProcessor
List<SimpleOrderedMap<String>> toleratedErrors = null;
int maxToleratedErrors = Integer.MAX_VALUE;
for(int i=0; i<response.size(); i++) {
NamedList shardResponse = (NamedList)response.getVal(i);
NamedList header = (NamedList)shardResponse.get("responseHeader");
@ -741,6 +748,24 @@ public class CloudSolrClient extends SolrClient {
rf = routeRf;
}
minRf = (Integer)header.get(UpdateRequest.MIN_REPFACT);
List<SimpleOrderedMap<String>> shardTolerantErrors =
(List<SimpleOrderedMap<String>>) header.get("errors");
if (null != shardTolerantErrors) {
Integer shardMaxToleratedErrors = (Integer) header.get("maxErrors");
assert null != shardMaxToleratedErrors : "TolerantUpdateProcessor reported errors but not maxErrors";
// if we get into some weird state where the nodes disagree about the effective maxErrors,
// assume the min value seen to decide if we should fail.
maxToleratedErrors = Math.min(maxToleratedErrors,
ToleratedUpdateError.getEffectiveMaxErrors(shardMaxToleratedErrors.intValue()));
if (null == toleratedErrors) {
toleratedErrors = new ArrayList<SimpleOrderedMap<String>>(shardTolerantErrors.size());
}
for (SimpleOrderedMap<String> err : shardTolerantErrors) {
toleratedErrors.add(err);
}
}
}
NamedList cheader = new NamedList();
@ -750,7 +775,31 @@ public class CloudSolrClient extends SolrClient {
cheader.add(UpdateRequest.REPFACT, rf);
if (minRf != null)
cheader.add(UpdateRequest.MIN_REPFACT, minRf);
if (null != toleratedErrors) {
cheader.add("maxErrors", ToleratedUpdateError.getUserFriendlyMaxErrors(maxToleratedErrors));
cheader.add("errors", toleratedErrors);
if (maxToleratedErrors < toleratedErrors.size()) {
// cumulative errors are too high, we need to throw a client exception w/correct metadata
// NOTE: it shouldn't be possible for 1 == toleratedErrors.size(), because if that were the case
// then at least one shard should have thrown a real error before this, so we don't worry
// about having a more "singular" exception msg for that situation
StringBuilder msgBuf = new StringBuilder()
.append(toleratedErrors.size()).append(" Async failures during distributed update: ");
NamedList metadata = new NamedList<String>();
for (SimpleOrderedMap<String> err : toleratedErrors) {
ToleratedUpdateError te = ToleratedUpdateError.parseMap(err);
metadata.add(te.getMetadataKey(), te.getMetadataValue());
msgBuf.append("\n").append(te.getMessage());
}
SolrException toThrow = new SolrException(ErrorCode.BAD_REQUEST, msgBuf.toString());
toThrow.setMetadata(metadata);
throw toThrow;
}
}
condensed.add("responseHeader", cheader);
return condensed;
}
@ -786,6 +835,22 @@ public class CloudSolrClient extends SolrClient {
super(errorCode, throwables.getVal(0).getMessage(), throwables.getVal(0));
this.throwables = throwables;
this.routes = routes;
// create a merged copy of the metadata from all wrapped exceptions
NamedList<String> metadata = new NamedList<String>();
for (int i = 0; i < throwables.size(); i++) {
Throwable t = throwables.getVal(i);
if (t instanceof SolrException) {
SolrException e = (SolrException) t;
NamedList<String> eMeta = e.getMetadata();
if (null != eMeta) {
metadata.addAll(eMeta);
}
}
}
if (0 < metadata.size()) {
this.setMetadata(metadata);
}
}
public NamedList<Throwable> getThrowables() {

View File

@ -0,0 +1,197 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.common;
import java.util.ArrayList;
import java.util.List;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
/**
* Models the basic information related to a single "tolerated" error that occurred during updates.
* This class is only useful when the <code>TolerantUpdateProcessorFactory</code> is used in an update
* processor chain.
*/
public final class ToleratedUpdateError {
private final static String META_PRE = ToleratedUpdateError.class.getName() + "--";
private final static int META_PRE_LEN = META_PRE.length();
/**
* Given a 'maxErrors' value such that <code>-1 &lt;= maxErrors &lt;= {@link Integer#MAX_VALUE}</code>
* this method returns the original input unless it is <code>-1</code> in which case the effective value of
* {@link Integer#MAX_VALUE} is returned.
* Input of <code>maxErrors &lt; -1</code> will trip an assertion and otherwise have undefined behavior.
* @see #getUserFriendlyMaxErrors
*/
public static int getEffectiveMaxErrors(int maxErrors) {
assert -1 <= maxErrors;
return -1 == maxErrors ? Integer.MAX_VALUE : maxErrors;
}
/**
* Given a 'maxErrors' value such that <code>-1 &lt;= maxErrors &lt;= {@link Integer#MAX_VALUE}</code>
* this method returns the original input unless it is {@link Integer#MAX_VALUE} in which case
* <code>-1</code> is returned for user convenience.
* Input of <code>maxErrors &lt; -1</code> will trip an assertion and otherwise have undefined behavior.
* @see #getEffectiveMaxErrors
*/
public static int getUserFriendlyMaxErrors(int maxErrors) {
assert -1 <= maxErrors;
return Integer.MAX_VALUE == maxErrors ? -1 : maxErrors;
}
/**
* returns a list of maps of simple objects suitable for putting in a SolrQueryResponse header
* @see #getSimpleMap
* @see #parseMap
*/
public static List<SimpleOrderedMap<String>> formatForResponseHeader(List<ToleratedUpdateError> errs) {
List<SimpleOrderedMap<String>> result = new ArrayList<>(errs.size());
for (ToleratedUpdateError e : errs) {
result.add(e.getSimpleMap());
}
return result;
}
/**
* returns a ToleratedUpdateError instance from the data in this Map
* @see #getSimpleMap
*/
public static ToleratedUpdateError parseMap(SimpleOrderedMap<String> data) {
final String id = data.get("id");
final String message = data.get("message");
final String t = data.get("type");
if (null == t || null == id || null == message) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Map does not represent a ToleratedUpdateError, must contain 'type', 'id', and 'message'");
}
try {
return new ToleratedUpdateError(CmdType.valueOf(t), id, message);
} catch (IllegalArgumentException iae) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Invalid type for ToleratedUpdateError: " + t, iae);
}
}
/**
* returns a ToleratedUpdateError instance if this metadataKey is one we care about, else null
* @see #getMetadataKey
* @see #getMetadataValue
*/
public static ToleratedUpdateError parseMetadataIfToleratedUpdateError(String metadataKey,
String metadataVal) {
if (! metadataKey.startsWith(META_PRE)) {
return null; // not a key we care about
}
final int typeEnd = metadataKey.indexOf(':', META_PRE_LEN);
if (typeEnd < 0) {
return null; // has our prefix, but not our format -- must not be a key we (actually) care about
}
return new ToleratedUpdateError(CmdType.valueOf(metadataKey.substring(META_PRE_LEN, typeEnd)),
metadataKey.substring(typeEnd+1), metadataVal);
}
private final CmdType type;
private final String id;
private final String message;
public ToleratedUpdateError(CmdType type, String id, String message) {
assert null != type;
this.type = type;
assert null != id;
this.id = id;
assert null != message;
this.message = message;
}
public CmdType getType() {
return type;
}
public String getId() {
return id;
}
public String getMessage() {
return message;
}
/**
* returns a string suitable for use as a key in {@link SolrException#setMetadata}
*
* @see #parseMetadataIfToleratedUpdateError
*/
public String getMetadataKey() {
return META_PRE + type + ":" + id;
}
/**
* returns a string suitable for use as a value in {@link SolrException#setMetadata}
*
* @see #parseMetadataIfToleratedUpdateError
*/
public String getMetadataValue() {
return message;
}
/**
* returns a map of simple objects suitable for putting in a SolrQueryResponse header
* @see #formatForResponseHeader
* @see #parseMap
*/
public SimpleOrderedMap<String> getSimpleMap() {
SimpleOrderedMap<String> entry = new SimpleOrderedMap<String>();
entry.add("type", type.toString());
entry.add("id", id);
entry.add("message", message);
return entry;
}
@Override
public String toString() {
return getMetadataKey() + "=>" + getMetadataValue();
}
@Override
public int hashCode() {
int h = this.getClass().hashCode();
h = h * 31 + type.hashCode();
h = h * 31 + id.hashCode();
h = h * 31 + message.hashCode();
return h;
}
@Override
public boolean equals(Object o) {
if (o instanceof ToleratedUpdateError) {
ToleratedUpdateError that = (ToleratedUpdateError)o;
return that.type.equals(this.type)
&& that.id.equals(this.id)
&& that.message.equals(this.message);
}
return false;
}
/**
* Helper class for dealing with SolrException metadata (String) keys
*/
public static enum CmdType {
ADD, DELID, DELQ;
// if we add support for things like commit, parsing/toString/hashCode logic
// needs to be smarter to account for 'id' being null ... "usesId" should be a prop of enum instances
}
}
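
To make the encode/decode contract above concrete, a small illustrative round trip (not part of this commit) through both serialization forms:

import org.apache.solr.common.ToleratedUpdateError;
import org.apache.solr.common.ToleratedUpdateError.CmdType;
import org.apache.solr.common.util.SimpleOrderedMap;

public class ToleratedUpdateErrorRoundTrip {
  public static void main(String[] args) {
    ToleratedUpdateError in = new ToleratedUpdateError(CmdType.DELQ, "field:oops", "undefined field field");
    // response-header form: {type=DELQ, id=field:oops, message=undefined field field}
    SimpleOrderedMap<String> map = in.getSimpleMap();
    ToleratedUpdateError fromMap = ToleratedUpdateError.parseMap(map);
    // SolrException metadata form; the key embeds type and id after the class-name prefix
    ToleratedUpdateError fromMeta = ToleratedUpdateError
        .parseMetadataIfToleratedUpdateError(in.getMetadataKey(), in.getMetadataValue());
    // both forms are lossless
    System.out.println(in.equals(fromMap) && in.equals(fromMeta)); // prints: true
  }
}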

View File

@ -0,0 +1,193 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.common;
import java.util.EnumSet;
import org.apache.solr.common.ToleratedUpdateError;
import org.apache.solr.common.ToleratedUpdateError.CmdType;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.TestUtil;
/** Basic testing of the serialization/encapsulation code in ToleratedUpdateError */
public class TestToleratedUpdateError extends LuceneTestCase {
private final static CmdType[] ALL_TYPES = EnumSet.allOf(CmdType.class).toArray(new CmdType[0]);
public void testBasics() {
assertFalse((new ToleratedUpdateError(CmdType.ADD, "doc1", "some error")).equals
(new ToleratedUpdateError(CmdType.ADD, "doc2", "some error")));
assertFalse((new ToleratedUpdateError(CmdType.ADD, "doc1", "some error")).equals
(new ToleratedUpdateError(CmdType.ADD, "doc1", "some errorxx")));
assertFalse((new ToleratedUpdateError(CmdType.ADD, "doc1", "some error")).equals
(new ToleratedUpdateError(CmdType.DELID, "doc1", "some error")));
}
public void testParseMetadataErrorHandling() {
assertNull(ToleratedUpdateError.parseMetadataIfToleratedUpdateError("some other key", "some value"));
// see if someone tries to trick us into having an NPE...
ToleratedUpdateError valid = new ToleratedUpdateError(CmdType.ADD, "doc2", "some error");
String badKey = valid.getMetadataKey().replace(":", "X");
assertNull(ToleratedUpdateError.parseMetadataIfToleratedUpdateError(badKey, valid.getMetadataValue()));
}
public void testParseMapErrorChecking() {
SimpleOrderedMap<String> bogus = new SimpleOrderedMap<String>();
try {
ToleratedUpdateError.parseMap(bogus);
fail("map should not be parsable");
} catch (SolrException e) {
assertTrue(e.toString(), e.getMessage().contains("Map does not represent a ToleratedUpdateError") );
}
bogus.add("id", "some id");
bogus.add("message", "some message");
try {
ToleratedUpdateError.parseMap(bogus);
fail("map should still not be parsable");
} catch (SolrException e) {
assertTrue(e.toString(), e.getMessage().contains("Map does not represent a ToleratedUpdateError") );
}
bogus.add("type", "not a real type");
try {
ToleratedUpdateError.parseMap(bogus);
fail("invalid type should not be parsable");
} catch (SolrException e) {
assertTrue(e.toString(), e.getMessage().contains("Invalid type"));
}
}
public void testParseMap() {
// trivial
SimpleOrderedMap<String> valid = new SimpleOrderedMap<String>();
valid.add("type", CmdType.ADD.toString());
valid.add("id", "some id");
valid.add("message", "some message");
ToleratedUpdateError in = ToleratedUpdateError.parseMap(valid);
compare(in, MAP_COPPIER);
compare(in, METADATA_COPPIER);
// randomized
int numIters = atLeast(5000);
for (int i = 0; i < numIters; i++) {
valid = new SimpleOrderedMap<String>();
valid.add("type", ALL_TYPES[TestUtil.nextInt(random(), 0, ALL_TYPES.length-1)].toString());
valid.add("id", TestUtil.randomUnicodeString(random()));
valid.add("message", TestUtil.randomUnicodeString(random()));
in = ToleratedUpdateError.parseMap(valid);
compare(in, MAP_COPPIER);
compare(in, METADATA_COPPIER);
}
}
public void checkRoundTripComparisons(Coppier coppier) {
// some simple basics
for (ToleratedUpdateError in : new ToleratedUpdateError[] {
new ToleratedUpdateError(CmdType.ADD, "doc1", "some error"),
new ToleratedUpdateError(CmdType.DELID, "doc1", "some diff error"),
new ToleratedUpdateError(CmdType.DELQ, "-field:yakko other_field:wakko", "some other error"),
}) {
compare(in, coppier);
}
// randomized testing of non trivial keys/values
int numIters = atLeast(5000);
for (int i = 0; i < numIters; i++) {
ToleratedUpdateError in = new ToleratedUpdateError
(ALL_TYPES[TestUtil.nextInt(random(), 0, ALL_TYPES.length-1)],
TestUtil.randomUnicodeString(random()),
TestUtil.randomUnicodeString(random()));
compare(in, coppier);
}
}
public void testMetadataRoundTripComparisons() {
checkRoundTripComparisons(METADATA_COPPIER);
}
public void testMapRoundTripComparisons() {
checkRoundTripComparisons(MAP_COPPIER);
}
/** trivial sanity check */
public void testMaxErrorsValueConversion() {
assertEquals(-1, ToleratedUpdateError.getUserFriendlyMaxErrors(-1));
assertEquals(-1, ToleratedUpdateError.getUserFriendlyMaxErrors(Integer.MAX_VALUE));
assertEquals(Integer.MAX_VALUE, ToleratedUpdateError.getEffectiveMaxErrors(Integer.MAX_VALUE));
assertEquals(Integer.MAX_VALUE, ToleratedUpdateError.getEffectiveMaxErrors(-1));
for (int val : new int[] {0, 1, 10, 42, 600000 }) {
assertEquals(val, ToleratedUpdateError.getEffectiveMaxErrors(val));
assertEquals(val, ToleratedUpdateError.getUserFriendlyMaxErrors(val));
}
}
public void compare(ToleratedUpdateError in, Coppier coppier) {
ToleratedUpdateError out = coppier.copy(in);
assertNotNull(out);
compare(in, out);
}
public void compare(ToleratedUpdateError in, ToleratedUpdateError out) {
assertEquals(out.getType(), in.getType());
assertEquals(out.getId(), in.getId());
assertEquals(out.getMessage(), in.getMessage());
assertEquals(out.hashCode(), in.hashCode());
assertEquals(out.toString(), in.toString());
assertEquals(in.getMetadataKey(), out.getMetadataKey());
assertEquals(in.getMetadataValue(), out.getMetadataValue());
assertEquals(out, in);
assertEquals(in, out);
}
private static abstract class Coppier {
public abstract ToleratedUpdateError copy(ToleratedUpdateError in);
}
private static final Coppier MAP_COPPIER = new Coppier() {
public ToleratedUpdateError copy(ToleratedUpdateError in) {
return ToleratedUpdateError.parseMap(in.getSimpleMap());
}
};
private static final Coppier METADATA_COPPIER = new Coppier() {
public ToleratedUpdateError copy(ToleratedUpdateError in) {
return ToleratedUpdateError.parseMetadataIfToleratedUpdateError
(in.getMetadataKey(), in.getMetadataValue());
}
};
}