mirror of https://github.com/apache/lucene.git
SOLR-6944: see if NoHttpResponse exceptions are due to the socket timeout being set too low in the SocketProxy, change from 10 to 100 seconds
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1671861 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
fe0ab10d4e
commit
35f0e80a02
|
@ -17,12 +17,12 @@ package org.apache.solr.cloud;
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import org.apache.http.NoHttpResponseException;
|
||||
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
||||
import org.apache.solr.client.solrj.impl.HttpSolrClient;
|
||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.ZkCoreNodeProps;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.File;
|
||||
|
@ -66,6 +66,9 @@ public class LeaderInitiatedRecoveryOnCommitTest extends BasicDistributedZkTest
|
|||
}
|
||||
|
||||
private void multiShardTest() throws Exception {
|
||||
|
||||
log.info("Running multiShardTest");
|
||||
|
||||
// create a collection that has 2 shard and 2 replicas
|
||||
String testCollectionName = "c8n_2x2_commits";
|
||||
createCollection(testCollectionName, 2, 2, 1);
|
||||
|
@ -78,16 +81,17 @@ public class LeaderInitiatedRecoveryOnCommitTest extends BasicDistributedZkTest
|
|||
+ printClusterStateInfo(),
|
||||
notLeaders.size() == 1);
|
||||
|
||||
log.info("All replicas active for "+testCollectionName);
|
||||
|
||||
// let's put the leader in its own partition, no replicas can contact it now
|
||||
Replica leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1");
|
||||
log.info("Creating partition to leader at "+leader.getCoreUrl());
|
||||
SocketProxy leaderProxy = getProxyForReplica(leader);
|
||||
leaderProxy.close();
|
||||
|
||||
// let's find the leader of shard2 and ask him to commit
|
||||
Replica shard2Leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard2");
|
||||
try (HttpSolrClient server = new HttpSolrClient(ZkCoreNodeProps.getCoreUrl(shard2Leader.getStr("base_url"), shard2Leader.getStr("core")))) {
|
||||
server.commit();
|
||||
}
|
||||
sendCommitWithRetry(shard2Leader);
|
||||
|
||||
Thread.sleep(sleepMsBeforeHealPartition);
|
||||
|
||||
|
@ -95,6 +99,7 @@ public class LeaderInitiatedRecoveryOnCommitTest extends BasicDistributedZkTest
|
|||
leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1");
|
||||
assertSame("Leader was not active", Replica.State.ACTIVE, leader.getState());
|
||||
|
||||
log.info("Healing partitioned replica at "+leader.getCoreUrl());
|
||||
leaderProxy.reopen();
|
||||
Thread.sleep(sleepMsBeforeHealPartition);
|
||||
|
||||
|
@ -107,9 +112,13 @@ public class LeaderInitiatedRecoveryOnCommitTest extends BasicDistributedZkTest
|
|||
// don't fail the test
|
||||
log.warn("Could not delete collection {} after test completed", testCollectionName);
|
||||
}
|
||||
|
||||
log.info("multiShardTest completed OK");
|
||||
}
|
||||
|
||||
private void oneShardTest() throws Exception {
|
||||
log.info("Running oneShardTest");
|
||||
|
||||
// create a collection that has 1 shard and 3 replicas
|
||||
String testCollectionName = "c8n_1x3_commits";
|
||||
createCollection(testCollectionName, 1, 3, 1);
|
||||
|
@ -122,22 +131,23 @@ public class LeaderInitiatedRecoveryOnCommitTest extends BasicDistributedZkTest
|
|||
+ printClusterStateInfo(),
|
||||
notLeaders.size() == 2);
|
||||
|
||||
log.info("All replicas active for "+testCollectionName);
|
||||
|
||||
// let's put the leader in its own partition, no replicas can contact it now
|
||||
Replica leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1");
|
||||
log.info("Creating partition to leader at "+leader.getCoreUrl());
|
||||
SocketProxy leaderProxy = getProxyForReplica(leader);
|
||||
leaderProxy.close();
|
||||
|
||||
Replica replica = notLeaders.get(0);
|
||||
try (HttpSolrClient client = new HttpSolrClient(ZkCoreNodeProps.getCoreUrl(replica.getStr("base_url"), replica.getStr("core")))) {
|
||||
client.commit();
|
||||
}
|
||||
|
||||
sendCommitWithRetry(replica);
|
||||
Thread.sleep(sleepMsBeforeHealPartition);
|
||||
|
||||
cloudClient.getZkStateReader().updateClusterState(true); // get the latest state
|
||||
leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1");
|
||||
assertSame("Leader was not active", Replica.State.ACTIVE, leader.getState());
|
||||
|
||||
log.info("Healing partitioned replica at "+leader.getCoreUrl());
|
||||
leaderProxy.reopen();
|
||||
Thread.sleep(sleepMsBeforeHealPartition);
|
||||
|
||||
|
@ -150,6 +160,8 @@ public class LeaderInitiatedRecoveryOnCommitTest extends BasicDistributedZkTest
|
|||
// don't fail the test
|
||||
log.warn("Could not delete collection {} after test completed", testCollectionName);
|
||||
}
|
||||
|
||||
log.info("oneShardTest completed OK");
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -162,4 +174,29 @@ public class LeaderInitiatedRecoveryOnCommitTest extends BasicDistributedZkTest
|
|||
return createProxiedJetty(solrHome, dataDir, shardList, solrConfigOverride, schemaOverride);
|
||||
}
|
||||
|
||||
protected void sendCommitWithRetry(Replica replica) throws Exception {
|
||||
String replicaCoreUrl = replica.getCoreUrl();
|
||||
log.info("Sending commit request to: "+replicaCoreUrl);
|
||||
long startMs = System.currentTimeMillis();
|
||||
try (HttpSolrClient client = new HttpSolrClient(replicaCoreUrl)) {
|
||||
try {
|
||||
client.commit();
|
||||
|
||||
long tookMs = System.currentTimeMillis() - startMs;
|
||||
log.info("Sent commit request to "+replicaCoreUrl+" OK, took: "+tookMs);
|
||||
} catch (Exception exc) {
|
||||
Throwable rootCause = SolrException.getRootCause(exc);
|
||||
if (rootCause instanceof NoHttpResponseException) {
|
||||
log.warn("No HTTP response from sending commit request to "+replicaCoreUrl+
|
||||
"; will re-try after waiting 3 seconds");
|
||||
Thread.sleep(3000);
|
||||
client.commit();
|
||||
log.info("Second attempt at sending commit to "+replicaCoreUrl+" succeeded.");
|
||||
} else {
|
||||
throw exc;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -48,6 +48,9 @@ public class SocketProxy {
|
|||
private static final transient Logger log = LoggerFactory.getLogger(SocketProxy.class);
|
||||
|
||||
public static final int ACCEPT_TIMEOUT_MILLIS = 100;
|
||||
|
||||
// should be as large as the HttpShardHandlerFactory socket timeout ... or larger?
|
||||
public static final int PUMP_SOCKET_TIMEOUT_MS = 100 * 1000;
|
||||
|
||||
private URI proxyUrl;
|
||||
private URI target;
|
||||
|
@ -148,7 +151,7 @@ public class SocketProxy {
|
|||
synchronized (this.connections) {
|
||||
connections = new ArrayList<Bridge>(this.connections);
|
||||
}
|
||||
log.warn("Closing " + connections.size()+" connections to: "+getUrl());
|
||||
log.warn("Closing " + connections.size()+" connections to: "+getUrl()+", target: "+target);
|
||||
for (Bridge con : connections) {
|
||||
closeConnection(con);
|
||||
}
|
||||
|
@ -338,7 +341,7 @@ public class SocketProxy {
|
|||
byte[] buf = new byte[1024];
|
||||
|
||||
try {
|
||||
src.setSoTimeout(10 * 1000);
|
||||
src.setSoTimeout(PUMP_SOCKET_TIMEOUT_MS);
|
||||
} catch (SocketException e) {
|
||||
log.error("Failed to set socket timeout on "+src+" due to: "+e);
|
||||
throw new RuntimeException(e);
|
||||
|
|
Loading…
Reference in New Issue