We need better testing for SolrCmdDistributor retry logic.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1545428 13f79535-47bb-0310-9956-ffa450edef68
Author: Mark Robert Miller
Date: 2013-11-25 22:32:05 +00:00
Parent: 2678a15112
Commit: bf251f501d
3 changed files with 206 additions and 21 deletions

SolrCmdDistributor.java

@@ -43,6 +43,9 @@ public class SolrCmdDistributor {
private StreamingSolrServers servers;
private int retryPause = 500;
private int maxRetriesOnForward = MAX_RETRIES_ON_FORWARD;
private List<Error> allErrors = new ArrayList<Error>();
private List<Error> errors = new ArrayList<Error>();
@@ -54,6 +57,12 @@ public class SolrCmdDistributor {
servers = new StreamingSolrServers(updateShardHandler);
}
public SolrCmdDistributor(StreamingSolrServers servers, int maxRetriesOnForward, int retryPause) {
this.servers = servers;
this.maxRetriesOnForward = maxRetriesOnForward;
this.retryPause = retryPause;
}
public void finish() {
try {
servers.blockUntilFinished();
@@ -85,8 +94,7 @@ public class SolrCmdDistributor {
// this can happen in certain situations such as shutdown
if (isRetry) {
- if (rspCode == 404 || rspCode == 403 || rspCode == 503
- || rspCode == 500) {
+ if (rspCode == 404 || rspCode == 403 || rspCode == 503) {
doRetry = true;
}
@@ -98,14 +106,14 @@ public class SolrCmdDistributor {
doRetry = true;
}
}
- if (err.req.retries < MAX_RETRIES_ON_FORWARD && doRetry) {
+ if (err.req.retries < maxRetriesOnForward && doRetry) {
err.req.retries++;
SolrException.log(SolrCmdDistributor.log, "forwarding update to "
+ oldNodeUrl + " failed - retrying ... retries: "
+ err.req.retries);
try {
- Thread.sleep(500);
+ Thread.sleep(retryPause);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn(null, e);
@@ -252,7 +260,7 @@ public class SolrCmdDistributor {
public static class Error {
public Exception e;
- public int statusCode;
+ public int statusCode = -1;
public Req req;
}
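
Taken together, the SolrCmdDistributor changes drop 500 from the set of response codes retried on forward, turn the retry cap and pause into instance fields, and add a constructor that lets a caller supply its own StreamingSolrServers along with both values. A minimal sketch of how a test might use that constructor (updateShardHandler is assumed to come from the surrounding fixture; the variable names and the chosen values are illustrative only):

// Sketch only: a tight retry budget and no pause between attempts,
// so a test that forces failures finishes quickly.
StreamingSolrServers servers = new StreamingSolrServers(updateShardHandler);
SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(servers, 2, 0); // 2 retries, 0 ms pause
try {
  // ... distribAdd / distribCommit against the nodes under test ...
} finally {
  cmdDistrib.finish(); // blocks until outstanding requests (and any retries) complete
}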

MockStreamingSolrServers.java

@@ -0,0 +1,86 @@
package org.apache.solr.update;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import java.net.ConnectException;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.common.util.NamedList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MockStreamingSolrServers extends StreamingSolrServers {
public static Logger log = LoggerFactory
.getLogger(MockStreamingSolrServers.class);
public enum Exp {CONNECT_EXCEPTION};
private volatile Exp exp = null;
public MockStreamingSolrServers(UpdateShardHandler updateShardHandler) {
super(updateShardHandler);
}
@Override
public synchronized SolrServer getSolrServer(final SolrCmdDistributor.Req req) {
SolrServer server = super.getSolrServer(req);
return new MockSolrServer(server);
}
public void setExp(Exp exp) {
this.exp = exp;
}
private IOException exception() {
switch (exp) {
case CONNECT_EXCEPTION:
return new ConnectException();
default:
break;
}
return null;
}
class MockSolrServer extends SolrServer {
private SolrServer solrServer;
public MockSolrServer(SolrServer solrServer) {
this.solrServer = solrServer;
}
@Override
public NamedList<Object> request(SolrRequest request)
throws SolrServerException, IOException {
if (exp != null) {
throw exception();
}
return solrServer.request(request);
}
@Override
public void shutdown() {}
}
}
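
MockStreamingSolrServers wraps whatever SolrServer the real StreamingSolrServers would return and, while an Exp is set, throws the configured exception instead of executing the request. Wiring it into a test looks roughly like the sketch below (mirroring the new tests in SolrCmdDistributorTest that follow; the retry values are only examples):

MockStreamingSolrServers ss = new MockStreamingSolrServers(updateShardHandler);
ss.setExp(MockStreamingSolrServers.Exp.CONNECT_EXCEPTION); // every request now fails
SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(ss, 5, 0); // up to 5 retries, no pause
// ... distribAdd some docs; a RetryNode's checkRetry() can flip ss.setExp(null) to recover ...
cmdDistrib.finish();
// then assert on the observed retry count and cmdDistrib.getErrors()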

SolrCmdDistributorTest.java

@@ -30,6 +30,7 @@ import org.apache.lucene.index.LogDocMergePolicy;
import org.apache.solr.BaseDistributedSearchTestCase;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.request.LukeRequest;
@@ -46,6 +47,7 @@ import org.apache.solr.core.SolrCore;
import org.apache.solr.core.SolrEventListener;
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.servlet.SolrDispatchFilter;
import org.apache.solr.update.MockStreamingSolrServers.Exp;
import org.apache.solr.update.SolrCmdDistributor.Error;
import org.apache.solr.update.SolrCmdDistributor.Node;
import org.apache.solr.update.SolrCmdDistributor.RetryNode;
@@ -55,6 +57,9 @@ import org.junit.BeforeClass;
import org.xml.sax.SAXException;
public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
private AtomicInteger id = new AtomicInteger();
@BeforeClass
public static void beforeClass() throws Exception {
@@ -139,7 +144,7 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
// add one doc to controlClient
AddUpdateCommand cmd = new AddUpdateCommand(null);
- cmd.solrDoc = sdoc("id", 1);
+ cmd.solrDoc = sdoc("id", id.incrementAndGet());
params = new ModifiableSolrParams();
cmdDistrib.distribAdd(cmd, nodes, params);
@@ -166,20 +171,21 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
// add another 2 docs to control and 3 to client
cmdDistrib = new SolrCmdDistributor(updateShardHandler);
- cmd.solrDoc = sdoc("id", 2);
+ cmd.solrDoc = sdoc("id", id.incrementAndGet());
params = new ModifiableSolrParams();
params.set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
cmdDistrib.distribAdd(cmd, nodes, params);
int id2 = id.incrementAndGet();
AddUpdateCommand cmd2 = new AddUpdateCommand(null);
- cmd2.solrDoc = sdoc("id", 3);
+ cmd2.solrDoc = sdoc("id", id2);
params = new ModifiableSolrParams();
params.set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
cmdDistrib.distribAdd(cmd2, nodes, params);
AddUpdateCommand cmd3 = new AddUpdateCommand(null);
- cmd3.solrDoc = sdoc("id", 4);
+ cmd3.solrDoc = sdoc("id", id.incrementAndGet());
params = new ModifiableSolrParams();
params.set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
@@ -204,8 +210,7 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
// now delete doc 2 which is on both control and client1
DeleteUpdateCommand dcmd = new DeleteUpdateCommand(null);
dcmd.id = "2";
dcmd.id = Integer.toString(id2);
cmdDistrib = new SolrCmdDistributor(updateShardHandler);
@@ -239,8 +244,6 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
//System.out.println(clients.get(0).request(new LukeRequest()));
}
- int id = 5;
cmdDistrib = new SolrCmdDistributor(updateShardHandler);
int cnt = atLeast(303);
@@ -257,7 +260,7 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
}
AddUpdateCommand c = new AddUpdateCommand(null);
- c.solrDoc = sdoc("id", id++);
+ c.solrDoc = sdoc("id", id.incrementAndGet());
if (nodes.size() > 0) {
params = new ModifiableSolrParams();
cmdDistrib.distribAdd(c, nodes, params);
@@ -312,15 +315,100 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
((NamedList<Object>) resp.get("index")).get("maxDoc"));
}
testMaxRetries();
testOneRetry();
testRetryNode();
}
private void testMaxRetries() throws IOException {
final MockStreamingSolrServers ss = new MockStreamingSolrServers(updateShardHandler);
SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(ss, 5, 0);
ss.setExp(Exp.CONNECT_EXCEPTION);
ArrayList<Node> nodes = new ArrayList<Node>();
final HttpSolrServer solrclient1 = (HttpSolrServer) clients.get(0);
final AtomicInteger retries = new AtomicInteger();
ZkNodeProps nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, solrclient1.getBaseURL(), ZkStateReader.CORE_NAME_PROP, "");
RetryNode retryNode = new RetryNode(new ZkCoreNodeProps(nodeProps), null, "collection1", "shard1") {
@Override
public boolean checkRetry() {
retries.incrementAndGet();
return true;
}
};
nodes.add(retryNode);
AddUpdateCommand cmd = new AddUpdateCommand(null);
cmd.solrDoc = sdoc("id", id.incrementAndGet());
ModifiableSolrParams params = new ModifiableSolrParams();
cmdDistrib.distribAdd(cmd, nodes, params);
cmdDistrib.finish();
assertEquals(6, retries.get());
assertEquals(1, cmdDistrib.getErrors().size());
}
private void testOneRetry() throws Exception {
final HttpSolrServer solrclient = (HttpSolrServer) clients.get(0);
long numFoundBefore = solrclient.query(new SolrQuery("*:*")).getResults()
.getNumFound();
final MockStreamingSolrServers ss = new MockStreamingSolrServers(updateShardHandler);
SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(ss, 5, 0);
ss.setExp(Exp.CONNECT_EXCEPTION);
ArrayList<Node> nodes = new ArrayList<Node>();
ZkNodeProps nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, solrclient.getBaseURL(),
ZkStateReader.CORE_NAME_PROP, "");
final AtomicInteger retries = new AtomicInteger();
nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, solrclient.getBaseURL(), ZkStateReader.CORE_NAME_PROP, "");
RetryNode retryNode = new RetryNode(new ZkCoreNodeProps(nodeProps), null, "collection1", "shard1") {
@Override
public boolean checkRetry() {
ss.setExp(null);
retries.incrementAndGet();
return true;
}
};
nodes.add(retryNode);
AddUpdateCommand cmd = new AddUpdateCommand(null);
cmd.solrDoc = sdoc("id", id.incrementAndGet());
ModifiableSolrParams params = new ModifiableSolrParams();
CommitUpdateCommand ccmd = new CommitUpdateCommand(null, false);
cmdDistrib.distribAdd(cmd, nodes, params);
cmdDistrib.distribCommit(ccmd, nodes, params);
cmdDistrib.finish();
assertEquals(1, retries.get());
long numFoundAfter = solrclient.query(new SolrQuery("*:*")).getResults()
.getNumFound();
// we will get java.net.SocketException: Network is unreachable, which we don't retry on
assertEquals(numFoundBefore + 1, numFoundAfter);
assertEquals(0, cmdDistrib.getErrors().size());
}
private void testRetryNode() throws SolrServerException, IOException {
// Test RetryNode
- cmdDistrib = new SolrCmdDistributor(updateShardHandler);
+ SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(updateShardHandler);
final HttpSolrServer solrclient = (HttpSolrServer) clients.get(0);
long numFoundBefore = solrclient.query(new SolrQuery("*:*")).getResults()
.getNumFound();
- nodes = new ArrayList<Node>();
+ ArrayList<Node> nodes = new ArrayList<Node>();
- nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, "[ff01::114]:33332" + context, ZkStateReader.CORE_NAME_PROP, "");
+ ZkNodeProps nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, "[ff01::114]:33332" + context, ZkStateReader.CORE_NAME_PROP, "");
RetryNode retryNode = new RetryNode(new ZkCoreNodeProps(nodeProps), null, "collection1", "shard1") {
@Override
public boolean checkRetry() {
@@ -336,13 +424,13 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
nodes.add(retryNode);
- cmd = new AddUpdateCommand(null);
- cmd.solrDoc = sdoc("id", 1111111);
- params = new ModifiableSolrParams();
+ AddUpdateCommand cmd = new AddUpdateCommand(null);
+ cmd.solrDoc = sdoc("id", id.incrementAndGet());
+ ModifiableSolrParams params = new ModifiableSolrParams();
cmdDistrib.distribAdd(cmd, nodes, params);
- ccmd = new CommitUpdateCommand(null, false);
+ CommitUpdateCommand ccmd = new CommitUpdateCommand(null, false);
params = new ModifiableSolrParams();
params.set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
cmdDistrib.distribCommit(ccmd, nodes, params);
@@ -351,7 +439,10 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
long numFoundAfter = solrclient.query(new SolrQuery("*:*")).getResults()
.getNumFound();
// we will get java.net.SocketException: Network is unreachable and then retry
assertEquals(numFoundBefore + 1, numFoundAfter);
assertEquals(0, cmdDistrib.getErrors().size());
}
@Override