From c338cf61e7baba4908c31e02beda47ae3e201752 Mon Sep 17 00:00:00 2001
From: Tomas Fernandez Lobbe
Date: Mon, 6 Aug 2018 15:56:49 -0700
Subject: [PATCH] SOLR-11881: Retry update requests from leaders to followers

---
 solr/CHANGES.txt                                   |   2 +
 .../solr/update/SolrCmdDistributor.java            | 230 +++++++----
 .../solr/update/StreamingSolrClients.java          |   5 +-
 .../processor/DistributedUpdateProcessor.java      |  36 +-
 .../TimeRoutedAliasUpdateProcessor.java            |   4 +-
 .../solr/update/MockStreamingSolrClients.java      |  27 +-
 .../solr/update/SolrCmdDistributorTest.java        | 370 ++++++++++++++++--
 7 files changed, 540 insertions(+), 134 deletions(-)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 605e8375cb8..5fb1f884282 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -233,6 +233,8 @@ Optimizations
   introduced (splitMethod=link) which uses hard-linking of index files when possible, resulting
   in significant speedups and reduced CPU / IO load on shard leader. (ab)
 
+* SOLR-11881: Retry update requests sent by leaders to it's followers (Varun Thacker, Mark Miller, Tomás Fernández Löbbe)
+
 Other Changes
 ----------------------
 
diff --git a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
index 80e22532845..d5aafeca49e 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
@@ -17,12 +17,28 @@ package org.apache.solr.update;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.invoke.MethodHandles;
+import java.net.ConnectException;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
 import org.apache.http.HttpResponse;
+import org.apache.http.NoHttpResponseException;
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.impl.BinaryResponseParser;
-import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrClient; // jdoc
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
 import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.common.SolrException;
@@ -32,37 +48,21 @@ import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.core.Diagnostics;
 import org.apache.solr.update.processor.DistributedUpdateProcessor;
-import org.apache.solr.update.processor.DistributedUpdateProcessor.RollupRequestReplicationTracker;
 import org.apache.solr.update.processor.DistributedUpdateProcessor.LeaderRequestReplicationTracker;
+import org.apache.solr.update.processor.DistributedUpdateProcessor.RollupRequestReplicationTracker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InputStream;
-import java.lang.invoke.MethodHandles;
-import java.net.ConnectException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import
java.util.concurrent.CompletionService; -import java.util.concurrent.ExecutorCompletionService; -import java.util.concurrent.Future; - /** * Used for distributing commands from a shard leader to its replicas. */ public class SolrCmdDistributor implements Closeable { - private static final int MAX_RETRIES_ON_FORWARD = 25; private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private StreamingSolrClients clients; private boolean finished = false; // see finish() private int retryPause = 500; - private int maxRetriesOnForward = MAX_RETRIES_ON_FORWARD; private final List allErrors = new ArrayList<>(); private final List errors = Collections.synchronizedList(new ArrayList()); @@ -79,9 +79,8 @@ public class SolrCmdDistributor implements Closeable { this.completionService = new ExecutorCompletionService<>(updateShardHandler.getUpdateExecutor()); } - public SolrCmdDistributor(StreamingSolrClients clients, int maxRetriesOnForward, int retryPause) { + public SolrCmdDistributor(StreamingSolrClients clients, int retryPause) { this.clients = clients; - this.maxRetriesOnForward = maxRetriesOnForward; this.retryPause = retryPause; completionService = new ExecutorCompletionService<>(clients.getUpdateExecutor()); } @@ -107,55 +106,63 @@ public class SolrCmdDistributor implements Closeable { List errors = new ArrayList<>(this.errors); errors.addAll(clients.getErrors()); List resubmitList = new ArrayList<>(); + + if (log.isInfoEnabled() && errors.size() > 0) { + log.info("SolrCmdDistributor found {} errors", errors.size()); + } + + if (log.isDebugEnabled() && errors.size() > 0) { + StringBuilder builder = new StringBuilder("SolrCmdDistributor found:"); + int maxErrorsToShow = 10; + for (Error e:errors) { + if (maxErrorsToShow-- <= 0) break; + builder.append("\n" + e); + } + if (errors.size() > 10) { + builder.append("\n... and "); + builder.append(errors.size() - 10); + builder.append(" more"); + } + log.debug(builder.toString()); + } for (Error err : errors) { try { String oldNodeUrl = err.req.node.getUrl(); - // if there is a retry url, we want to retry... - boolean isRetry = err.req.node.checkRetry(); - - boolean doRetry = false; - int rspCode = err.statusCode; + /* + * if this is a retryable request we may want to retry, depending on the error we received and + * the number of times we have already retried + */ + boolean isRetry = err.req.shouldRetry(err); if (testing_errorHook != null) Diagnostics.call(testing_errorHook, err.e); // this can happen in certain situations such as close if (isRetry) { - if (rspCode == 404 || rspCode == 403 || rspCode == 503) { - doRetry = true; - } - - // if it's a connect exception, lets try again - if (err.e instanceof SolrServerException) { - if (((SolrServerException) err.e).getRootCause() instanceof ConnectException) { - doRetry = true; - } - } - - if (err.e instanceof ConnectException) { - doRetry = true; - } - - if (err.req.retries < maxRetriesOnForward && doRetry) { - err.req.retries++; - + err.req.retries++; + + if (err.req.node instanceof ForwardNode) { SolrException.log(SolrCmdDistributor.log, "forwarding update to " + oldNodeUrl + " failed - retrying ... retries: " - + err.req.retries + " " + err.req.cmd.toString() + " params:" - + err.req.uReq.getParams() + " rsp:" + rspCode, err.e); - try { - Thread.sleep(retryPause); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - log.warn(null, e); - } - - resubmitList.add(err); + + err.req.retries + "/" + err.req.node.getMaxRetries() + ". 
" + + err.req.cmd.toString() + " params:" + + err.req.uReq.getParams() + " rsp:" + err.statusCode, err.e); } else { - allErrors.add(err); + SolrException.log(SolrCmdDistributor.log, "FROMLEADER request to " + + oldNodeUrl + " failed - retrying ... retries: " + + err.req.retries + "/" + err.req.node.getMaxRetries() + ". " + + err.req.cmd.toString() + " params:" + + err.req.uReq.getParams() + " rsp:" + err.statusCode, err.e); } + try { + Thread.sleep(retryPause); //TODO: Do we want this wait for every error? + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.warn(null, e); + } + resubmitList.add(err); } else { allErrors.add(err); } @@ -184,6 +191,10 @@ public class SolrCmdDistributor implements Closeable { RollupRequestReplicationTracker rollupTracker, LeaderRequestReplicationTracker leaderTracker) throws IOException { + if (!cmd.isDeleteById()) { + blockAndDoRetries(); // For DBQ, flush all writes before submitting + } + for (Node node : nodes) { UpdateRequest uReq = new UpdateRequest(); uReq.setParams(params); @@ -193,7 +204,6 @@ public class SolrCmdDistributor implements Closeable { } else { uReq.deleteByQuery(cmd.query); } - submit(new Req(cmd, node, uReq, sync, rollupTracker, leaderTracker), false); } } @@ -346,6 +356,16 @@ public class SolrCmdDistributor implements Closeable { this.leaderTracker = leaderTracker; } + /** + * @return true if this request should be retried after receiving a particular error + * false otherwise + */ + public boolean shouldRetry(Error err) { + boolean isRetry = node.checkRetry(err); + isRetry &= uReq.getDeleteQuery() == null || uReq.getDeleteQuery().isEmpty(); //Don't retry DBQs + return isRetry && retries < node.getMaxRetries(); + } + public String toString() { StringBuilder sb = new StringBuilder(); sb.append("SolrCmdDistributor$Req: cmd=").append(cmd.toString()); @@ -440,27 +460,36 @@ public class SolrCmdDistributor implements Closeable { public static abstract class Node { public abstract String getUrl(); - public abstract boolean checkRetry(); + public abstract boolean checkRetry(Error e); public abstract String getCoreName(); public abstract String getBaseUrl(); public abstract ZkCoreNodeProps getNodeProps(); public abstract String getCollection(); public abstract String getShardId(); + public abstract int getMaxRetries(); } public static class StdNode extends Node { protected ZkCoreNodeProps nodeProps; protected String collection; protected String shardId; + private final boolean retry; + private final int maxRetries; public StdNode(ZkCoreNodeProps nodeProps) { - this(nodeProps, null, null); + this(nodeProps, null, null, 0); } - public StdNode(ZkCoreNodeProps nodeProps, String collection, String shardId) { + public StdNode(ZkCoreNodeProps nodeProps, String collection, String shardId) { + this(nodeProps, collection, shardId, 0); + } + + public StdNode(ZkCoreNodeProps nodeProps, String collection, String shardId, int maxRetries) { this.nodeProps = nodeProps; this.collection = collection; this.shardId = shardId; + this.retry = maxRetries > 0; + this.maxRetries = maxRetries; } public String getCollection() { @@ -482,9 +511,33 @@ public class SolrCmdDistributor implements Closeable { } @Override - public boolean checkRetry() { + public boolean checkRetry(Error err) { + if (!retry) return false; + + if (err.statusCode == 404 || err.statusCode == 403 || err.statusCode == 503) { + return true; + } + + // if it's a connect exception, lets try again + if (err.e instanceof SolrServerException) { + if 
(isRetriableException(((SolrServerException) err.e).getRootCause())) { + return true; + } + } else { + if (isRetriableException(err.e)) { + return true; + } + } return false; } + + /** + * @return true if Solr should retry in case of hitting this exception + * false otherwise + */ + private boolean isRetriableException(Throwable t) { + return t instanceof SocketException || t instanceof NoHttpResponseException || t instanceof SocketTimeoutException; + } @Override public String getBaseUrl() { @@ -506,6 +559,8 @@ public class SolrCmdDistributor implements Closeable { result = prime * result + ((baseUrl == null) ? 0 : baseUrl.hashCode()); result = prime * result + ((coreName == null) ? 0 : coreName.hashCode()); result = prime * result + ((url == null) ? 0 : url.hashCode()); + result = prime * result + Boolean.hashCode(retry); + result = prime * result + Integer.hashCode(maxRetries); return result; } @@ -515,6 +570,8 @@ public class SolrCmdDistributor implements Closeable { if (obj == null) return false; if (getClass() != obj.getClass()) return false; StdNode other = (StdNode) obj; + if (this.retry != other.retry) return false; + if (this.maxRetries != other.maxRetries) return false; String baseUrl = nodeProps.getBaseUrl(); String coreName = nodeProps.getCoreName(); String url = nodeProps.getCoreUrl(); @@ -534,39 +591,56 @@ public class SolrCmdDistributor implements Closeable { public ZkCoreNodeProps getNodeProps() { return nodeProps; } + + @Override + public int getMaxRetries() { + return this.maxRetries; + } } // RetryNodes are used in the case of 'forward to leader' where we want // to try the latest leader on a fail in the case the leader just went down. - public static class RetryNode extends StdNode { + public static class ForwardNode extends StdNode { private ZkStateReader zkStateReader; - public RetryNode(ZkCoreNodeProps nodeProps, ZkStateReader zkStateReader, String collection, String shardId) { - super(nodeProps, collection, shardId); + public ForwardNode(ZkCoreNodeProps nodeProps, ZkStateReader zkStateReader, String collection, String shardId, int maxRetries) { + super(nodeProps, collection, shardId, maxRetries); this.zkStateReader = zkStateReader; this.collection = collection; this.shardId = shardId; } @Override - public boolean checkRetry() { - ZkCoreNodeProps leaderProps; - try { - leaderProps = new ZkCoreNodeProps(zkStateReader.getLeaderRetry( - collection, shardId)); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return false; - } catch (Exception e) { - // we retry with same info - log.warn(null, e); - return true; + public boolean checkRetry(Error err) { + boolean doRetry = false; + if (err.statusCode == 404 || err.statusCode == 403 || err.statusCode == 503) { + doRetry = true; } - - this.nodeProps = leaderProps; - return true; + // if it's a connect exception, lets try again + if (err.e instanceof SolrServerException && ((SolrServerException) err.e).getRootCause() instanceof ConnectException) { + doRetry = true; + } else if (err.e instanceof ConnectException) { + doRetry = true; + } + if (doRetry) { + ZkCoreNodeProps leaderProps; + try { + leaderProps = new ZkCoreNodeProps(zkStateReader.getLeaderRetry( + collection, shardId)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return false; + } catch (Exception e) { + // we retry with same info + log.warn(null, e); + return true; + } + + this.nodeProps = leaderProps; + } + return doRetry; } @Override @@ -584,7 +658,7 @@ public class SolrCmdDistributor implements 
Closeable { if (this == obj) return true; if (!super.equals(obj)) return false; if (getClass() != obj.getClass()) return false; - RetryNode other = (RetryNode) obj; + ForwardNode other = (ForwardNode) obj; if (nodeProps.getCoreUrl() == null) { if (other.nodeProps.getCoreUrl() != null) return false; } else if (!nodeProps.getCoreUrl().equals(other.nodeProps.getCoreUrl())) return false; diff --git a/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java b/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java index eb4caecf3ea..eb925404db3 100644 --- a/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java +++ b/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java @@ -143,7 +143,6 @@ class ErrorReportingConcurrentUpdateSolrClient extends ConcurrentUpdateSolrClien @Override public void handleError(Throwable ex) { - req.trackRequestResult(null, false); log.error("error", ex); Error error = new Error(); error.e = (Exception) ex; @@ -152,6 +151,10 @@ class ErrorReportingConcurrentUpdateSolrClient extends ConcurrentUpdateSolrClien } error.req = req; errors.add(error); + if (!req.shouldRetry(error)) { + // only track the error if we are not retrying the request + req.trackRequestResult(null, false); + } } @Override public void onSuccess(HttpResponse resp) { diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java index a21d906776c..e1e7968721e 100644 --- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java +++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java @@ -83,7 +83,7 @@ import org.apache.solr.update.DeleteUpdateCommand; import org.apache.solr.update.SolrCmdDistributor; import org.apache.solr.update.SolrCmdDistributor.Error; import org.apache.solr.update.SolrCmdDistributor.Node; -import org.apache.solr.update.SolrCmdDistributor.RetryNode; +import org.apache.solr.update.SolrCmdDistributor.ForwardNode; import org.apache.solr.update.SolrCmdDistributor.StdNode; import org.apache.solr.update.SolrIndexSplitter; import org.apache.solr.update.UpdateCommand; @@ -113,6 +113,16 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { private static final String TEST_DISTRIB_SKIP_SERVERS = "test.distrib.skip.servers"; private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + /** + * Request forwarded to a leader of a different shard will be retried up to this amount of times by default + */ + static final int MAX_RETRIES_ON_FORWARD_DEAULT = 25; + + /** + * Requests from leader to it's followers will be retried this amount of times by default + */ + static final int MAX_RETRIES_TO_FOLLOWERS_DEFAULT = 3; + /** * Values this processor supports for the DISTRIB_UPDATE_PARAM. * This is an implementation detail exposed solely for tests. 
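
To see the new retry policy in one place: responsibility for deciding whether a failed update is resent moves out of the old single doRetry block in SolrCmdDistributor and into the nodes themselves. Each Node now carries a maxRetries budget and classifies the failure via checkRetry(Error), while Req.shouldRetry(Error) layers the budget check and the "never retry a delete-by-query" rule on top; StdNode (leader to follower) retries on 404/403/503 and socket-level exceptions, and ForwardNode additionally re-resolves the shard leader from ZooKeeper before retrying. The sketch below is a simplified, standalone illustration of that decision, not the actual Solr classes; it assumes the same status codes and exception types the patch checks for.

```java
import java.net.SocketException;
import java.net.SocketTimeoutException;

// Illustrative only: a condensed model of the retry decision added by this patch.
class RetryPolicySketch {

  static class Error {          // stands in for SolrCmdDistributor.Error
    int statusCode;
    Exception e;
  }

  /** Roughly what StdNode.checkRetry(Error) does for leader-to-follower requests. */
  static boolean nodeCheckRetry(Error err, int maxRetries) {
    if (maxRetries <= 0) return false;   // node was built without a retry budget
    if (err.statusCode == 404 || err.statusCode == 403 || err.statusCode == 503) return true;
    // the patch unwraps SolrServerException.getRootCause() before this check and also
    // treats org.apache.http.NoHttpResponseException as retriable
    return err.e instanceof SocketException          // ConnectException is a SocketException
        || err.e instanceof SocketTimeoutException;
  }

  /** Roughly what Req.shouldRetry(Error) adds on top of Node.checkRetry. */
  static boolean shouldRetry(Error err, boolean isDeleteByQuery, int retriesSoFar, int maxRetries) {
    boolean retry = nodeCheckRetry(err, maxRetries);
    retry &= !isDeleteByQuery;  // DBQs are never retried; distribDelete flushes pending
                                // writes with blockAndDoRetries() before sending them
    return retry && retriesSoFar < maxRetries;
  }
}
```

By default, DistributedUpdateProcessor builds StdNodes with MAX_RETRIES_TO_FOLLOWERS_DEFAULT (3) and ForwardNodes with MAX_RETRIES_ON_FORWARD_DEAULT (25), and SolrCmdDistributor sleeps retryPause between attempts rather than tracking a retry count of its own.
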
@@ -175,6 +185,15 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { private List nodes; private Set skippedCoreNodeNames; private boolean isIndexChanged = false; + + /** + * Number of times requests forwarded to some other shard's leader can be retried + */ + private final int maxRetriesOnForward = MAX_RETRIES_ON_FORWARD_DEAULT; + /** + * Number of times requests from leaders to followers can be retried + */ + private final int maxRetriesToFollowers = MAX_RETRIES_TO_FOLLOWERS_DEFAULT; private UpdateCommand updateCommand; // the current command this processor is working on. @@ -368,7 +387,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { } else if (!clusterState.getLiveNodes().contains(replica.getNodeName()) || replica.getState() == Replica.State.DOWN) { skippedCoreNodeNames.add(replica.getName()); } else { - nodes.add(new StdNode(new ZkCoreNodeProps(replica), collection, shardId)); + nodes.add(new StdNode(new ZkCoreNodeProps(replica), collection, shardId, maxRetriesToFollowers)); } } return nodes; @@ -377,7 +396,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { // I need to forward on to the leader... forwardToLeader = true; return Collections.singletonList( - new RetryNode(new ZkCoreNodeProps(leaderReplica), zkController.getZkStateReader(), collection, shardId)); + new ForwardNode(new ZkCoreNodeProps(leaderReplica), zkController.getZkStateReader(), collection, shardId, maxRetriesOnForward)); } } catch (InterruptedException e) { @@ -779,7 +798,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { Set replicasShouldBeInLowerTerms = new HashSet<>(); for (final SolrCmdDistributor.Error error : errors) { - if (error.req.node instanceof RetryNode) { + if (error.req.node instanceof ForwardNode) { // if it's a forward, any fail is a problem - // otherwise we assume things are fine if we got it locally // until we start allowing min replication param @@ -1538,7 +1557,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { // don't forward to ourself leaderForAnyShard = true; } else { - leaders.add(new RetryNode(coreLeaderProps, zkController.getZkStateReader(), collection, sliceName)); + leaders.add(new ForwardNode(coreLeaderProps, zkController.getZkStateReader(), collection, sliceName, maxRetriesOnForward)); } } @@ -2100,7 +2119,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { private int achievedRf = Integer.MAX_VALUE; private final int requestedRf; - RollupRequestReplicationTracker(String minRepFact) { + public RollupRequestReplicationTracker(String minRepFact) { try { this.requestedRf = Integer.parseInt(minRepFact); } catch (NumberFormatException nfe) { @@ -2153,7 +2172,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { return requestedRf; } - LeaderRequestReplicationTracker(String shardId, int requestedRf) { + public LeaderRequestReplicationTracker(String shardId, int requestedRf) { this.requestedRf = requestedRf; this.myShardId = shardId; } @@ -2165,8 +2184,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { public void trackRequestResult(Node node, boolean success) { if (log.isDebugEnabled()) { - log.debug("trackRequestResult(" + node + "): success? " + success + - ", shardId=" + myShardId); + log.debug("trackRequestResult({}): success? 
{}, shardId={}", node, success, myShardId); } if (success) { diff --git a/solr/core/src/java/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessor.java index 1d2d73054f8..cd4ed00c8e7 100644 --- a/solr/core/src/java/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessor.java +++ b/solr/core/src/java/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessor.java @@ -401,8 +401,8 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor { throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "No 'leader' replica available for shard " + slice.getName() + " of collection " + collection); } - return new SolrCmdDistributor.RetryNode(new ZkCoreNodeProps(leader), zkController.getZkStateReader(), - collection, slice.getName()); + return new SolrCmdDistributor.ForwardNode(new ZkCoreNodeProps(leader), zkController.getZkStateReader(), + collection, slice.getName(), DistributedUpdateProcessor.MAX_RETRIES_ON_FORWARD_DEAULT); } } diff --git a/solr/core/src/test/org/apache/solr/update/MockStreamingSolrClients.java b/solr/core/src/test/org/apache/solr/update/MockStreamingSolrClients.java index 72d39ff89bb..c269c9efb46 100644 --- a/solr/core/src/test/org/apache/solr/update/MockStreamingSolrClients.java +++ b/solr/core/src/test/org/apache/solr/update/MockStreamingSolrClients.java @@ -16,19 +16,19 @@ */ package org.apache.solr.update; +import java.io.IOException; +import java.net.ConnectException; +import java.net.SocketException; import org.apache.lucene.util.LuceneTestCase; import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.client.solrj.SolrRequest; import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.common.SolrException; import org.apache.solr.common.util.NamedList; -import java.io.IOException; -import java.net.ConnectException; -import java.net.SocketException; - public class MockStreamingSolrClients extends StreamingSolrClients { - public enum Exp {CONNECT_EXCEPTION, SOCKET_EXCEPTION}; + public enum Exp {CONNECT_EXCEPTION, SOCKET_EXCEPTION, BAD_REQUEST}; private volatile Exp exp = null; @@ -46,12 +46,14 @@ public class MockStreamingSolrClients extends StreamingSolrClients { this.exp = exp; } - private IOException exception() { + private Exception exception() { switch (exp) { case CONNECT_EXCEPTION: return new ConnectException(); case SOCKET_EXCEPTION: return new SocketException(); + case BAD_REQUEST: + return new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Bad Request"); default: break; } @@ -70,10 +72,17 @@ public class MockStreamingSolrClients extends StreamingSolrClients { public NamedList request(SolrRequest request, String collection) throws SolrServerException, IOException { if (exp != null) { - if (LuceneTestCase.random().nextBoolean()) { - throw exception(); + Exception e = exception(); + if (e instanceof IOException) { + if (LuceneTestCase.random().nextBoolean()) { + throw (IOException)e; + } else { + throw new SolrServerException(e); + } + } else if (e instanceof SolrServerException) { + throw (SolrServerException)e; } else { - throw new SolrServerException(exception()); + throw new SolrServerException(e); } } diff --git a/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java b/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java index 1699b0d70dc..24cf71728ef 100644 --- a/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java +++ 
b/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java @@ -16,15 +16,15 @@ */ package org.apache.solr.update; -import javax.xml.parsers.ParserConfigurationException; import java.io.File; import java.io.IOException; +import java.net.SocketException; import java.nio.file.Path; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; - +import javax.xml.parsers.ParserConfigurationException; import org.apache.solr.BaseDistributedSearchTestCase; import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.client.solrj.SolrQuery; @@ -34,6 +34,7 @@ import org.apache.solr.client.solrj.impl.HttpSolrClient; import org.apache.solr.client.solrj.request.LukeRequest; import org.apache.solr.client.solrj.request.UpdateRequest; import org.apache.solr.common.SolrDocumentList; +import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.ZkCoreNodeProps; import org.apache.solr.common.cloud.ZkNodeProps; import org.apache.solr.common.cloud.ZkStateReader; @@ -47,10 +48,12 @@ import org.apache.solr.index.LogDocMergePolicyFactory; import org.apache.solr.search.SolrIndexSearcher; import org.apache.solr.update.MockStreamingSolrClients.Exp; import org.apache.solr.update.SolrCmdDistributor.Error; +import org.apache.solr.update.SolrCmdDistributor.ForwardNode; import org.apache.solr.update.SolrCmdDistributor.Node; -import org.apache.solr.update.SolrCmdDistributor.RetryNode; import org.apache.solr.update.SolrCmdDistributor.StdNode; import org.apache.solr.update.processor.DistributedUpdateProcessor; +import org.apache.solr.update.processor.DistributedUpdateProcessor.LeaderRequestReplicationTracker; +import org.apache.solr.update.processor.DistributedUpdateProcessor.RollupRequestReplicationTracker; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -59,6 +62,8 @@ import org.xml.sax.SAXException; // See: https://issues.apache.org/jira/browse/SOLR-12028 Tests cannot remove files on Windows machines occasionally public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase { + private static enum NodeType {FORWARD, STANDARD}; + private AtomicInteger id = new AtomicInteger(); @BeforeClass @@ -127,6 +132,7 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase { shards = sb.toString(); } + @SuppressWarnings("unchecked") @Test @ShardsFixed(num = 4) public void test() throws Exception { @@ -325,30 +331,176 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase { } } - testMaxRetries(); - testOneRetry(); + testMaxRetries(NodeType.FORWARD); + testMaxRetries(NodeType.STANDARD); + testOneRetry(NodeType.FORWARD); + testOneRetry(NodeType.STANDARD); testRetryNodeAgainstBadAddress(); - testRetryNodeWontRetrySocketError(); - + testStdNodeRetriesSocketError(); + testForwardNodeWontRetrySocketError(); + testNodeWontRetryBadRequest(NodeType.FORWARD); + testNodeWontRetryBadRequest(NodeType.STANDARD); + testMinRfOnRetries(NodeType.FORWARD); + testMinRfOnRetries(NodeType.STANDARD); testDistribOpenSearcher(); + testReqShouldRetryNoRetries(); + testReqShouldRetryMaxRetries(); + testReqShouldRetryBadRequest(); + testReqShouldRetryNotFound(); + testReqShouldRetryDBQ(); + testDeletes(false, true); + testDeletes(false, false); + testDeletes(true, true); + testDeletes(true, false); + } + + private void testDeletes(boolean dbq, boolean withFailures) throws Exception { + final HttpSolrClient solrclient = (HttpSolrClient) clients.get(0); + 
solrclient.commit(true, true); + long numFoundBefore = solrclient.query(new SolrQuery("*:*")).getResults() + .getNumFound(); + final MockStreamingSolrClients streamingClients = new MockStreamingSolrClients(updateShardHandler); + try (SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(streamingClients, 0)) { + if (withFailures) { + streamingClients.setExp(Exp.CONNECT_EXCEPTION); + } + ArrayList nodes = new ArrayList<>(); + + ZkNodeProps nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, solrclient.getBaseURL(), + ZkStateReader.CORE_NAME_PROP, ""); + + final AtomicInteger retries = new AtomicInteger(); + nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, solrclient.getBaseURL(), ZkStateReader.CORE_NAME_PROP, ""); + Node retryNode = new StdNode(new ZkCoreNodeProps(nodeProps), "collection1", "shard1", 5) { + @Override + public boolean checkRetry(Error err) { + streamingClients.setExp(null); + retries.incrementAndGet(); + return super.checkRetry(err); + } + }; + + + nodes.add(retryNode); + + for (int i = 0 ; i < 5 ; i++) { + AddUpdateCommand cmd = new AddUpdateCommand(null); + int currentId = id.incrementAndGet(); + cmd.solrDoc = sdoc("id", currentId); + ModifiableSolrParams params = new ModifiableSolrParams(); + cmdDistrib.distribAdd(cmd, nodes, params); + DeleteUpdateCommand dcmd = new DeleteUpdateCommand(null); + if (dbq) { + dcmd.setQuery("id:" + currentId); + } else { + dcmd.setId(String.valueOf(currentId)); + } + cmdDistrib.distribDelete(dcmd, nodes, params, false, null, null); + } + + + CommitUpdateCommand ccmd = new CommitUpdateCommand(null, false); + cmdDistrib.distribCommit(ccmd, nodes, new ModifiableSolrParams()); + cmdDistrib.finish(); + + int expectedRetryCount = 0; + if (withFailures) { + if (dbq) { + expectedRetryCount = 1; // just the first cmd would be retried + } else { + expectedRetryCount = 10; + } + } + assertEquals(expectedRetryCount, retries.get()); + + + long numFoundAfter = solrclient.query(new SolrQuery("*:*")).getResults() + .getNumFound(); + + // we will get java.net.ConnectException which we retry on + assertEquals(numFoundBefore, numFoundAfter); + assertEquals(0, cmdDistrib.getErrors().size()); + } } - private void testMaxRetries() throws IOException { + private void testMinRfOnRetries(NodeType nodeType) throws Exception { + final HttpSolrClient solrclient = (HttpSolrClient) clients.get(0); final MockStreamingSolrClients streamingClients = new MockStreamingSolrClients(updateShardHandler); - try (SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(streamingClients, 5, 0)) { + try (SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(streamingClients, 0)) { + streamingClients.setExp(Exp.CONNECT_EXCEPTION); + ArrayList nodes = new ArrayList<>(); + + ZkNodeProps nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, solrclient.getBaseURL(), + ZkStateReader.CORE_NAME_PROP, ""); + + final AtomicInteger retries = new AtomicInteger(); + nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, solrclient.getBaseURL(), ZkStateReader.CORE_NAME_PROP, ""); + if (nodeType == NodeType.FORWARD) { + nodes.add(new ForwardNode(new ZkCoreNodeProps(nodeProps), null, "collection1", "shard1", 5) { + @Override + public boolean checkRetry(Error err) { + if (retries.incrementAndGet() >= 3) { + streamingClients.setExp(null); + } + return super.checkRetry(err); + } + }); + } else { + nodes.add(new StdNode(new ZkCoreNodeProps(nodeProps), "collection1", "shard1", 5) { + @Override + public boolean checkRetry(Error err) { + if (retries.incrementAndGet() >= 3) { + 
streamingClients.setExp(null); + } + return super.checkRetry(err); + } + }); + } + + + AddUpdateCommand cmd = new AddUpdateCommand(null); + cmd.solrDoc = sdoc("id", id.incrementAndGet()); + ModifiableSolrParams params = new ModifiableSolrParams(); + RollupRequestReplicationTracker rollupReqTracker = new RollupRequestReplicationTracker("2"); + LeaderRequestReplicationTracker leaderReqTracker = new LeaderRequestReplicationTracker("shard1", 2); + + cmdDistrib.distribAdd(cmd, nodes, params, false, rollupReqTracker, leaderReqTracker); + cmdDistrib.finish(); + assertEquals(3, retries.get()); + assertEquals(2, leaderReqTracker.getAchievedRf());// "2" here is because one would be the leader, that creates the instance of LeaderRequestReplicationTracker, the second one is the node + + assertEquals(0, cmdDistrib.getErrors().size()); + } + } + + private void testMaxRetries(NodeType nodeType) throws IOException { + final MockStreamingSolrClients streamingClients = new MockStreamingSolrClients(updateShardHandler); + try (SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(streamingClients, 0)) { streamingClients.setExp(Exp.CONNECT_EXCEPTION); ArrayList nodes = new ArrayList<>(); final HttpSolrClient solrclient1 = (HttpSolrClient) clients.get(0); final AtomicInteger retries = new AtomicInteger(); ZkNodeProps nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, solrclient1.getBaseURL(), ZkStateReader.CORE_NAME_PROP, ""); - RetryNode retryNode = new RetryNode(new ZkCoreNodeProps(nodeProps), null, "collection1", "shard1") { - @Override - public boolean checkRetry() { - retries.incrementAndGet(); - return true; - } - }; + Node retryNode; + if (nodeType == NodeType.FORWARD) { + retryNode = new ForwardNode(new ZkCoreNodeProps(nodeProps), null, "collection1", "shard1", 6) { + @Override + public boolean checkRetry(Error err) { + retries.incrementAndGet(); + return super.checkRetry(err); + } + }; + } else { + retryNode = new StdNode(new ZkCoreNodeProps(nodeProps), "collection1", "shard1", 6) { + @Override + public boolean checkRetry(Error err) { + retries.incrementAndGet(); + return super.checkRetry(err); + } + }; + } + nodes.add(retryNode); @@ -359,18 +511,61 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase { cmdDistrib.distribAdd(cmd, nodes, params); cmdDistrib.finish(); - assertEquals(6, retries.get()); + assertEquals(7, retries.get()); assertEquals(1, cmdDistrib.getErrors().size()); } } - private void testOneRetry() throws Exception { + private void testReqShouldRetryNoRetries() { + Error err = getError(new SocketException()); + SolrCmdDistributor.Req req = new SolrCmdDistributor.Req(null, new StdNode(null, "collection1", "shard1", 0), new UpdateRequest(), true); + assertFalse(req.shouldRetry(err)); + } + + private void testReqShouldRetryDBQ() { + Error err = getError(new SocketException()); + UpdateRequest dbqReq = new UpdateRequest(); + dbqReq.deleteByQuery("*:*"); + SolrCmdDistributor.Req req = new SolrCmdDistributor.Req(null, new StdNode(null, "collection1", "shard1", 1), dbqReq, true); + assertFalse(req.shouldRetry(err)); + } + + private void testReqShouldRetryMaxRetries() { + Error err = getError(new SocketException()); + SolrCmdDistributor.Req req = new SolrCmdDistributor.Req(null, new StdNode(null, "collection1", "shard1", 1), new UpdateRequest(), true); + assertTrue(req.shouldRetry(err)); + req.retries++; + assertFalse(req.shouldRetry(err)); + } + + private void testReqShouldRetryBadRequest() { + Error err = getError(new 
SolrException(SolrException.ErrorCode.BAD_REQUEST, "bad request")); + SolrCmdDistributor.Req req = new SolrCmdDistributor.Req(null, new StdNode(null, "collection1", "shard1", 1), new UpdateRequest(), true); + assertFalse(req.shouldRetry(err)); + } + + private void testReqShouldRetryNotFound() { + Error err = getError(new SolrException(SolrException.ErrorCode.NOT_FOUND, "not found")); + SolrCmdDistributor.Req req = new SolrCmdDistributor.Req(null, new StdNode(null, "collection1", "shard1", 1), new UpdateRequest(), true); + assertTrue(req.shouldRetry(err)); + } + + private Error getError(Exception e) { + Error err = new Error(); + err.e = e; + if (e instanceof SolrException) { + err.statusCode = ((SolrException)e).code(); + } + return err; + } + + private void testOneRetry(NodeType nodeType) throws Exception { final HttpSolrClient solrclient = (HttpSolrClient) clients.get(0); long numFoundBefore = solrclient.query(new SolrQuery("*:*")).getResults() .getNumFound(); final MockStreamingSolrClients streamingClients = new MockStreamingSolrClients(updateShardHandler); - try (SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(streamingClients, 5, 0)) { + try (SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(streamingClients, 0)) { streamingClients.setExp(Exp.CONNECT_EXCEPTION); ArrayList nodes = new ArrayList<>(); @@ -379,14 +574,26 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase { final AtomicInteger retries = new AtomicInteger(); nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, solrclient.getBaseURL(), ZkStateReader.CORE_NAME_PROP, ""); - RetryNode retryNode = new RetryNode(new ZkCoreNodeProps(nodeProps), null, "collection1", "shard1") { - @Override - public boolean checkRetry() { - streamingClients.setExp(null); - retries.incrementAndGet(); - return true; - } - }; + Node retryNode; + if (nodeType == NodeType.FORWARD) { + retryNode = new ForwardNode(new ZkCoreNodeProps(nodeProps), null, "collection1", "shard1", 5) { + @Override + public boolean checkRetry(Error err) { + streamingClients.setExp(null); + retries.incrementAndGet(); + return super.checkRetry(err); + } + }; + } else { + retryNode = new StdNode(new ZkCoreNodeProps(nodeProps), "collection1", "shard1", 5) { + @Override + public boolean checkRetry(Error err) { + streamingClients.setExp(null); + retries.incrementAndGet(); + return super.checkRetry(err); + } + }; + } nodes.add(retryNode); @@ -412,12 +619,70 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase { } } - private void testRetryNodeWontRetrySocketError() throws Exception { + private void testNodeWontRetryBadRequest(NodeType nodeType) throws Exception { + ignoreException("Bad Request"); final HttpSolrClient solrclient = (HttpSolrClient) clients.get(0); long numFoundBefore = solrclient.query(new SolrQuery("*:*")).getResults() .getNumFound(); final MockStreamingSolrClients streamingClients = new MockStreamingSolrClients(updateShardHandler); - try (SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(streamingClients, 5, 0)) { + try (SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(streamingClients, 0)) { + streamingClients.setExp(Exp.BAD_REQUEST); + ArrayList nodes = new ArrayList<>(); + ZkNodeProps nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, solrclient.getBaseURL(), + ZkStateReader.CORE_NAME_PROP, ""); + + final AtomicInteger retries = new AtomicInteger(); + Node retryNode; + if (nodeType == NodeType.FORWARD) { + retryNode = new ForwardNode(new ZkCoreNodeProps(nodeProps), null, 
"collection1", "shard1", 5) { + @Override + public boolean checkRetry(Error err) { + retries.incrementAndGet(); + return super.checkRetry(err); + } + }; + } else { + retryNode = new StdNode(new ZkCoreNodeProps(nodeProps), "collection1", "shard1", 5) { + @Override + public boolean checkRetry(Error err) { + retries.incrementAndGet(); + return super.checkRetry(err); + } + }; + } + nodes.add(retryNode); + + AddUpdateCommand cmd = new AddUpdateCommand(null); + cmd.solrDoc = sdoc("id", id.incrementAndGet()); + ModifiableSolrParams params = new ModifiableSolrParams(); + + CommitUpdateCommand ccmd = new CommitUpdateCommand(null, false); + cmdDistrib.distribAdd(cmd, nodes, params); + + streamingClients.setExp(null); + cmdDistrib.distribCommit(ccmd, nodes, params); + cmdDistrib.finish(); + + // it will checkRetry, but not actually do it... + assertEquals(1, retries.get()); + + + long numFoundAfter = solrclient.query(new SolrQuery("*:*")).getResults() + .getNumFound(); + + // we will get java.net.SocketException: Network is unreachable, which we don't retry on + assertEquals(numFoundBefore, numFoundAfter); + assertEquals(1, cmdDistrib.getErrors().size()); + unIgnoreException("Bad Request"); + } + } + + private void testForwardNodeWontRetrySocketError() throws Exception { + final HttpSolrClient solrclient = (HttpSolrClient) clients.get(0); + long numFoundBefore = solrclient.query(new SolrQuery("*:*")).getResults() + .getNumFound(); + final MockStreamingSolrClients streamingClients = new MockStreamingSolrClients(updateShardHandler); + try (SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(streamingClients, 0)) { streamingClients.setExp(Exp.SOCKET_EXCEPTION); ArrayList nodes = new ArrayList<>(); @@ -426,11 +691,11 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase { final AtomicInteger retries = new AtomicInteger(); nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, solrclient.getBaseURL(), ZkStateReader.CORE_NAME_PROP, ""); - RetryNode retryNode = new RetryNode(new ZkCoreNodeProps(nodeProps), null, "collection1", "shard1") { + ForwardNode retryNode = new ForwardNode(new ZkCoreNodeProps(nodeProps), null, "collection1", "shard1", 5) { @Override - public boolean checkRetry() { + public boolean checkRetry(Error err) { retries.incrementAndGet(); - return true; + return super.checkRetry(err); } }; @@ -460,6 +725,41 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase { assertEquals(1, cmdDistrib.getErrors().size()); } } + + private void testStdNodeRetriesSocketError() throws Exception { + final HttpSolrClient solrclient = (HttpSolrClient) clients.get(0); + final MockStreamingSolrClients streamingClients = new MockStreamingSolrClients(updateShardHandler); + try (SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(streamingClients, 0)) { + streamingClients.setExp(Exp.SOCKET_EXCEPTION); + ArrayList nodes = new ArrayList<>(); + + ZkNodeProps nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, solrclient.getBaseURL(), + ZkStateReader.CORE_NAME_PROP, ""); + + final AtomicInteger retries = new AtomicInteger(); + nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, solrclient.getBaseURL(), ZkStateReader.CORE_NAME_PROP, ""); + Node retryNode = new StdNode(new ZkCoreNodeProps(nodeProps), "collection1", "shard1", 5) { + @Override + public boolean checkRetry(Error err) { + retries.incrementAndGet(); + return super.checkRetry(err); + } + }; + + + nodes.add(retryNode); + + AddUpdateCommand cmd = new AddUpdateCommand(null); + cmd.solrDoc = sdoc("id", 
id.incrementAndGet()); + ModifiableSolrParams params = new ModifiableSolrParams(); + + cmdDistrib.distribAdd(cmd, nodes, params); + cmdDistrib.finish(); + + // it will checkRetry, but not actually do it... + assertEquals(6, retries.get()); + } + } private void testRetryNodeAgainstBadAddress() throws SolrServerException, IOException { // Test RetryNode @@ -471,14 +771,14 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase { ArrayList nodes = new ArrayList<>(); ZkNodeProps nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, "[ff01::114]:33332" + context, ZkStateReader.CORE_NAME_PROP, ""); - RetryNode retryNode = new RetryNode(new ZkCoreNodeProps(nodeProps), null, "collection1", "shard1") { + ForwardNode retryNode = new ForwardNode(new ZkCoreNodeProps(nodeProps), null, "collection1", "shard1", 5) { @Override - public boolean checkRetry() { + public boolean checkRetry(Error err) { ZkNodeProps leaderProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, solrclient.getBaseURL(), ZkStateReader.CORE_NAME_PROP, ""); this.nodeProps = new ZkCoreNodeProps(leaderProps); - return true; + return super.checkRetry(err); } };
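
One pattern repeats through all of the new tests above: inject a failure via MockStreamingSolrClients.setExp(...), wrap the node in an anonymous StdNode/ForwardNode subclass whose checkRetry(Error) counts invocations (and, for the "one retry" cases, clears the injected failure so the next attempt succeeds), then assert on the counter and on cmdDistrib.getErrors(). A small helper along the following lines could remove some of that duplication; it is only a sketch of the idea, not part of the patch, and assumes the StdNode and Error signatures introduced above.

```java
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.update.SolrCmdDistributor.Error;
import org.apache.solr.update.SolrCmdDistributor.StdNode;

// Hypothetical test helper, not part of this patch: a StdNode that counts checkRetry()
// calls and can run a callback on the first one (e.g. () -> streamingClients.setExp(null)
// so that the retried request succeeds), mirroring the anonymous subclasses used above.
class CountingStdNode extends StdNode {
  final AtomicInteger retries = new AtomicInteger();
  private final Runnable onFirstRetry;

  CountingStdNode(ZkCoreNodeProps props, String collection, String shardId,
                  int maxRetries, Runnable onFirstRetry) {
    super(props, collection, shardId, maxRetries);
    this.onFirstRetry = onFirstRetry;
  }

  @Override
  public boolean checkRetry(Error err) {
    if (retries.incrementAndGet() == 1 && onFirstRetry != null) {
      onFirstRetry.run();
    }
    return super.checkRetry(err); // keep the real classification so assertions stay meaningful
  }
}
```

Used that way, a case like testOneRetry(NodeType.STANDARD) boils down to constructing the node with the callback, sending one add, and asserting retries.get() == 1 with an empty error list, which is the same behavior the inline anonymous classes implement by hand.
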