SOLR-11881: Retry update requests from leaders to followers

This commit is contained in:
Tomas Fernandez Lobbe 2018-08-06 15:56:49 -07:00
parent ea2e564dad
commit c338cf61e7
7 changed files with 540 additions and 134 deletions

View File

@ -233,6 +233,8 @@ Optimizations
introduced (splitMethod=link) which uses hard-linking of index files when possible, resulting in introduced (splitMethod=link) which uses hard-linking of index files when possible, resulting in
significant speedups and reduced CPU / IO load on shard leader. (ab) significant speedups and reduced CPU / IO load on shard leader. (ab)
* SOLR-11881: Retry update requests sent by leaders to their followers (Varun Thacker, Mark Miller, Tomás Fernández Löbbe)
Other Changes Other Changes
---------------------- ----------------------

View File

@ -17,12 +17,28 @@
package org.apache.solr.update; package org.apache.solr.update;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.lang.invoke.MethodHandles;
import java.net.ConnectException;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import org.apache.http.HttpResponse; import org.apache.http.HttpResponse;
import org.apache.http.NoHttpResponseException;
import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.BinaryResponseParser; import org.apache.solr.client.solrj.impl.BinaryResponseParser;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrClient; // jdoc import org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrClient; // jdoc
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest; import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.UpdateRequest; import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException;
@ -32,37 +48,21 @@ import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.Diagnostics; import org.apache.solr.core.Diagnostics;
import org.apache.solr.update.processor.DistributedUpdateProcessor; import org.apache.solr.update.processor.DistributedUpdateProcessor;
import org.apache.solr.update.processor.DistributedUpdateProcessor.RollupRequestReplicationTracker;
import org.apache.solr.update.processor.DistributedUpdateProcessor.LeaderRequestReplicationTracker; import org.apache.solr.update.processor.DistributedUpdateProcessor.LeaderRequestReplicationTracker;
import org.apache.solr.update.processor.DistributedUpdateProcessor.RollupRequestReplicationTracker;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.lang.invoke.MethodHandles;
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
/** /**
* Used for distributing commands from a shard leader to its replicas. * Used for distributing commands from a shard leader to its replicas.
*/ */
public class SolrCmdDistributor implements Closeable { public class SolrCmdDistributor implements Closeable {
private static final int MAX_RETRIES_ON_FORWARD = 25;
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private StreamingSolrClients clients; private StreamingSolrClients clients;
private boolean finished = false; // see finish() private boolean finished = false; // see finish()
private int retryPause = 500; private int retryPause = 500;
private int maxRetriesOnForward = MAX_RETRIES_ON_FORWARD;
private final List<Error> allErrors = new ArrayList<>(); private final List<Error> allErrors = new ArrayList<>();
private final List<Error> errors = Collections.synchronizedList(new ArrayList<Error>()); private final List<Error> errors = Collections.synchronizedList(new ArrayList<Error>());
@ -79,9 +79,8 @@ public class SolrCmdDistributor implements Closeable {
this.completionService = new ExecutorCompletionService<>(updateShardHandler.getUpdateExecutor()); this.completionService = new ExecutorCompletionService<>(updateShardHandler.getUpdateExecutor());
} }
public SolrCmdDistributor(StreamingSolrClients clients, int maxRetriesOnForward, int retryPause) { public SolrCmdDistributor(StreamingSolrClients clients, int retryPause) {
this.clients = clients; this.clients = clients;
this.maxRetriesOnForward = maxRetriesOnForward;
this.retryPause = retryPause; this.retryPause = retryPause;
completionService = new ExecutorCompletionService<>(clients.getUpdateExecutor()); completionService = new ExecutorCompletionService<>(clients.getUpdateExecutor());
} }
@ -108,54 +107,62 @@ public class SolrCmdDistributor implements Closeable {
errors.addAll(clients.getErrors()); errors.addAll(clients.getErrors());
List<Error> resubmitList = new ArrayList<>(); List<Error> resubmitList = new ArrayList<>();
if (log.isInfoEnabled() && errors.size() > 0) {
log.info("SolrCmdDistributor found {} errors", errors.size());
}
if (log.isDebugEnabled() && errors.size() > 0) {
StringBuilder builder = new StringBuilder("SolrCmdDistributor found:");
int maxErrorsToShow = 10;
for (Error e:errors) {
if (maxErrorsToShow-- <= 0) break;
builder.append("\n" + e);
}
if (errors.size() > 10) {
builder.append("\n... and ");
builder.append(errors.size() - 10);
builder.append(" more");
}
log.debug(builder.toString());
}
for (Error err : errors) { for (Error err : errors) {
try { try {
String oldNodeUrl = err.req.node.getUrl(); String oldNodeUrl = err.req.node.getUrl();
// if there is a retry url, we want to retry... /*
boolean isRetry = err.req.node.checkRetry(); * if this is a retryable request we may want to retry, depending on the error we received and
* the number of times we have already retried
boolean doRetry = false; */
int rspCode = err.statusCode; boolean isRetry = err.req.shouldRetry(err);
if (testing_errorHook != null) Diagnostics.call(testing_errorHook, if (testing_errorHook != null) Diagnostics.call(testing_errorHook,
err.e); err.e);
// this can happen in certain situations such as close // this can happen in certain situations such as close
if (isRetry) { if (isRetry) {
if (rspCode == 404 || rspCode == 403 || rspCode == 503) { err.req.retries++;
doRetry = true;
}
// if it's a connect exception, lets try again
if (err.e instanceof SolrServerException) {
if (((SolrServerException) err.e).getRootCause() instanceof ConnectException) {
doRetry = true;
}
}
if (err.e instanceof ConnectException) {
doRetry = true;
}
if (err.req.retries < maxRetriesOnForward && doRetry) {
err.req.retries++;
if (err.req.node instanceof ForwardNode) {
SolrException.log(SolrCmdDistributor.log, "forwarding update to " SolrException.log(SolrCmdDistributor.log, "forwarding update to "
+ oldNodeUrl + " failed - retrying ... retries: " + oldNodeUrl + " failed - retrying ... retries: "
+ err.req.retries + " " + err.req.cmd.toString() + " params:" + err.req.retries + "/" + err.req.node.getMaxRetries() + ". "
+ err.req.uReq.getParams() + " rsp:" + rspCode, err.e); + err.req.cmd.toString() + " params:"
try { + err.req.uReq.getParams() + " rsp:" + err.statusCode, err.e);
Thread.sleep(retryPause);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn(null, e);
}
resubmitList.add(err);
} else { } else {
allErrors.add(err); SolrException.log(SolrCmdDistributor.log, "FROMLEADER request to "
+ oldNodeUrl + " failed - retrying ... retries: "
+ err.req.retries + "/" + err.req.node.getMaxRetries() + ". "
+ err.req.cmd.toString() + " params:"
+ err.req.uReq.getParams() + " rsp:" + err.statusCode, err.e);
} }
try {
Thread.sleep(retryPause); //TODO: Do we want this wait for every error?
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn(null, e);
}
resubmitList.add(err);
} else { } else {
allErrors.add(err); allErrors.add(err);
} }
@ -184,6 +191,10 @@ public class SolrCmdDistributor implements Closeable {
RollupRequestReplicationTracker rollupTracker, RollupRequestReplicationTracker rollupTracker,
LeaderRequestReplicationTracker leaderTracker) throws IOException { LeaderRequestReplicationTracker leaderTracker) throws IOException {
if (!cmd.isDeleteById()) {
blockAndDoRetries(); // For DBQ, flush all writes before submitting
}
for (Node node : nodes) { for (Node node : nodes) {
UpdateRequest uReq = new UpdateRequest(); UpdateRequest uReq = new UpdateRequest();
uReq.setParams(params); uReq.setParams(params);
@ -193,7 +204,6 @@ public class SolrCmdDistributor implements Closeable {
} else { } else {
uReq.deleteByQuery(cmd.query); uReq.deleteByQuery(cmd.query);
} }
submit(new Req(cmd, node, uReq, sync, rollupTracker, leaderTracker), false); submit(new Req(cmd, node, uReq, sync, rollupTracker, leaderTracker), false);
} }
} }
@ -346,6 +356,16 @@ public class SolrCmdDistributor implements Closeable {
this.leaderTracker = leaderTracker; this.leaderTracker = leaderTracker;
} }
/**
* @return true if this request should be retried after receiving a particular error
* false otherwise
*/
public boolean shouldRetry(Error err) {
boolean isRetry = node.checkRetry(err);
isRetry &= uReq.getDeleteQuery() == null || uReq.getDeleteQuery().isEmpty(); //Don't retry DBQs
return isRetry && retries < node.getMaxRetries();
}
public String toString() { public String toString() {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
sb.append("SolrCmdDistributor$Req: cmd=").append(cmd.toString()); sb.append("SolrCmdDistributor$Req: cmd=").append(cmd.toString());
@ -440,27 +460,36 @@ public class SolrCmdDistributor implements Closeable {
public static abstract class Node { public static abstract class Node {
public abstract String getUrl(); public abstract String getUrl();
public abstract boolean checkRetry(); public abstract boolean checkRetry(Error e);
public abstract String getCoreName(); public abstract String getCoreName();
public abstract String getBaseUrl(); public abstract String getBaseUrl();
public abstract ZkCoreNodeProps getNodeProps(); public abstract ZkCoreNodeProps getNodeProps();
public abstract String getCollection(); public abstract String getCollection();
public abstract String getShardId(); public abstract String getShardId();
public abstract int getMaxRetries();
} }
public static class StdNode extends Node { public static class StdNode extends Node {
protected ZkCoreNodeProps nodeProps; protected ZkCoreNodeProps nodeProps;
protected String collection; protected String collection;
protected String shardId; protected String shardId;
private final boolean retry;
private final int maxRetries;
public StdNode(ZkCoreNodeProps nodeProps) { public StdNode(ZkCoreNodeProps nodeProps) {
this(nodeProps, null, null); this(nodeProps, null, null, 0);
} }
public StdNode(ZkCoreNodeProps nodeProps, String collection, String shardId) { public StdNode(ZkCoreNodeProps nodeProps, String collection, String shardId) {
this(nodeProps, collection, shardId, 0);
}
public StdNode(ZkCoreNodeProps nodeProps, String collection, String shardId, int maxRetries) {
this.nodeProps = nodeProps; this.nodeProps = nodeProps;
this.collection = collection; this.collection = collection;
this.shardId = shardId; this.shardId = shardId;
this.retry = maxRetries > 0;
this.maxRetries = maxRetries;
} }
public String getCollection() { public String getCollection() {
@ -482,10 +511,34 @@ public class SolrCmdDistributor implements Closeable {
} }
@Override @Override
public boolean checkRetry() { public boolean checkRetry(Error err) {
if (!retry) return false;
if (err.statusCode == 404 || err.statusCode == 403 || err.statusCode == 503) {
return true;
}
// if it's a retriable network exception (see isRetriableException), let's try again
if (err.e instanceof SolrServerException) {
if (isRetriableException(((SolrServerException) err.e).getRootCause())) {
return true;
}
} else {
if (isRetriableException(err.e)) {
return true;
}
}
return false; return false;
} }
/**
* @return true if Solr should retry in case of hitting this exception
* false otherwise
*/
private boolean isRetriableException(Throwable t) {
return t instanceof SocketException || t instanceof NoHttpResponseException || t instanceof SocketTimeoutException;
}
@Override @Override
public String getBaseUrl() { public String getBaseUrl() {
return nodeProps.getBaseUrl(); return nodeProps.getBaseUrl();
@ -506,6 +559,8 @@ public class SolrCmdDistributor implements Closeable {
result = prime * result + ((baseUrl == null) ? 0 : baseUrl.hashCode()); result = prime * result + ((baseUrl == null) ? 0 : baseUrl.hashCode());
result = prime * result + ((coreName == null) ? 0 : coreName.hashCode()); result = prime * result + ((coreName == null) ? 0 : coreName.hashCode());
result = prime * result + ((url == null) ? 0 : url.hashCode()); result = prime * result + ((url == null) ? 0 : url.hashCode());
result = prime * result + Boolean.hashCode(retry);
result = prime * result + Integer.hashCode(maxRetries);
return result; return result;
} }
@ -515,6 +570,8 @@ public class SolrCmdDistributor implements Closeable {
if (obj == null) return false; if (obj == null) return false;
if (getClass() != obj.getClass()) return false; if (getClass() != obj.getClass()) return false;
StdNode other = (StdNode) obj; StdNode other = (StdNode) obj;
if (this.retry != other.retry) return false;
if (this.maxRetries != other.maxRetries) return false;
String baseUrl = nodeProps.getBaseUrl(); String baseUrl = nodeProps.getBaseUrl();
String coreName = nodeProps.getCoreName(); String coreName = nodeProps.getCoreName();
String url = nodeProps.getCoreUrl(); String url = nodeProps.getCoreUrl();
@ -534,39 +591,56 @@ public class SolrCmdDistributor implements Closeable {
public ZkCoreNodeProps getNodeProps() { public ZkCoreNodeProps getNodeProps() {
return nodeProps; return nodeProps;
} }
@Override
public int getMaxRetries() {
return this.maxRetries;
}
} }
// RetryNodes are used in the case of 'forward to leader' where we want // RetryNodes are used in the case of 'forward to leader' where we want
// to try the latest leader on a fail in the case the leader just went down. // to try the latest leader on a fail in the case the leader just went down.
public static class RetryNode extends StdNode { public static class ForwardNode extends StdNode {
private ZkStateReader zkStateReader; private ZkStateReader zkStateReader;
public RetryNode(ZkCoreNodeProps nodeProps, ZkStateReader zkStateReader, String collection, String shardId) { public ForwardNode(ZkCoreNodeProps nodeProps, ZkStateReader zkStateReader, String collection, String shardId, int maxRetries) {
super(nodeProps, collection, shardId); super(nodeProps, collection, shardId, maxRetries);
this.zkStateReader = zkStateReader; this.zkStateReader = zkStateReader;
this.collection = collection; this.collection = collection;
this.shardId = shardId; this.shardId = shardId;
} }
@Override @Override
public boolean checkRetry() { public boolean checkRetry(Error err) {
ZkCoreNodeProps leaderProps; boolean doRetry = false;
try { if (err.statusCode == 404 || err.statusCode == 403 || err.statusCode == 503) {
leaderProps = new ZkCoreNodeProps(zkStateReader.getLeaderRetry( doRetry = true;
collection, shardId));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
} catch (Exception e) {
// we retry with same info
log.warn(null, e);
return true;
} }
this.nodeProps = leaderProps; // if it's a connect exception, lets try again
if (err.e instanceof SolrServerException && ((SolrServerException) err.e).getRootCause() instanceof ConnectException) {
doRetry = true;
} else if (err.e instanceof ConnectException) {
doRetry = true;
}
if (doRetry) {
ZkCoreNodeProps leaderProps;
try {
leaderProps = new ZkCoreNodeProps(zkStateReader.getLeaderRetry(
collection, shardId));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
} catch (Exception e) {
// we retry with same info
log.warn(null, e);
return true;
}
return true; this.nodeProps = leaderProps;
}
return doRetry;
} }
@Override @Override
@ -584,7 +658,7 @@ public class SolrCmdDistributor implements Closeable {
if (this == obj) return true; if (this == obj) return true;
if (!super.equals(obj)) return false; if (!super.equals(obj)) return false;
if (getClass() != obj.getClass()) return false; if (getClass() != obj.getClass()) return false;
RetryNode other = (RetryNode) obj; ForwardNode other = (ForwardNode) obj;
if (nodeProps.getCoreUrl() == null) { if (nodeProps.getCoreUrl() == null) {
if (other.nodeProps.getCoreUrl() != null) return false; if (other.nodeProps.getCoreUrl() != null) return false;
} else if (!nodeProps.getCoreUrl().equals(other.nodeProps.getCoreUrl())) return false; } else if (!nodeProps.getCoreUrl().equals(other.nodeProps.getCoreUrl())) return false;

View File

@ -143,7 +143,6 @@ class ErrorReportingConcurrentUpdateSolrClient extends ConcurrentUpdateSolrClien
@Override @Override
public void handleError(Throwable ex) { public void handleError(Throwable ex) {
req.trackRequestResult(null, false);
log.error("error", ex); log.error("error", ex);
Error error = new Error(); Error error = new Error();
error.e = (Exception) ex; error.e = (Exception) ex;
@ -152,6 +151,10 @@ class ErrorReportingConcurrentUpdateSolrClient extends ConcurrentUpdateSolrClien
} }
error.req = req; error.req = req;
errors.add(error); errors.add(error);
if (!req.shouldRetry(error)) {
// only track the error if we are not retrying the request
req.trackRequestResult(null, false);
}
} }
@Override @Override
public void onSuccess(HttpResponse resp) { public void onSuccess(HttpResponse resp) {

View File

@ -83,7 +83,7 @@ import org.apache.solr.update.DeleteUpdateCommand;
import org.apache.solr.update.SolrCmdDistributor; import org.apache.solr.update.SolrCmdDistributor;
import org.apache.solr.update.SolrCmdDistributor.Error; import org.apache.solr.update.SolrCmdDistributor.Error;
import org.apache.solr.update.SolrCmdDistributor.Node; import org.apache.solr.update.SolrCmdDistributor.Node;
import org.apache.solr.update.SolrCmdDistributor.RetryNode; import org.apache.solr.update.SolrCmdDistributor.ForwardNode;
import org.apache.solr.update.SolrCmdDistributor.StdNode; import org.apache.solr.update.SolrCmdDistributor.StdNode;
import org.apache.solr.update.SolrIndexSplitter; import org.apache.solr.update.SolrIndexSplitter;
import org.apache.solr.update.UpdateCommand; import org.apache.solr.update.UpdateCommand;
@ -113,6 +113,16 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
private static final String TEST_DISTRIB_SKIP_SERVERS = "test.distrib.skip.servers"; private static final String TEST_DISTRIB_SKIP_SERVERS = "test.distrib.skip.servers";
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
/**
* Requests forwarded to a leader of a different shard will be retried up to this number of times by default
*/
static final int MAX_RETRIES_ON_FORWARD_DEAULT = 25;
/**
* Requests from a leader to its followers will be retried this number of times by default
*/
static final int MAX_RETRIES_TO_FOLLOWERS_DEFAULT = 3;
/** /**
* Values this processor supports for the <code>DISTRIB_UPDATE_PARAM</code>. * Values this processor supports for the <code>DISTRIB_UPDATE_PARAM</code>.
* This is an implementation detail exposed solely for tests. * This is an implementation detail exposed solely for tests.
@ -176,6 +186,15 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
private Set<String> skippedCoreNodeNames; private Set<String> skippedCoreNodeNames;
private boolean isIndexChanged = false; private boolean isIndexChanged = false;
/**
* Number of times requests forwarded to some other shard's leader can be retried
*/
private final int maxRetriesOnForward = MAX_RETRIES_ON_FORWARD_DEAULT;
/**
* Number of times requests from leaders to followers can be retried
*/
private final int maxRetriesToFollowers = MAX_RETRIES_TO_FOLLOWERS_DEFAULT;
private UpdateCommand updateCommand; // the current command this processor is working on. private UpdateCommand updateCommand; // the current command this processor is working on.
//used for keeping track of replicas that have processed an add/update from the leader //used for keeping track of replicas that have processed an add/update from the leader
@ -368,7 +387,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
} else if (!clusterState.getLiveNodes().contains(replica.getNodeName()) || replica.getState() == Replica.State.DOWN) { } else if (!clusterState.getLiveNodes().contains(replica.getNodeName()) || replica.getState() == Replica.State.DOWN) {
skippedCoreNodeNames.add(replica.getName()); skippedCoreNodeNames.add(replica.getName());
} else { } else {
nodes.add(new StdNode(new ZkCoreNodeProps(replica), collection, shardId)); nodes.add(new StdNode(new ZkCoreNodeProps(replica), collection, shardId, maxRetriesToFollowers));
} }
} }
return nodes; return nodes;
@ -377,7 +396,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// I need to forward on to the leader... // I need to forward on to the leader...
forwardToLeader = true; forwardToLeader = true;
return Collections.singletonList( return Collections.singletonList(
new RetryNode(new ZkCoreNodeProps(leaderReplica), zkController.getZkStateReader(), collection, shardId)); new ForwardNode(new ZkCoreNodeProps(leaderReplica), zkController.getZkStateReader(), collection, shardId, maxRetriesOnForward));
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
@ -779,7 +798,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
Set<String> replicasShouldBeInLowerTerms = new HashSet<>(); Set<String> replicasShouldBeInLowerTerms = new HashSet<>();
for (final SolrCmdDistributor.Error error : errors) { for (final SolrCmdDistributor.Error error : errors) {
if (error.req.node instanceof RetryNode) { if (error.req.node instanceof ForwardNode) {
// if it's a forward, any fail is a problem - // if it's a forward, any fail is a problem -
// otherwise we assume things are fine if we got it locally // otherwise we assume things are fine if we got it locally
// until we start allowing min replication param // until we start allowing min replication param
@ -1538,7 +1557,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// don't forward to ourself // don't forward to ourself
leaderForAnyShard = true; leaderForAnyShard = true;
} else { } else {
leaders.add(new RetryNode(coreLeaderProps, zkController.getZkStateReader(), collection, sliceName)); leaders.add(new ForwardNode(coreLeaderProps, zkController.getZkStateReader(), collection, sliceName, maxRetriesOnForward));
} }
} }
@ -2100,7 +2119,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
private int achievedRf = Integer.MAX_VALUE; private int achievedRf = Integer.MAX_VALUE;
private final int requestedRf; private final int requestedRf;
RollupRequestReplicationTracker(String minRepFact) { public RollupRequestReplicationTracker(String minRepFact) {
try { try {
this.requestedRf = Integer.parseInt(minRepFact); this.requestedRf = Integer.parseInt(minRepFact);
} catch (NumberFormatException nfe) { } catch (NumberFormatException nfe) {
@ -2153,7 +2172,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
return requestedRf; return requestedRf;
} }
LeaderRequestReplicationTracker(String shardId, int requestedRf) { public LeaderRequestReplicationTracker(String shardId, int requestedRf) {
this.requestedRf = requestedRf; this.requestedRf = requestedRf;
this.myShardId = shardId; this.myShardId = shardId;
} }
@ -2165,8 +2184,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
public void trackRequestResult(Node node, boolean success) { public void trackRequestResult(Node node, boolean success) {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("trackRequestResult(" + node + "): success? " + success + log.debug("trackRequestResult({}): success? {}, shardId={}", node, success, myShardId);
", shardId=" + myShardId);
} }
if (success) { if (success) {

View File

@ -401,8 +401,8 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
"No 'leader' replica available for shard " + slice.getName() + " of collection " + collection); "No 'leader' replica available for shard " + slice.getName() + " of collection " + collection);
} }
return new SolrCmdDistributor.RetryNode(new ZkCoreNodeProps(leader), zkController.getZkStateReader(), return new SolrCmdDistributor.ForwardNode(new ZkCoreNodeProps(leader), zkController.getZkStateReader(),
collection, slice.getName()); collection, slice.getName(), DistributedUpdateProcessor.MAX_RETRIES_ON_FORWARD_DEAULT);
} }
} }

View File

@ -16,19 +16,19 @@
*/ */
package org.apache.solr.update; package org.apache.solr.update;
import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketException;
import org.apache.lucene.util.LuceneTestCase; import org.apache.lucene.util.LuceneTestCase;
import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest; import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.NamedList;
import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketException;
public class MockStreamingSolrClients extends StreamingSolrClients { public class MockStreamingSolrClients extends StreamingSolrClients {
public enum Exp {CONNECT_EXCEPTION, SOCKET_EXCEPTION}; public enum Exp {CONNECT_EXCEPTION, SOCKET_EXCEPTION, BAD_REQUEST};
private volatile Exp exp = null; private volatile Exp exp = null;
@ -46,12 +46,14 @@ public class MockStreamingSolrClients extends StreamingSolrClients {
this.exp = exp; this.exp = exp;
} }
private IOException exception() { private Exception exception() {
switch (exp) { switch (exp) {
case CONNECT_EXCEPTION: case CONNECT_EXCEPTION:
return new ConnectException(); return new ConnectException();
case SOCKET_EXCEPTION: case SOCKET_EXCEPTION:
return new SocketException(); return new SocketException();
case BAD_REQUEST:
return new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Bad Request");
default: default:
break; break;
} }
@ -70,10 +72,17 @@ public class MockStreamingSolrClients extends StreamingSolrClients {
public NamedList<Object> request(SolrRequest request, String collection) public NamedList<Object> request(SolrRequest request, String collection)
throws SolrServerException, IOException { throws SolrServerException, IOException {
if (exp != null) { if (exp != null) {
if (LuceneTestCase.random().nextBoolean()) { Exception e = exception();
throw exception(); if (e instanceof IOException) {
if (LuceneTestCase.random().nextBoolean()) {
throw (IOException)e;
} else {
throw new SolrServerException(e);
}
} else if (e instanceof SolrServerException) {
throw (SolrServerException)e;
} else { } else {
throw new SolrServerException(exception()); throw new SolrServerException(e);
} }
} }

View File

@ -16,15 +16,15 @@
*/ */
package org.apache.solr.update; package org.apache.solr.update;
import javax.xml.parsers.ParserConfigurationException;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.net.SocketException;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import javax.xml.parsers.ParserConfigurationException;
import org.apache.solr.BaseDistributedSearchTestCase; import org.apache.solr.BaseDistributedSearchTestCase;
import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrQuery; import org.apache.solr.client.solrj.SolrQuery;
@ -34,6 +34,7 @@ import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.LukeRequest; import org.apache.solr.client.solrj.request.LukeRequest;
import org.apache.solr.client.solrj.request.UpdateRequest; import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrDocumentList; import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ZkCoreNodeProps; import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps; import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.cloud.ZkStateReader;
@ -47,10 +48,12 @@ import org.apache.solr.index.LogDocMergePolicyFactory;
import org.apache.solr.search.SolrIndexSearcher; import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.update.MockStreamingSolrClients.Exp; import org.apache.solr.update.MockStreamingSolrClients.Exp;
import org.apache.solr.update.SolrCmdDistributor.Error; import org.apache.solr.update.SolrCmdDistributor.Error;
import org.apache.solr.update.SolrCmdDistributor.ForwardNode;
import org.apache.solr.update.SolrCmdDistributor.Node; import org.apache.solr.update.SolrCmdDistributor.Node;
import org.apache.solr.update.SolrCmdDistributor.RetryNode;
import org.apache.solr.update.SolrCmdDistributor.StdNode; import org.apache.solr.update.SolrCmdDistributor.StdNode;
import org.apache.solr.update.processor.DistributedUpdateProcessor; import org.apache.solr.update.processor.DistributedUpdateProcessor;
import org.apache.solr.update.processor.DistributedUpdateProcessor.LeaderRequestReplicationTracker;
import org.apache.solr.update.processor.DistributedUpdateProcessor.RollupRequestReplicationTracker;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
@ -59,6 +62,8 @@ import org.xml.sax.SAXException;
// See: https://issues.apache.org/jira/browse/SOLR-12028 Tests cannot remove files on Windows machines occasionally // See: https://issues.apache.org/jira/browse/SOLR-12028 Tests cannot remove files on Windows machines occasionally
public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase { public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
private static enum NodeType {FORWARD, STANDARD};
private AtomicInteger id = new AtomicInteger(); private AtomicInteger id = new AtomicInteger();
@BeforeClass @BeforeClass
@ -127,6 +132,7 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
shards = sb.toString(); shards = sb.toString();
} }
@SuppressWarnings("unchecked")
@Test @Test
@ShardsFixed(num = 4) @ShardsFixed(num = 4)
public void test() throws Exception { public void test() throws Exception {
@ -325,30 +331,176 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
} }
} }
testMaxRetries(); testMaxRetries(NodeType.FORWARD);
testOneRetry(); testMaxRetries(NodeType.STANDARD);
testOneRetry(NodeType.FORWARD);
testOneRetry(NodeType.STANDARD);
testRetryNodeAgainstBadAddress(); testRetryNodeAgainstBadAddress();
testRetryNodeWontRetrySocketError(); testStdNodeRetriesSocketError();
testForwardNodeWontRetrySocketError();
testNodeWontRetryBadRequest(NodeType.FORWARD);
testNodeWontRetryBadRequest(NodeType.STANDARD);
testMinRfOnRetries(NodeType.FORWARD);
testMinRfOnRetries(NodeType.STANDARD);
testDistribOpenSearcher(); testDistribOpenSearcher();
testReqShouldRetryNoRetries();
testReqShouldRetryMaxRetries();
testReqShouldRetryBadRequest();
testReqShouldRetryNotFound();
testReqShouldRetryDBQ();
testDeletes(false, true);
testDeletes(false, false);
testDeletes(true, true);
testDeletes(true, false);
} }
private void testMaxRetries() throws IOException { private void testDeletes(boolean dbq, boolean withFailures) throws Exception {
final HttpSolrClient solrclient = (HttpSolrClient) clients.get(0);
solrclient.commit(true, true);
long numFoundBefore = solrclient.query(new SolrQuery("*:*")).getResults()
.getNumFound();
final MockStreamingSolrClients streamingClients = new MockStreamingSolrClients(updateShardHandler); final MockStreamingSolrClients streamingClients = new MockStreamingSolrClients(updateShardHandler);
try (SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(streamingClients, 5, 0)) { try (SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(streamingClients, 0)) {
if (withFailures) {
streamingClients.setExp(Exp.CONNECT_EXCEPTION);
}
ArrayList<Node> nodes = new ArrayList<>();
ZkNodeProps nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, solrclient.getBaseURL(),
ZkStateReader.CORE_NAME_PROP, "");
final AtomicInteger retries = new AtomicInteger();
nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, solrclient.getBaseURL(), ZkStateReader.CORE_NAME_PROP, "");
Node retryNode = new StdNode(new ZkCoreNodeProps(nodeProps), "collection1", "shard1", 5) {
@Override
public boolean checkRetry(Error err) {
streamingClients.setExp(null);
retries.incrementAndGet();
return super.checkRetry(err);
}
};
nodes.add(retryNode);
for (int i = 0 ; i < 5 ; i++) {
AddUpdateCommand cmd = new AddUpdateCommand(null);
int currentId = id.incrementAndGet();
cmd.solrDoc = sdoc("id", currentId);
ModifiableSolrParams params = new ModifiableSolrParams();
cmdDistrib.distribAdd(cmd, nodes, params);
DeleteUpdateCommand dcmd = new DeleteUpdateCommand(null);
if (dbq) {
dcmd.setQuery("id:" + currentId);
} else {
dcmd.setId(String.valueOf(currentId));
}
cmdDistrib.distribDelete(dcmd, nodes, params, false, null, null);
}
CommitUpdateCommand ccmd = new CommitUpdateCommand(null, false);
cmdDistrib.distribCommit(ccmd, nodes, new ModifiableSolrParams());
cmdDistrib.finish();
int expectedRetryCount = 0;
if (withFailures) {
if (dbq) {
expectedRetryCount = 1; // just the first cmd would be retried
} else {
expectedRetryCount = 10;
}
}
assertEquals(expectedRetryCount, retries.get());
long numFoundAfter = solrclient.query(new SolrQuery("*:*")).getResults()
.getNumFound();
// we will get java.net.ConnectException which we retry on
assertEquals(numFoundBefore, numFoundAfter);
assertEquals(0, cmdDistrib.getErrors().size());
}
}
/**
 * Distributes a single add, with replication trackers attached, to one node while the
 * mock streaming client throws CONNECT_EXCEPTION. The injected failure is cleared once
 * checkRetry has been invoked three times. Verifies that exactly three retry checks
 * happened, that the leader tracker reports an achieved replication factor of 2, and
 * that no errors were recorded.
 *
 * @param nodeType whether the target node is built as a ForwardNode or a StdNode
 */
private void testMinRfOnRetries(NodeType nodeType) throws Exception {
  final HttpSolrClient solrclient = (HttpSolrClient) clients.get(0);
  final MockStreamingSolrClients streamingClients = new MockStreamingSolrClients(updateShardHandler);
  try (SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(streamingClients, 0)) {
    streamingClients.setExp(Exp.CONNECT_EXCEPTION);
    ArrayList<Node> nodes = new ArrayList<>();
    final AtomicInteger retries = new AtomicInteger();

    ZkNodeProps nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, solrclient.getBaseURL(),
        ZkStateReader.CORE_NAME_PROP, "");
    Node retryNode;
    if (nodeType == NodeType.FORWARD) {
      retryNode = new ForwardNode(new ZkCoreNodeProps(nodeProps), null, "collection1", "shard1", 5) {
        @Override
        public boolean checkRetry(Error err) {
          // keep failing until the third retry check, then let the request through
          if (retries.incrementAndGet() >= 3) {
            streamingClients.setExp(null);
          }
          return super.checkRetry(err);
        }
      };
    } else {
      retryNode = new StdNode(new ZkCoreNodeProps(nodeProps), "collection1", "shard1", 5) {
        @Override
        public boolean checkRetry(Error err) {
          // keep failing until the third retry check, then let the request through
          if (retries.incrementAndGet() >= 3) {
            streamingClients.setExp(null);
          }
          return super.checkRetry(err);
        }
      };
    }
    nodes.add(retryNode);

    AddUpdateCommand cmd = new AddUpdateCommand(null);
    cmd.solrDoc = sdoc("id", id.incrementAndGet());
    ModifiableSolrParams params = new ModifiableSolrParams();
    RollupRequestReplicationTracker rollupReqTracker = new RollupRequestReplicationTracker("2");
    LeaderRequestReplicationTracker leaderReqTracker = new LeaderRequestReplicationTracker("shard1", 2);

    cmdDistrib.distribAdd(cmd, nodes, params, false, rollupReqTracker, leaderReqTracker);
    cmdDistrib.finish();

    assertEquals(3, retries.get());
    // "2" here is because one would be the leader, that creates the instance of
    // LeaderRequestReplicationTracker, the second one is the node
    assertEquals(2, leaderReqTracker.getAchievedRf());
    assertEquals(0, cmdDistrib.getErrors().size());
  }
}
private void testMaxRetries(NodeType nodeType) throws IOException {
final MockStreamingSolrClients streamingClients = new MockStreamingSolrClients(updateShardHandler);
try (SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(streamingClients, 0)) {
streamingClients.setExp(Exp.CONNECT_EXCEPTION); streamingClients.setExp(Exp.CONNECT_EXCEPTION);
ArrayList<Node> nodes = new ArrayList<>(); ArrayList<Node> nodes = new ArrayList<>();
final HttpSolrClient solrclient1 = (HttpSolrClient) clients.get(0); final HttpSolrClient solrclient1 = (HttpSolrClient) clients.get(0);
final AtomicInteger retries = new AtomicInteger(); final AtomicInteger retries = new AtomicInteger();
ZkNodeProps nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, solrclient1.getBaseURL(), ZkStateReader.CORE_NAME_PROP, ""); ZkNodeProps nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, solrclient1.getBaseURL(), ZkStateReader.CORE_NAME_PROP, "");
RetryNode retryNode = new RetryNode(new ZkCoreNodeProps(nodeProps), null, "collection1", "shard1") { Node retryNode;
@Override if (nodeType == NodeType.FORWARD) {
public boolean checkRetry() { retryNode = new ForwardNode(new ZkCoreNodeProps(nodeProps), null, "collection1", "shard1", 6) {
retries.incrementAndGet(); @Override
return true; public boolean checkRetry(Error err) {
} retries.incrementAndGet();
}; return super.checkRetry(err);
}
};
} else {
retryNode = new StdNode(new ZkCoreNodeProps(nodeProps), "collection1", "shard1", 6) {
@Override
public boolean checkRetry(Error err) {
retries.incrementAndGet();
return super.checkRetry(err);
}
};
}
nodes.add(retryNode); nodes.add(retryNode);
@ -359,18 +511,61 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
cmdDistrib.distribAdd(cmd, nodes, params); cmdDistrib.distribAdd(cmd, nodes, params);
cmdDistrib.finish(); cmdDistrib.finish();
assertEquals(6, retries.get()); assertEquals(7, retries.get());
assertEquals(1, cmdDistrib.getErrors().size()); assertEquals(1, cmdDistrib.getErrors().size());
} }
} }
private void testOneRetry() throws Exception { private void testReqShouldRetryNoRetries() {
Error err = getError(new SocketException());
SolrCmdDistributor.Req req = new SolrCmdDistributor.Req(null, new StdNode(null, "collection1", "shard1", 0), new UpdateRequest(), true);
assertFalse(req.shouldRetry(err));
}
/**
 * A request carrying a delete-by-query must not be retried on a SocketException,
 * even though the target StdNode is built with 1 as its last constructor argument.
 */
private void testReqShouldRetryDBQ() {
  final UpdateRequest deleteByQueryRequest = new UpdateRequest();
  deleteByQueryRequest.deleteByQuery("*:*");
  final StdNode targetNode = new StdNode(null, "collection1", "shard1", 1);
  final SolrCmdDistributor.Req request =
      new SolrCmdDistributor.Req(null, targetNode, deleteByQueryRequest, true);
  final Error socketError = getError(new SocketException());
  assertFalse(request.shouldRetry(socketError));
}
/**
 * With a StdNode built with 1 as its last constructor argument, a SocketException is
 * retriable while the request's retry counter is untouched, and no longer retriable
 * once the counter has been incremented once.
 */
private void testReqShouldRetryMaxRetries() {
  final StdNode targetNode = new StdNode(null, "collection1", "shard1", 1);
  final SolrCmdDistributor.Req request =
      new SolrCmdDistributor.Req(null, targetNode, new UpdateRequest(), true);
  final Error socketError = getError(new SocketException());

  assertTrue(request.shouldRetry(socketError));
  request.retries++;
  assertFalse(request.shouldRetry(socketError));
}
/** An error wrapping a BAD_REQUEST SolrException must not trigger a retry. */
private void testReqShouldRetryBadRequest() {
  final StdNode targetNode = new StdNode(null, "collection1", "shard1", 1);
  final SolrCmdDistributor.Req request =
      new SolrCmdDistributor.Req(null, targetNode, new UpdateRequest(), true);
  final SolrException badRequest = new SolrException(SolrException.ErrorCode.BAD_REQUEST, "bad request");
  assertFalse(request.shouldRetry(getError(badRequest)));
}
/** An error wrapping a NOT_FOUND SolrException is expected to be retriable. */
private void testReqShouldRetryNotFound() {
  final StdNode targetNode = new StdNode(null, "collection1", "shard1", 1);
  final SolrCmdDistributor.Req request =
      new SolrCmdDistributor.Req(null, targetNode, new UpdateRequest(), true);
  final SolrException notFound = new SolrException(SolrException.ErrorCode.NOT_FOUND, "not found");
  assertTrue(request.shouldRetry(getError(notFound)));
}
/**
 * Builds an Error holding the given exception. When the exception is a SolrException,
 * its code() is copied into the error's statusCode; otherwise statusCode is left as
 * a freshly constructed Error has it.
 *
 * @param e the exception to wrap
 * @return the populated Error
 */
private Error getError(Exception e) {
  final Error wrapped = new Error();
  wrapped.e = e;
  if (!(e instanceof SolrException)) {
    return wrapped;
  }
  wrapped.statusCode = ((SolrException) e).code();
  return wrapped;
}
private void testOneRetry(NodeType nodeType) throws Exception {
final HttpSolrClient solrclient = (HttpSolrClient) clients.get(0); final HttpSolrClient solrclient = (HttpSolrClient) clients.get(0);
long numFoundBefore = solrclient.query(new SolrQuery("*:*")).getResults() long numFoundBefore = solrclient.query(new SolrQuery("*:*")).getResults()
.getNumFound(); .getNumFound();
final MockStreamingSolrClients streamingClients = new MockStreamingSolrClients(updateShardHandler); final MockStreamingSolrClients streamingClients = new MockStreamingSolrClients(updateShardHandler);
try (SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(streamingClients, 5, 0)) { try (SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(streamingClients, 0)) {
streamingClients.setExp(Exp.CONNECT_EXCEPTION); streamingClients.setExp(Exp.CONNECT_EXCEPTION);
ArrayList<Node> nodes = new ArrayList<>(); ArrayList<Node> nodes = new ArrayList<>();
@ -379,14 +574,26 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
final AtomicInteger retries = new AtomicInteger(); final AtomicInteger retries = new AtomicInteger();
nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, solrclient.getBaseURL(), ZkStateReader.CORE_NAME_PROP, ""); nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, solrclient.getBaseURL(), ZkStateReader.CORE_NAME_PROP, "");
RetryNode retryNode = new RetryNode(new ZkCoreNodeProps(nodeProps), null, "collection1", "shard1") { Node retryNode;
@Override if (nodeType == NodeType.FORWARD) {
public boolean checkRetry() { retryNode = new ForwardNode(new ZkCoreNodeProps(nodeProps), null, "collection1", "shard1", 5) {
streamingClients.setExp(null); @Override
retries.incrementAndGet(); public boolean checkRetry(Error err) {
return true; streamingClients.setExp(null);
} retries.incrementAndGet();
}; return super.checkRetry(err);
}
};
} else {
retryNode = new StdNode(new ZkCoreNodeProps(nodeProps), "collection1", "shard1", 5) {
@Override
public boolean checkRetry(Error err) {
streamingClients.setExp(null);
retries.incrementAndGet();
return super.checkRetry(err);
}
};
}
nodes.add(retryNode); nodes.add(retryNode);
@ -412,12 +619,70 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
} }
} }
private void testRetryNodeWontRetrySocketError() throws Exception { private void testNodeWontRetryBadRequest(NodeType nodeType) throws Exception {
ignoreException("Bad Request");
final HttpSolrClient solrclient = (HttpSolrClient) clients.get(0); final HttpSolrClient solrclient = (HttpSolrClient) clients.get(0);
long numFoundBefore = solrclient.query(new SolrQuery("*:*")).getResults() long numFoundBefore = solrclient.query(new SolrQuery("*:*")).getResults()
.getNumFound(); .getNumFound();
final MockStreamingSolrClients streamingClients = new MockStreamingSolrClients(updateShardHandler); final MockStreamingSolrClients streamingClients = new MockStreamingSolrClients(updateShardHandler);
try (SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(streamingClients, 5, 0)) { try (SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(streamingClients, 0)) {
streamingClients.setExp(Exp.BAD_REQUEST);
ArrayList<Node> nodes = new ArrayList<>();
ZkNodeProps nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, solrclient.getBaseURL(),
ZkStateReader.CORE_NAME_PROP, "");
final AtomicInteger retries = new AtomicInteger();
Node retryNode;
if (nodeType == NodeType.FORWARD) {
retryNode = new ForwardNode(new ZkCoreNodeProps(nodeProps), null, "collection1", "shard1", 5) {
@Override
public boolean checkRetry(Error err) {
retries.incrementAndGet();
return super.checkRetry(err);
}
};
} else {
retryNode = new StdNode(new ZkCoreNodeProps(nodeProps), "collection1", "shard1", 5) {
@Override
public boolean checkRetry(Error err) {
retries.incrementAndGet();
return super.checkRetry(err);
}
};
}
nodes.add(retryNode);
AddUpdateCommand cmd = new AddUpdateCommand(null);
cmd.solrDoc = sdoc("id", id.incrementAndGet());
ModifiableSolrParams params = new ModifiableSolrParams();
CommitUpdateCommand ccmd = new CommitUpdateCommand(null, false);
cmdDistrib.distribAdd(cmd, nodes, params);
streamingClients.setExp(null);
cmdDistrib.distribCommit(ccmd, nodes, params);
cmdDistrib.finish();
// it will checkRetry, but not actually do it...
assertEquals(1, retries.get());
long numFoundAfter = solrclient.query(new SolrQuery("*:*")).getResults()
.getNumFound();
// we will get java.net.SocketException: Network is unreachable, which we don't retry on
assertEquals(numFoundBefore, numFoundAfter);
assertEquals(1, cmdDistrib.getErrors().size());
unIgnoreException("Bad Request");
}
}
private void testForwardNodeWontRetrySocketError() throws Exception {
final HttpSolrClient solrclient = (HttpSolrClient) clients.get(0);
long numFoundBefore = solrclient.query(new SolrQuery("*:*")).getResults()
.getNumFound();
final MockStreamingSolrClients streamingClients = new MockStreamingSolrClients(updateShardHandler);
try (SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(streamingClients, 0)) {
streamingClients.setExp(Exp.SOCKET_EXCEPTION); streamingClients.setExp(Exp.SOCKET_EXCEPTION);
ArrayList<Node> nodes = new ArrayList<>(); ArrayList<Node> nodes = new ArrayList<>();
@ -426,11 +691,11 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
final AtomicInteger retries = new AtomicInteger(); final AtomicInteger retries = new AtomicInteger();
nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, solrclient.getBaseURL(), ZkStateReader.CORE_NAME_PROP, ""); nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, solrclient.getBaseURL(), ZkStateReader.CORE_NAME_PROP, "");
RetryNode retryNode = new RetryNode(new ZkCoreNodeProps(nodeProps), null, "collection1", "shard1") { ForwardNode retryNode = new ForwardNode(new ZkCoreNodeProps(nodeProps), null, "collection1", "shard1", 5) {
@Override @Override
public boolean checkRetry() { public boolean checkRetry(Error err) {
retries.incrementAndGet(); retries.incrementAndGet();
return true; return super.checkRetry(err);
} }
}; };
@ -461,6 +726,41 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
} }
} }
/**
 * Distributes a single add to a StdNode while the mock streaming client keeps throwing
 * SOCKET_EXCEPTION, and verifies that checkRetry is consulted six times.
 */
private void testStdNodeRetriesSocketError() throws Exception {
  final HttpSolrClient solrclient = (HttpSolrClient) clients.get(0);
  final MockStreamingSolrClients streamingClients = new MockStreamingSolrClients(updateShardHandler);
  try (SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(streamingClients, 0)) {
    streamingClients.setExp(Exp.SOCKET_EXCEPTION);
    ArrayList<Node> nodes = new ArrayList<>();
    final AtomicInteger retries = new AtomicInteger();

    ZkNodeProps nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, solrclient.getBaseURL(),
        ZkStateReader.CORE_NAME_PROP, "");
    Node retryNode = new StdNode(new ZkCoreNodeProps(nodeProps), "collection1", "shard1", 5) {
      @Override
      public boolean checkRetry(Error err) {
        retries.incrementAndGet();
        return super.checkRetry(err);
      }
    };
    nodes.add(retryNode);

    AddUpdateCommand cmd = new AddUpdateCommand(null);
    cmd.solrDoc = sdoc("id", id.incrementAndGet());
    ModifiableSolrParams params = new ModifiableSolrParams();

    cmdDistrib.distribAdd(cmd, nodes, params);
    cmdDistrib.finish();

    // the failure is never cleared, so the StdNode keeps retrying: the node is built with 5
    // (presumably its max-retries budget) and checkRetry ends up being consulted 6 times
    assertEquals(6, retries.get());
  }
}
private void testRetryNodeAgainstBadAddress() throws SolrServerException, IOException { private void testRetryNodeAgainstBadAddress() throws SolrServerException, IOException {
// Test RetryNode // Test RetryNode
try (SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(updateShardHandler)) { try (SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(updateShardHandler)) {
@ -471,14 +771,14 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
ArrayList<Node> nodes = new ArrayList<>(); ArrayList<Node> nodes = new ArrayList<>();
ZkNodeProps nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, "[ff01::114]:33332" + context, ZkStateReader.CORE_NAME_PROP, ""); ZkNodeProps nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, "[ff01::114]:33332" + context, ZkStateReader.CORE_NAME_PROP, "");
RetryNode retryNode = new RetryNode(new ZkCoreNodeProps(nodeProps), null, "collection1", "shard1") { ForwardNode retryNode = new ForwardNode(new ZkCoreNodeProps(nodeProps), null, "collection1", "shard1", 5) {
@Override @Override
public boolean checkRetry() { public boolean checkRetry(Error err) {
ZkNodeProps leaderProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, solrclient.getBaseURL(), ZkNodeProps leaderProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, solrclient.getBaseURL(),
ZkStateReader.CORE_NAME_PROP, ""); ZkStateReader.CORE_NAME_PROP, "");
this.nodeProps = new ZkCoreNodeProps(leaderProps); this.nodeProps = new ZkCoreNodeProps(leaderProps);
return true; return super.checkRetry(err);
} }
}; };