SOLR-5495: Hardening recovery scenarios after the leader receives an error trying to forward an update request to a replica.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1593312 13f79535-47bb-0310-9956-ffa450edef68
Timothy Potter 2014-05-08 15:48:26 +00:00
parent b1a81fe2bc
commit eb3beb2d70
12 changed files with 1697 additions and 122 deletions
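At its core, the patch coordinates recovery through a per-replica "leader-initiated recovery" (LIR) marker znode in ZooKeeper: the leader writes "down" for a replica it could not reach, the replica acknowledges by moving the marker to "recovering" before it rejoins, and the marker is deleted once the replica publishes itself "active" again. A rough sketch of that convention; the path format and state values come from the ZkController changes below, while the class and method names in this sketch are illustrative only and not part of the commit:

public class LirPathSketch {
  // Mirrors ZkController.getLeaderInitiatedRecoveryZnodePath(collection, shardId, coreName) in the diff below.
  static String lirPath(String collection, String shardId, String coreName) {
    return "/collections/" + collection + "/leader_initiated_recovery/" + shardId + "/" + coreName;
  }
  public static void main(String[] args) {
    // The leader writes "down" at this path when it cannot forward an update to the replica;
    // the replica acks by changing it to "recovering"; the znode is deleted once the replica is "active".
    System.out.println(lirPath("collection1", "shard1", "core_node2"));
  }
}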

View File

@@ -82,6 +82,10 @@ Other Changes
* SOLR-5868: HttpClient should be configured to use ALLOW_ALL_HOSTNAME hostname
verifier to simplify SSL setup. (Steve Davids via Mark Miller)
* SOLR-5495: Recovery strategy for leader partitioned from replica case. Hardening
recovery scenarios after the leader receives an error trying to forward an
update request to a replica. (Timothy Potter)
================== 4.9.0 ==================
Versions of Major Components

View File

@@ -96,6 +96,8 @@ public class JettySolrRunner {
private SSLConfig sslConfig;
private int proxyPort = -1;
public static class DebugFilter implements Filter {
public int requestsToKeep = 10;
private AtomicLong nRequests = new AtomicLong();
@@ -477,7 +479,7 @@ public class JettySolrRunner {
if (0 == conns.length) {
throw new RuntimeException("Jetty Server has no Connectors");
}
return (proxyPort != -1) ? proxyPort : conns[0].getLocalPort();
}
/**
@@ -489,7 +491,16 @@ public class JettySolrRunner {
if (lastPort == -1) {
throw new IllegalStateException("You cannot get the port until this instance has started");
}
return (proxyPort != -1) ? proxyPort : lastPort;
}
/**
* Sets the port of a local socket proxy that sits in front of this server; if set
* then all client traffic will flow through the proxy, giving us the ability to
* simulate network partitions very easily.
*/
public void setProxyPort(int proxyPort) {
this.proxyPort = proxyPort;
}
/**

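Once the proxy port is set, the patched port getters above report the proxy's port, so any client built from the server's base URL transparently goes through the proxy. A minimal sketch of the intended wiring, modeled on HttpPartitionTest.createJetty() further down in this commit; solrHome, the servlet context and both port values are placeholders, and SocketProxy is the test helper this commit adds alongside HttpPartitionTest:

static JettySolrRunner startJettyBehindProxy(String solrHome, int jettyPort, int proxyPort) throws Exception {
  JettySolrRunner jetty = new JettySolrRunner(solrHome, "/solr", jettyPort);
  jetty.setProxyPort(proxyPort); // must be set before start() so the reported port is the proxy's
  jetty.start();
  // all client traffic now flows through the proxy; a test keeps a reference so it can
  // close() and reopen() the link to simulate a network partition
  SocketProxy proxy = new SocketProxy(proxyPort, jetty.getBaseUrl().toURI());
  return jetty;
}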
View File

@@ -3,8 +3,6 @@ package org.apache.solr.cloud;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkCmdExecutor;
@@ -26,7 +24,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
/*
@@ -146,7 +145,6 @@ class ShardLeaderElectionContextBase extends ElectionContext {
ZkStateReader.STATE_PROP, ZkStateReader.ACTIVE);
Overseer.getInQueue(zkClient).offer(ZkStateReader.toJSON(m));
}
}
// add core container and stop passing core around...
@@ -287,9 +285,11 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
core.getCoreDescriptor().getCloudDescriptor().setLeader(true);
}
boolean isLeader = true;
try {
super.runLeaderProcess(weAreReplacement, 0);
} catch (Exception e) {
isLeader = false;
SolrException.log(log, "There was a problem trying to register as the leader", e);
try (SolrCore core = cc.getCore(coreName)) {
@@ -306,6 +306,71 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
}
}
if (isLeader) {
// check for any replicas in my shard that were set to down by the previous leader
try {
startLeaderInitiatedRecoveryOnReplicas(coreName);
} catch (Exception exc) {
// don't want leader election to fail because of
// an error trying to tell others to recover
}
}
}
private void startLeaderInitiatedRecoveryOnReplicas(String coreName) throws Exception {
try (SolrCore core = cc.getCore(coreName)) {
CloudDescriptor cloudDesc = core.getCoreDescriptor().getCloudDescriptor();
String coll = cloudDesc.getCollectionName();
String shardId = cloudDesc.getShardId();
String znodePath = zkController.getLeaderInitiatedRecoveryZnodePath(coll, shardId);
List<String> replicas = null;
try {
replicas = zkClient.getChildren(znodePath, null, false);
} catch (NoNodeException nne) {
// this can be ignored
}
if (replicas != null && replicas.size() > 0) {
for (String replicaCore : replicas) {
if (coreName.equals(replicaCore))
continue; // added safe-guard so we don't mark this core as down
String lirState = zkController.getLeaderInitiatedRecoveryState(coll, shardId, replicaCore);
if (ZkStateReader.DOWN.equals(lirState) || ZkStateReader.RECOVERY_FAILED.equals(lirState)) {
log.info("After "+coreName+" was elected leader, found "+
replicaCore+" as "+lirState+" and needing recovery.");
List<ZkCoreNodeProps> replicaProps =
zkController.getZkStateReader().getReplicaProps(
collection, shardId, coreName, replicaCore, null, null);
if (replicaProps != null && replicaProps.size() > 0) {
ZkCoreNodeProps coreNodeProps = null;
for (ZkCoreNodeProps p : replicaProps) {
if (p.getCoreName().equals(replicaCore)) {
coreNodeProps = p;
break;
}
}
LeaderInitiatedRecoveryThread lirThread =
new LeaderInitiatedRecoveryThread(zkController,
cc,
collection,
shardId,
coreNodeProps,
120);
zkController.ensureReplicaInLeaderInitiatedRecovery(
collection, shardId, replicaCore, coreNodeProps, false);
ExecutorService executor = cc.getUpdateShardHandler().getUpdateExecutor();
executor.execute(lirThread);
}
}
}
}
} // core gets closed automagically
}
private void waitForReplicasToComeUp(boolean weAreReplacement, int timeoutms) throws InterruptedException {
@@ -373,7 +438,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
}
private boolean shouldIBeLeader(ZkNodeProps leaderProps, SolrCore core, boolean weAreReplacement) {
log.info("Checking if I ("+core.getName()+") should try and be the leader.");
if (isClosed) {
log.info("Bailing on leader process because we have been closed");
@@ -386,8 +451,18 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
return true;
}
if (core.getCoreDescriptor().getCloudDescriptor().getLastPublished().equals(ZkStateReader.ACTIVE)) {
// maybe active but if the previous leader marked us as down and
// we haven't recovered, then can't be leader
String lirState = zkController.getLeaderInitiatedRecoveryState(collection, shardId, core.getName());
if (ZkStateReader.DOWN.equals(lirState) || ZkStateReader.RECOVERING.equals(lirState)) {
log.warn("Although my last published state is Active, the previous leader marked me "+core.getName()
+ " as " + lirState
+ " and I haven't recovered yet, so I shouldn't be the leader.");
return false;
}
log.info("My last published State was Active, it's okay to be the leader."); log.info("My last published State was Active, it's okay to be the leader.");
return true; return true;
} }

View File

@@ -0,0 +1,234 @@
package org.apache.solr.cloud;
import java.net.ConnectException;
import java.net.SocketException;
import java.util.List;
import org.apache.http.NoHttpResponseException;
import org.apache.http.conn.ConnectTimeoutException;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.request.CoreAdminRequest.RequestRecovery;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
import org.apache.solr.core.CoreContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Background daemon thread that tries to send the REQUESTRECOVERY to a downed
* replica; used by a shard leader to nag a replica into recovering after the
* leader experiences an error trying to send an update request to the replica.
*/
public class LeaderInitiatedRecoveryThread extends Thread {
public final static Logger log = LoggerFactory.getLogger(LeaderInitiatedRecoveryThread.class);
protected ZkController zkController;
protected CoreContainer coreContainer;
protected String collection;
protected String shardId;
protected ZkCoreNodeProps nodeProps;
protected int maxTries;
public LeaderInitiatedRecoveryThread(ZkController zkController,
CoreContainer cc,
String collection,
String shardId,
ZkCoreNodeProps nodeProps,
int maxTries)
{
super("LeaderInitiatedRecoveryThread-"+nodeProps.getCoreName());
this.zkController = zkController;
this.coreContainer = cc;
this.collection = collection;
this.shardId = shardId;
this.nodeProps = nodeProps;
this.maxTries = maxTries;
setDaemon(true);
}
public void run() {
long startMs = System.currentTimeMillis();
try {
sendRecoveryCommandWithRetry();
} catch (Exception exc) {
log.error(getName()+" failed due to: "+exc, exc);
if (exc instanceof SolrException) {
throw (SolrException)exc;
} else {
throw new SolrException(ErrorCode.SERVER_ERROR, exc);
}
}
long diffMs = (System.currentTimeMillis() - startMs);
log.info(getName()+" completed successfully after running for "+Math.round(diffMs/1000L)+" secs");
}
protected void sendRecoveryCommandWithRetry() throws Exception {
int tries = 0;
long waitBetweenTriesMs = 5000L;
boolean continueTrying = true;
String recoveryUrl = nodeProps.getBaseUrl();
String replicaNodeName = nodeProps.getNodeName();
String coreNeedingRecovery = nodeProps.getCoreName();
String replicaUrl = nodeProps.getCoreUrl();
log.info(getName()+" started running to send REQUESTRECOVERY command to "+replicaUrl+
"; will try for a max of "+(maxTries * (waitBetweenTriesMs/1000))+" secs");
RequestRecovery recoverRequestCmd = new RequestRecovery();
recoverRequestCmd.setAction(CoreAdminAction.REQUESTRECOVERY);
recoverRequestCmd.setCoreName(coreNeedingRecovery);
while (continueTrying && ++tries < maxTries) {
if (tries > 1) {
log.warn("Asking core "+coreNeedingRecovery+" on " + recoveryUrl +
" to recover; unsuccessful after "+tries+" of "+maxTries+" attempts so far ...");
} else {
log.info("Asking core "+coreNeedingRecovery+" on " + recoveryUrl + " to recover");
}
HttpSolrServer server = new HttpSolrServer(recoveryUrl);
try {
server.setSoTimeout(60000);
server.setConnectionTimeout(15000);
try {
server.request(recoverRequestCmd);
log.info("Successfully sent "+CoreAdminAction.REQUESTRECOVERY+
" command to core "+coreNeedingRecovery+" on "+recoveryUrl);
continueTrying = false; // succeeded, so stop looping
} catch (Throwable t) {
Throwable rootCause = SolrException.getRootCause(t);
boolean wasCommError =
(rootCause instanceof ConnectException ||
rootCause instanceof ConnectTimeoutException ||
rootCause instanceof NoHttpResponseException ||
rootCause instanceof SocketException);
SolrException.log(log, recoveryUrl + ": Could not tell a replica to recover", t);
if (!wasCommError) {
continueTrying = false;
}
}
} finally {
server.shutdown();
}
// wait a few seconds
if (continueTrying) {
try {
Thread.sleep(waitBetweenTriesMs);
} catch (InterruptedException ignoreMe) {
Thread.currentThread().interrupt();
}
if (coreContainer.isShutDown()) {
log.warn("Stop trying to send recovery command to downed replica "+coreNeedingRecovery+
" on "+replicaNodeName+" because my core container is shutdown.");
continueTrying = false;
break;
}
// see if the replica's node is still live, if not, no need to keep doing this loop
ZkStateReader zkStateReader = zkController.getZkStateReader();
try {
zkStateReader.updateClusterState(true);
} catch (Exception exc) {
log.warn("Error when updating cluster state: "+exc);
}
if (!zkStateReader.getClusterState().liveNodesContain(replicaNodeName)) {
log.warn("Node "+replicaNodeName+" hosting core "+coreNeedingRecovery+
" is no longer live. No need to keep trying to tell it to recover!");
continueTrying = false;
break;
}
// additional safeguard against the replica trying to be in the active state
// before acknowledging the leader initiated recovery command
if (continueTrying && collection != null && shardId != null) {
try {
// call out to ZooKeeper to get the leader-initiated recovery state
String lirState =
zkController.getLeaderInitiatedRecoveryState(collection, shardId, coreNeedingRecovery);
if (lirState == null) {
log.warn("Stop trying to send recovery command to downed replica "+coreNeedingRecovery+
" on "+replicaNodeName+" because the znode no longer exists.");
continueTrying = false;
break;
}
if (ZkStateReader.RECOVERING.equals(lirState)) {
// replica has ack'd leader initiated recovery and entered the recovering state
// so we don't need to keep looping to send the command
continueTrying = false;
log.info("Replica "+coreNeedingRecovery+
" on node "+replicaNodeName+" ack'd the leader initiated recovery state, "
+ "no need to keep trying to send recovery command");
} else {
String leaderCoreNodeName = zkStateReader.getLeaderRetry(collection, shardId, 5000).getName();
List<ZkCoreNodeProps> replicaProps =
zkStateReader.getReplicaProps(collection, shardId, leaderCoreNodeName, coreNeedingRecovery, null, null);
if (replicaProps != null && replicaProps.size() > 0) {
String replicaState = replicaProps.get(0).getState();
if (ZkStateReader.ACTIVE.equals(replicaState)) {
// replica published its state as "active",
// which is bad if lirState is still "down"
if (ZkStateReader.DOWN.equals(lirState)) {
// OK, so the replica thinks it is active, but it never ack'd the leader initiated recovery
// so its state cannot be trusted and it needs to be told to recover again ... and we keep looping here
log.warn("Replica "+coreNeedingRecovery+" set to active but the leader thinks it should be in recovery;"
+ " forcing it back to down state to re-run the leader-initiated recovery process; props: "+replicaProps.get(0));
zkController.ensureReplicaInLeaderInitiatedRecovery(collection,
shardId, replicaUrl, nodeProps, true); // force republish state to "down"
}
}
}
}
} catch (Exception ignoreMe) {
log.warn("Failed to determine state of "+coreNeedingRecovery+" due to: "+ignoreMe);
// eventually this loop will exhaust max tries and stop so we can just log this for now
}
}
}
}
// replica is no longer in recovery on this node (may be handled on another node)
zkController.removeReplicaFromLeaderInitiatedRecoveryHandling(replicaUrl);
if (continueTrying) {
// ugh! this means the loop timed out before the recovery command could be delivered
// how exotic do we want to get here?
log.error("Timed out after waiting for "+(tries * (waitBetweenTriesMs/1000))+
" secs to send the recovery request to: "+replicaUrl+"; not much more we can do here?");
// TODO: need to raise a JMX event to allow monitoring tools to take over from here
}
}
}
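Since this is a plain daemon Thread, callers do not start() it directly; elsewhere in this commit (ShardLeaderElectionContext and DistributedUpdateProcessor) it is handed to the update executor after the LIR marker has been written. A hedged sketch of that calling pattern, not code from the patch itself; the wrapper method and variable names are illustrative, and zkController, the CoreContainer and the replica's ZkCoreNodeProps are assumed to be in scope as they are at the real call sites:

static void requestLeaderInitiatedRecovery(ZkController zkController, CoreContainer cc,
    String collection, String shardId, ZkCoreNodeProps replica) throws Exception {
  // write the "down" marker first so the replica cannot publish itself active without recovering
  zkController.ensureReplicaInLeaderInitiatedRecovery(collection, shardId,
      replica.getCoreUrl(), replica, false);
  // then let the daemon thread nag the replica with REQUESTRECOVERY
  // (up to 120 tries, 5 seconds apart, roughly 10 minutes)
  LeaderInitiatedRecoveryThread lirThread =
      new LeaderInitiatedRecoveryThread(zkController, cc, collection, shardId, replica, 120);
  cc.getUpdateShardHandler().getUpdateExecutor().execute(lirThread);
}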

View File

@@ -351,6 +351,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
return;
}
log.info("Publishing state of core "+core.getName()+" as recovering, leader is "+leaderUrl+" and I am "+ourUrl);
zkController.publish(core.getCoreDescriptor(), ZkStateReader.RECOVERING);
@@ -608,6 +609,9 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
}
HttpUriRequestResponse mrr = server.httpUriRequest(prepCmd);
prevSendPreRecoveryHttpUriRequest = mrr.httpUriRequest;
log.info("Sending prep recovery command to {}; {}", leaderBaseUrl, prepCmd.toString());
mrr.future.get();
} finally {
server.shutdown();

View File

@@ -17,6 +17,28 @@ package org.apache.solr.cloud;
* limitations under the License.
*/
import java.io.File;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.URLEncoder;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
@@ -46,34 +68,13 @@ import org.apache.solr.handler.component.ShardHandler;
import org.apache.solr.update.UpdateLog;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.ConnectionLossException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.KeeperException.SessionExpiredException;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.URLEncoder;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* Handle ZooKeeper interactions.
*
@@ -174,6 +175,9 @@ public final class ZkController {
private volatile boolean isClosed;
// keeps track of replicas that have been asked to recover by leaders running on this node
private Map<String,String> replicasInLeaderInitiatedRecovery = new HashMap<String,String>();
public ZkController(final CoreContainer cc, String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout, String localHost, String locaHostPort,
String localHostContext, int leaderVoteWait, int leaderConflictResolveWait, boolean genericCoreNodeNames, final CurrentCoreDescriptorProvider registerOnReconnect)
throws InterruptedException, TimeoutException, IOException
@@ -807,6 +811,7 @@ public final class ZkController {
Future<UpdateLog.RecoveryInfo> recoveryFuture = core.getUpdateHandler()
.getUpdateLog().recoverFromLog();
if (recoveryFuture != null) {
log.info("Replaying tlog for "+ourUrl+" during startup... NOTE: This can take a while.");
recoveryFuture.get(); // NOTE: this could potentially block for
// minutes or more!
// TODO: public as recovering in the mean time?
@@ -981,6 +986,14 @@ public final class ZkController {
core.getUpdateHandler().getSolrCoreState().doRecovery(cc, core.getCoreDescriptor());
return true;
}
// see if the leader told us to recover
String lirState = getLeaderInitiatedRecoveryState(collection, shardId, coreName);
if (ZkStateReader.DOWN.equals(lirState)) {
log.info("Leader marked core "+core.getName()+" down; starting recovery process");
core.getUpdateHandler().getSolrCoreState().doRecovery(cc, core.getCoreDescriptor());
return true;
}
} else {
log.info("I am the leader, no recovery necessary");
}
@@ -1023,7 +1036,30 @@ public final class ZkController {
assert collection != null && collection.length() > 0;
String shardId = cd.getCloudDescriptor().getShardId();
String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
// If the leader initiated recovery, then verify that this replica has performed
// recovery as requested before becoming active; don't even look at lirState if going down
if (!ZkStateReader.DOWN.equals(state)) {
String lirState = getLeaderInitiatedRecoveryState(collection, shardId, cd.getName());
if (lirState != null) {
if ("active".equals(state)) {
// trying to become active, so leader-initiated state must be recovering
if (ZkStateReader.RECOVERING.equals(lirState)) {
updateLeaderInitiatedRecoveryState(collection, shardId, cd.getName(), ZkStateReader.ACTIVE);
} else if (ZkStateReader.DOWN.equals(lirState)) {
throw new SolrException(ErrorCode.INVALID_STATE,
"Cannot publish state of core '"+cd.getName()+"' as active without recovering first!");
}
} else if (ZkStateReader.RECOVERING.equals(state)) {
// if it is currently DOWN, then trying to enter into recovering state is good
if (ZkStateReader.DOWN.equals(lirState)) {
updateLeaderInitiatedRecoveryState(collection, shardId, cd.getName(), ZkStateReader.RECOVERING);
}
}
}
}
//assert cd.getCloudDescriptor().getShardId() != null;
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "state",
ZkStateReader.STATE_PROP, state,
@@ -1450,49 +1486,69 @@ public final class ZkController {
String leaderBaseUrl = leaderProps.getBaseUrl();
String leaderCoreName = leaderProps.getCoreName();
String myCoreName = descriptor.getName();
String ourUrl = ZkCoreNodeProps.getCoreUrl(getBaseUrl(), myCoreName);
boolean isLeader = leaderProps.getCoreUrl().equals(ourUrl);
if (!isLeader && !SKIP_AUTO_RECOVERY) {
// detect if this core is in leader-initiated recovery and if so,
// then we don't need the leader to wait on seeing the down state
String lirState = null;
try {
lirState = getLeaderInitiatedRecoveryState(collection, shard, myCoreName);
} catch (Exception exc) {
log.error("Failed to determine if replica "+myCoreName+
" is in leader-initiated recovery due to: "+exc, exc);
}
if (lirState != null) {
log.info("Replica "+myCoreName+
" is already in leader-initiated recovery, so not waiting for leader to see down state.");
} else {
log.info("Replica "+myCoreName+
" NOT in leader-initiated recovery, need to wait for leader to see down state.");
HttpSolrServer server = null;
server = new HttpSolrServer(leaderBaseUrl);
try {
server.setConnectionTimeout(15000);
server.setSoTimeout(120000);
WaitForState prepCmd = new WaitForState();
prepCmd.setCoreName(leaderCoreName);
prepCmd.setNodeName(getNodeName());
prepCmd.setCoreNodeName(coreZkNodeName);
prepCmd.setState(ZkStateReader.DOWN);
// let's retry a couple times - perhaps the leader just went down,
// or perhaps he is just not quite ready for us yet
retries = 6;
for (int i = 0; i < retries; i++) {
if (isClosed) {
throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
"We have been closed");
}
try {
server.request(prepCmd);
break;
} catch (Exception e) {
SolrException.log(log,
"There was a problem making a request to the leader", e);
try {
Thread.sleep(2000);
} catch (InterruptedException e1) {
Thread.currentThread().interrupt();
}
if (i == retries - 1) {
throw new SolrException(ErrorCode.SERVER_ERROR,
"There was a problem making a request to the leader");
}
}
}
} finally {
server.shutdown();
}
}
}
return leaderProps;
@@ -1670,4 +1726,160 @@ public final class ZkController {
return cc;
}
/**
* When a leader receives a communication error when trying to send a request to a replica,
* it calls this method to ensure the replica enters recovery when connectivity is restored.
*
* returns true if the node hosting the replica is still considered "live" by ZooKeeper;
* false means the node is not live either, so no point in trying to send recovery commands
* to it.
*/
public boolean ensureReplicaInLeaderInitiatedRecovery(final String collection,
final String shardId, final String replicaUrl, final ZkCoreNodeProps replicaCoreProps, boolean forcePublishState)
throws KeeperException, InterruptedException
{
// First, determine if this replica is already in recovery handling
// which is needed because there can be many concurrent errors flooding in
// about the same replica having trouble and we only need to send the "needs"
// recovery signal once
boolean nodeIsLive = true;
synchronized (replicasInLeaderInitiatedRecovery) {
if (replicasInLeaderInitiatedRecovery.containsKey(replicaUrl)) {
if (!forcePublishState) {
return false; // already in this recovery process
}
}
// if the replica's state is not DOWN right now, make it so ...
String replicaNodeName = replicaCoreProps.getNodeName();
String replicaCoreName = replicaCoreProps.getCoreName();
assert replicaCoreName != null : "No core name for replica "+replicaNodeName;
// we only really need to try to send the recovery command if the node itself is "live"
if (getZkStateReader().getClusterState().liveNodesContain(replicaNodeName)) {
replicasInLeaderInitiatedRecovery.put(replicaUrl,
getLeaderInitiatedRecoveryZnodePath(collection, shardId, replicaCoreName));
// create a znode that requires the replica needs to "ack" to verify it knows it was out-of-sync
updateLeaderInitiatedRecoveryState(collection, shardId, replicaCoreName, ZkStateReader.DOWN);
} else {
nodeIsLive = false; // we really don't need to send the recovery request if the node is NOT live
}
}
String replicaCoreName = replicaCoreProps.getCoreName();
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "state",
ZkStateReader.STATE_PROP, ZkStateReader.DOWN,
ZkStateReader.BASE_URL_PROP, replicaCoreProps.getBaseUrl(),
ZkStateReader.CORE_NAME_PROP, replicaCoreProps.getCoreName(),
ZkStateReader.NODE_NAME_PROP, replicaCoreProps.getNodeName(),
ZkStateReader.SHARD_ID_PROP, shardId,
ZkStateReader.COLLECTION_PROP, collection);
log.warn("Leader is publishing core={} state={} on behalf of un-reachable replica {}; forcePublishState? "+forcePublishState,
replicaCoreName, ZkStateReader.DOWN, replicaUrl);
overseerJobQueue.offer(ZkStateReader.toJSON(m));
return nodeIsLive;
}
public boolean isReplicaInRecoveryHandling(String replicaUrl) {
boolean exists = false;
synchronized (replicasInLeaderInitiatedRecovery) {
exists = replicasInLeaderInitiatedRecovery.containsKey(replicaUrl);
}
return exists;
}
public void removeReplicaFromLeaderInitiatedRecoveryHandling(String replicaUrl) {
synchronized(replicasInLeaderInitiatedRecovery) {
replicasInLeaderInitiatedRecovery.remove(replicaUrl);
}
}
public String getLeaderInitiatedRecoveryState(String collection, String shardId, String coreName) {
if (collection == null || shardId == null || coreName == null)
return null; // if we don't have complete data about a core in cloud mode, return null
String znodePath = getLeaderInitiatedRecoveryZnodePath(collection, shardId, coreName);
String state = null;
try {
byte[] data = zkClient.getData(znodePath, null, new Stat(), false);
if (data != null && data.length > 0)
state = new String(data, "UTF-8");
} catch (NoNodeException ignoreMe) {
// safe to ignore as this znode will only exist if the leader initiated recovery
} catch (ConnectionLossException cle) {
// sort of safe to ignore ??? Usually these are seen when the core is going down
// or there are bigger issues to deal with than reading this znode
log.warn("Unable to read "+znodePath+" due to: "+cle);
} catch (SessionExpiredException see) {
// sort of safe to ignore ??? Usually these are seen when the core is going down
// or there are bigger issues to deal with than reading this znode
log.warn("Unable to read "+znodePath+" due to: "+see);
} catch (UnsupportedEncodingException e) {
throw new Error("JVM Does not seem to support UTF-8", e);
} catch (Exception exc) {
log.error("Failed to read data from znode "+znodePath+" due to: "+exc);
if (exc instanceof SolrException) {
throw (SolrException)exc;
} else {
throw new SolrException(ErrorCode.SERVER_ERROR,
"Failed to read data from znodePath: "+znodePath, exc);
}
}
return state;
}
private void updateLeaderInitiatedRecoveryState(String collection, String shardId, String coreName, String state) {
if (collection == null || shardId == null || coreName == null) {
log.warn("Cannot set leader-initiated recovery state znode to "+state+" using: collection="+collection+
"; shardId="+shardId+"; coreName="+coreName);
return; // if we don't have complete data about a core in cloud mode, do nothing
}
String znodePath = getLeaderInitiatedRecoveryZnodePath(collection, shardId, coreName);
if (ZkStateReader.ACTIVE.equals(state)) {
// since we're marking it active, we don't need this znode anymore, so delete instead of update
try {
zkClient.delete(znodePath, -1, false);
} catch (Exception justLogIt) {
log.warn("Failed to delete znode "+znodePath+" due to: "+justLogIt);
}
return;
}
byte[] znodeData = null;
try {
znodeData = state.getBytes("UTF-8");
} catch (UnsupportedEncodingException e) {
throw new Error("JVM Does not seem to support UTF-8", e);
}
boolean retryOnConnLoss = true; // be a little more robust when trying to write data
try {
if (zkClient.exists(znodePath, retryOnConnLoss)) {
zkClient.setData(znodePath, znodeData, retryOnConnLoss);
} else {
zkClient.makePath(znodePath, znodeData, retryOnConnLoss);
}
log.info("Wrote "+state+" to "+znodePath);
} catch (Exception exc) {
if (exc instanceof SolrException) {
throw (SolrException)exc;
} else {
throw new SolrException(ErrorCode.SERVER_ERROR,
"Failed to update data to "+state+" for znode: "+znodePath, exc);
}
}
}
public String getLeaderInitiatedRecoveryZnodePath(String collection, String shardId) {
return "/collections/"+collection+"/leader_initiated_recovery/"+shardId;
}
public String getLeaderInitiatedRecoveryZnodePath(String collection, String shardId, String coreName) {
return getLeaderInitiatedRecoveryZnodePath(collection, shardId)+"/"+coreName;
}
}
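Because the marker is just a znode whose payload is the state string, it can also be inspected directly while testing or debugging. A small illustrative helper, not part of the patch, that reads the state the same way getLeaderInitiatedRecoveryState() above does; the ZooKeeper address and timeout are assumptions:

static String readLirState(String collection, String shardId, String coreName) throws Exception {
  SolrZkClient zkClient = new SolrZkClient("localhost:9983", 30000); // assumed ZK address and client timeout
  try {
    String path = "/collections/" + collection + "/leader_initiated_recovery/" + shardId + "/" + coreName;
    byte[] data = zkClient.getData(path, null, new Stat(), true);
    // payload is one of the ZkStateReader state strings, e.g. "down" or "recovering"
    return (data == null || data.length == 0) ? null : new String(data, "UTF-8");
  } finally {
    zkClient.close();
  }
}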

View File

@@ -845,7 +845,7 @@ public class CoreAdminHandler extends RequestHandlerBase {
protected void handleRequestRecoveryAction(SolrQueryRequest req,
SolrQueryResponse rsp) throws IOException {
final SolrParams params = req.getParams();
log.info("It has been requested that we recover: core="+params.get(CoreAdminParams.CORE));
Thread thread = new Thread() {
@Override
public void run() {
@@ -958,7 +958,8 @@ public class CoreAdminHandler extends RequestHandlerBase {
Boolean onlyIfLeaderActive = params.getBool("onlyIfLeaderActive");
log.info("Going to wait for coreNodeName: " + coreNodeName + ", state: " + waitForState
+ ", checkLive: " + checkLive + ", onlyIfLeader: " + onlyIfLeader
+ ", onlyIfLeaderActive: "+onlyIfLeaderActive);
int maxTries = 0;
String state = null;
@@ -1015,9 +1016,27 @@ public class CoreAdminHandler extends RequestHandlerBase {
String localState = cloudDescriptor.getLastPublished();
// TODO: This is funky but I've seen this in testing where the replica asks the
// leader to be in recovery? Need to track down how that happens ... in the meantime,
// this is a safeguard
boolean leaderDoesNotNeedRecovery = (onlyIfLeader != null &&
onlyIfLeader &&
core.getName().equals(nodeProps.getStr("core")) &&
ZkStateReader.RECOVERING.equals(waitForState) &&
ZkStateReader.ACTIVE.equals(localState) &&
ZkStateReader.ACTIVE.equals(state));
if (leaderDoesNotNeedRecovery) {
log.warn("Leader "+core.getName()+" ignoring request to be in the recovering state because it is live and active.");
}
boolean onlyIfActiveCheckResult = onlyIfLeaderActive != null && onlyIfLeaderActive && (localState == null || !localState.equals(ZkStateReader.ACTIVE));
log.info("In WaitForState("+waitForState+"): collection="+collection+", shard="+slice.getName()+
", isLeader? "+core.getCoreDescriptor().getCloudDescriptor().isLeader()+
", live="+live+", currentState="+state+", localState="+localState+", nodeName="+nodeName+
", coreNodeName="+coreNodeName+", onlyIfActiveCheckResult="+onlyIfActiveCheckResult+", nodeProps: "+nodeProps);
if (!onlyIfActiveCheckResult && nodeProps != null && (state.equals(waitForState) || leaderDoesNotNeedRecovery)) {
if (checkLive == null) {
break;
} else if (checkLive && live) {
@@ -1040,7 +1059,7 @@ public class CoreAdminHandler extends RequestHandlerBase {
collection = cloudDescriptor.getCollectionName();
shardId = cloudDescriptor.getShardId();
leaderInfo = coreContainer.getZkController().
getZkStateReader().getLeaderUrl(collection, shardId, 5000);
} catch (Exception exc) {
leaderInfo = "Not available due to: " + exc;
}

View File

@@ -230,6 +230,7 @@ public class SolrCmdDistributor {
+ req.node.getUrl() + " retry:"
+ req.retries + " " + req.cmdString + " params:" + req.uReq.getParams());
}
try {
SolrServer solrServer = servers.getSolrServer(req);
NamedList<Object> rsp = solrServer.request(req.uReq);
@@ -258,6 +259,13 @@ public class SolrCmdDistributor {
this.synchronous = synchronous;
this.cmdString = cmdString;
}
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("SolrCmdDistributor$Req: cmd=").append(String.valueOf(cmdString));
sb.append("; node=").append(String.valueOf(node));
return sb.toString();
}
}
@@ -272,6 +280,14 @@ public class SolrCmdDistributor {
public Exception e;
public int statusCode = -1;
public Req req;
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("SolrCmdDistributor$Error: statusCode=").append(statusCode);
sb.append("; exception=").append(String.valueOf(e));
sb.append("; req=").append(String.valueOf(req));
return sb.toString();
}
}
public static abstract class Node {
@@ -284,9 +300,25 @@ public class SolrCmdDistributor {
public static class StdNode extends Node {
protected ZkCoreNodeProps nodeProps;
protected String collection;
protected String shardId;
public StdNode(ZkCoreNodeProps nodeProps) {
this(nodeProps, null, null);
}
public StdNode(ZkCoreNodeProps nodeProps, String collection, String shardId) {
this.nodeProps = nodeProps;
this.collection = collection;
this.shardId = shardId;
}
public String getCollection() {
return collection;
}
public String getShardId() {
return shardId;
} }
@Override @Override
@ -359,11 +391,9 @@ public class SolrCmdDistributor {
public static class RetryNode extends StdNode { public static class RetryNode extends StdNode {
private ZkStateReader zkStateReader; private ZkStateReader zkStateReader;
private String collection;
private String shardId;
public RetryNode(ZkCoreNodeProps nodeProps, ZkStateReader zkStateReader, String collection, String shardId) {
super(nodeProps, collection, shardId);
this.zkStateReader = zkStateReader;
this.collection = collection;
this.shardId = shardId;

View File

@@ -20,6 +20,8 @@ package org.apache.solr.update.processor;
import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -33,12 +35,15 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.http.NoHttpResponseException;
import org.apache.http.conn.ConnectTimeoutException;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.CharsRef;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.request.CoreAdminRequest.RequestRecovery;
import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.cloud.DistributedQueue;
import org.apache.solr.cloud.LeaderInitiatedRecoveryThread;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
@@ -64,7 +69,9 @@ import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.common.util.Hash;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.component.RealTimeGetComponent;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrRequestInfo;
@@ -299,10 +306,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
boolean skip = skipListSet.contains(props.getCoreUrl());
log.info("check url:" + props.getCoreUrl() + " against:" + skipListSet + " result:" + skip);
if (!skip) {
nodes.add(new StdNode(props, collection, shardId));
}
} else {
nodes.add(new StdNode(props, collection, shardId));
}
}
}
@@ -375,7 +382,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
if (sliceLeader != null && zkController.getClusterState().liveNodesContain(sliceLeader.getNodeName())) {
if (nodes == null) nodes = new ArrayList<>();
ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(sliceLeader);
nodes.add(new StdNode(nodeProps, coll.getName(), shardId));
}
}
}
@@ -471,6 +478,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
String from = req.getParams().get(DISTRIB_FROM);
ClusterState clusterState = zkController.getClusterState();
CloudDescriptor cloudDescriptor = req.getCore().getCoreDescriptor().getCloudDescriptor();
Slice mySlice = clusterState.getSlice(collection, cloudDescriptor.getShardId());
boolean localIsLeader = cloudDescriptor.isLeader();
@@ -528,7 +536,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
if (replicaProps != null) {
nodes = new ArrayList<>(replicaProps.size());
for (ZkCoreNodeProps props : replicaProps) {
nodes.add(new StdNode(props, collection, shardId));
}
}
} catch (InterruptedException e) {
@@ -582,6 +590,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
zkController.getBaseUrl(), req.getCore().getName()));
params.set(DISTRIB_FROM_COLLECTION, req.getCore().getCoreDescriptor().getCloudDescriptor().getCollectionName());
params.set(DISTRIB_FROM_SHARD, req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId());
for (Node nodesByRoutingRule : nodesByRoutingRules) {
@@ -600,7 +609,6 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
DistribPhase.TOLEADER.toString()));
params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
zkController.getBaseUrl(), req.getCore().getName()));
cmdDistrib.distribAdd(cmd, nodes, params);
}
@@ -655,46 +663,66 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// legit
for (final SolrCmdDistributor.Error error : errors) {
if (error.req.node instanceof RetryNode) {
// we don't try to force a leader to recover
// when we cannot forward to it
continue;
}
DistribPhase phase =
DistribPhase.parseParam(error.req.uReq.getParams().get(DISTRIB_UPDATE_PARAM));
if (phase != DistribPhase.FROMLEADER)
continue; // don't have non-leaders try to recovery other nodes
final String replicaUrl = error.req.node.getUrl();
int maxTries = 1;
boolean sendRecoveryCommand = true;
String collection = null;
String shardId = null;
if (error.req.node instanceof StdNode) {
StdNode stdNode = (StdNode)error.req.node;
collection = stdNode.getCollection();
shardId = stdNode.getShardId();
try {
// if false, then the node is probably not "live" anymore
sendRecoveryCommand =
zkController.ensureReplicaInLeaderInitiatedRecovery(collection,
shardId,
replicaUrl,
stdNode.getNodeProps(),
false);
// we want to try more than once, ~10 minutes
if (sendRecoveryCommand) {
maxTries = 120;
} // else the node is no longer "live" so no need to send any recovery command
} catch (Exception e) {
log.error("Leader failed to set replica "+
error.req.node.getUrl()+" state to DOWN due to: "+e, e);
}
} // else not a StdNode, recovery command still gets sent once
if (!sendRecoveryCommand)
continue; // the replica is already in recovery handling or is not live
Throwable rootCause = SolrException.getRootCause(error.e);
log.error("Setting up to try to start recovery on replica "+replicaUrl+" after: "+rootCause);
// try to send the recovery command to the downed replica in a background thread
CoreContainer coreContainer = req.getCore().getCoreDescriptor().getCoreContainer();
LeaderInitiatedRecoveryThread lirThread =
new LeaderInitiatedRecoveryThread(zkController,
coreContainer,
collection,
shardId,
error.req.node.getNodeProps(),
maxTries);
ExecutorService executor = coreContainer.getUpdateShardHandler().getUpdateExecutor();
executor.execute(lirThread);
}
}
@@ -1151,7 +1179,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// don't forward to ourself
leaderForAnyShard = true;
} else {
leaders.add(new StdNode(coreLeaderProps, collection, sliceName));
}
}
@@ -1254,7 +1282,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
if (replicaProps != null) {
List<Node> myReplicas = new ArrayList<>();
for (ZkCoreNodeProps replicaProp : replicaProps) {
myReplicas.add(new StdNode(replicaProp, collection, myShardId));
}
cmdDistrib.distribDelete(cmd, myReplicas, params);
someReplicas = true;
@@ -1521,7 +1549,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
for (Entry<String,Replica> entry : shardMap.entrySet()) {
ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(entry.getValue());
if (clusterState.liveNodesContain(nodeProps.getNodeName())) {
urls.add(new StdNode(nodeProps, collection, replicas.getName()));
}
}
}

View File

@@ -0,0 +1,545 @@
package org.apache.solr.cloud;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.File;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.JSONTestUtil;
import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.NamedList;
import org.junit.After;
import org.junit.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Simulates HTTP partitions between a leader and replica but the replica does
* not lose its ZooKeeper connection.
*/
@Slow
@SuppressSSL(bugUrl = "https://issues.apache.org/jira/browse/SOLR-5776")
public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
private static final transient Logger log =
LoggerFactory.getLogger(HttpPartitionTest.class);
// To prevent the test assertions firing too fast before cluster state
// recognizes (and propagates) partitions
private static final long sleepMsBeforeHealPartition = 1000L;
private Map<URI,SocketProxy> proxies = new HashMap<URI,SocketProxy>();
private AtomicInteger portCounter = new AtomicInteger(0);
private int basePort = 49900;
public HttpPartitionTest() {
super();
sliceCount = 2;
shardCount = 2;
}
@Before
@Override
public void setUp() throws Exception {
super.setUp();
System.setProperty("numShards", Integer.toString(sliceCount));
}
@Override
@After
public void tearDown() throws Exception {
if (!proxies.isEmpty()) {
for (SocketProxy proxy : proxies.values()) {
proxy.close();
}
}
System.clearProperty("numShards");
try {
super.tearDown();
} catch (Exception exc) {}
resetExceptionIgnores();
}
/**
* Overrides the parent implementation so that we can configure a socket proxy
* to sit in front of each Jetty server, which gives us the ability to simulate
* network partitions without having to fuss with IPTables (which is not very
* cross platform friendly).
*/
@Override
public JettySolrRunner createJetty(File solrHome, String dataDir,
String shardList, String solrConfigOverride, String schemaOverride)
throws Exception {
int jettyPort = basePort + portCounter.incrementAndGet();
JettySolrRunner jetty = new JettySolrRunner(solrHome.getPath(), context,
jettyPort, solrConfigOverride, schemaOverride, false,
getExtraServlets(), sslConfig, getExtraRequestFilters());
jetty.setShards(shardList);
jetty.setDataDir(getDataDir(dataDir));
// setup to proxy Http requests to this server unless it is the control
// server
int proxyPort = basePort + portCounter.incrementAndGet();
jetty.setProxyPort(proxyPort);
jetty.start();
// create a socket proxy for the jetty server ...
SocketProxy proxy = new SocketProxy(proxyPort, jetty.getBaseUrl().toURI());
proxies.put(proxy.getUrl(), proxy);
return jetty;
}
@Override
public void doTest() throws Exception {
waitForThingsToLevelOut(30000);
// test a 1x2 collection
testRf2();
// now do similar for a 1x3 collection while taking 2 replicas on-and-off
// each time
testRf3();
// kill a leader and make sure recovery occurs as expected
testRf3WithLeaderFailover();
}
protected void testRf2() throws Exception {
// create a collection with 1 shard and 2 replicas
String testCollectionName = "c8n_1x2";
createCollection(testCollectionName, 1, 2, 1);
cloudClient.setDefaultCollection(testCollectionName);
sendDoc(1);
Replica notLeader =
ensureAllReplicasAreActive(testCollectionName, 2, 10).get(0);
// ok, now introduce a network partition between the leader and the replica
SocketProxy proxy = getProxyForReplica(notLeader);
proxy.close();
// indexing during a partition
sendDoc(2);
// Keep the partition in place for at least a second. This is not because recovery
// is timing dependent; the pause simply gives the updated state time to be written
// to ZooKeeper before the test completes. Without this brief pause the test finishes
// so quickly that the recovery process never gets a chance to kick in.
Thread.sleep(sleepMsBeforeHealPartition);
proxy.reopen();
List<Replica> notLeaders =
ensureAllReplicasAreActive(testCollectionName, 2, 20); // shouldn't take 20 secs but just to be safe
sendDoc(3);
// 3 docs sent so far; verify they are present on the leader and the replica
assertDocsExistInAllReplicas(notLeaders, testCollectionName, 1, 3);
// now up the stakes and do more docs
int numDocs = 1000;
boolean hasPartition = false;
for (int d = 0; d < numDocs; d++) {
// create / restore partition every 100 docs
if (d % 100 == 0) {
if (hasPartition) {
proxy.reopen();
hasPartition = false;
} else {
if (d >= 100) {
proxy.close();
hasPartition = true;
Thread.sleep(sleepMsBeforeHealPartition);
}
}
}
sendDoc(d + 4); // 4 is offset as we've already indexed 1-3
}
// restore connectivity if lost
if (hasPartition) {
proxy.reopen();
}
notLeaders = ensureAllReplicasAreActive(testCollectionName, 2, 20);
// verify all docs received
assertDocsExistInAllReplicas(notLeaders, testCollectionName, 1, numDocs + 3);
}
protected void testRf3() throws Exception {
// create a collection with 1 shard and 3 replicas
String testCollectionName = "c8n_1x3";
createCollection(testCollectionName, 1, 3, 1);
cloudClient.setDefaultCollection(testCollectionName);
sendDoc(1);
List<Replica> notLeaders =
ensureAllReplicasAreActive(testCollectionName, 3, 10);
assertTrue("Expected 2 replicas for collection " + testCollectionName
+ " but found " + notLeaders.size() + "; clusterState: "
+ cloudClient.getZkStateReader().getClusterState(),
notLeaders.size() == 2);
sendDoc(1);
// ok, now introduce a network partition between the leader and the replica
SocketProxy proxy0 = getProxyForReplica(notLeaders.get(0));
proxy0.close();
// indexing during a partition
sendDoc(2);
Thread.sleep(sleepMsBeforeHealPartition);
proxy0.reopen();
SocketProxy proxy1 = getProxyForReplica(notLeaders.get(1));
proxy1.close();
sendDoc(3);
Thread.sleep(sleepMsBeforeHealPartition);
proxy1.reopen();
// docs 1-3 sent so far; once all replicas are active again, send doc 4 and
// verify all 4 docs are present on the leader and both replicas
notLeaders = ensureAllReplicasAreActive(testCollectionName, 3, 20);
sendDoc(4);
assertDocsExistInAllReplicas(notLeaders, testCollectionName, 1, 4);
}
protected void testRf3WithLeaderFailover() throws Exception {
// now partition one of the replicas and then outright kill the leader,
// and verify recovery behaves as expected
// create a collection with 1 shard and 3 replicas
String testCollectionName = "c8n_1x3_lf"; // _lf is leader fails
createCollection(testCollectionName, 1, 3, 1);
cloudClient.setDefaultCollection(testCollectionName);
sendDoc(1);
List<Replica> notLeaders =
ensureAllReplicasAreActive(testCollectionName, 3, 10);
assertTrue("Expected 2 replicas for collection " + testCollectionName
+ " but found " + notLeaders.size() + "; clusterState: "
+ cloudClient.getZkStateReader().getClusterState(),
notLeaders.size() == 2);
sendDoc(1);
// ok, now introduce a network partition between the leader and the replica
SocketProxy proxy0 = getProxyForReplica(notLeaders.get(0));
proxy0.close();
// indexing during a partition
sendDoc(2);
Thread.sleep(sleepMsBeforeHealPartition);
proxy0.reopen();
SocketProxy proxy1 = getProxyForReplica(notLeaders.get(1));
proxy1.close();
sendDoc(3);
Thread.sleep(sleepMsBeforeHealPartition);
proxy1.reopen();
// docs 1-3 sent so far; once all replicas are active again, send doc 4 and
// verify all 4 docs are present on the leader and both replicas
notLeaders = ensureAllReplicasAreActive(testCollectionName, 3, 20);
sendDoc(4);
assertDocsExistInAllReplicas(notLeaders, testCollectionName, 1, 4);
Replica leader =
cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1");
assertNotNull("Could not find leader for shard1 of "+testCollectionName, leader);
String leaderNode = leader.getNodeName();
JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(leader));
// since maxShardsPerNode is 1, we're safe to kill the leader
notLeaders = ensureAllReplicasAreActive(testCollectionName, 3, 20);
proxy0 = getProxyForReplica(notLeaders.get(0));
proxy0.close();
// indexing during a partition
// doc should be on leader and 1 replica
sendDoc(5);
Thread.sleep(sleepMsBeforeHealPartition);
String shouldNotBeNewLeaderNode = notLeaders.get(0).getNodeName();
// kill the leader
leaderJetty.stop();
if (leaderJetty.isRunning())
fail("Failed to stop the leader on "+leaderNode);
SocketProxy oldLeaderProxy = getProxyForReplica(leader);
if (oldLeaderProxy != null) {
oldLeaderProxy.close();
} else {
log.warn("No SocketProxy found for old leader node "+leaderNode);
}
Thread.sleep(sleepMsBeforeHealPartition);
Replica newLeader =
cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1", 30000);
assertNotNull("No new leader was elected after 30 seconds", newLeader);
assertTrue("Expected node "+shouldNotBeNewLeaderNode+
" to NOT be the new leader b/c it was out-of-sync with the old leader! ClusterState: "+
cloudClient.getZkStateReader().getClusterState(),
!shouldNotBeNewLeaderNode.equals(newLeader.getNodeName()));
proxy0.reopen();
Thread.sleep(10000L);
cloudClient.getZkStateReader().updateClusterState(true);
List<Replica> activeReps = getActiveOrRecoveringReplicas(testCollectionName, "shard1");
assertTrue("Expected 2 of 3 replicas to be active but only found "+activeReps.size()+"; "+activeReps, activeReps.size() == 2);
sendDoc(6);
assertDocsExistInAllReplicas(activeReps, testCollectionName, 1, 6);
}
protected List<Replica> getActiveOrRecoveringReplicas(String testCollectionName, String shardId) throws Exception {
Map<String,Replica> activeReplicas = new HashMap<String,Replica>();
ZkStateReader zkr = cloudClient.getZkStateReader();
ClusterState cs = zkr.getClusterState();
assertNotNull(cs);
for (Slice shard : cs.getActiveSlices(testCollectionName)) {
if (shard.getName().equals(shardId)) {
for (Replica replica : shard.getReplicas()) {
String replicaState = replica.getStr(ZkStateReader.STATE_PROP);
if (ZkStateReader.ACTIVE.equals(replicaState) || ZkStateReader.RECOVERING.equals(replicaState)) {
activeReplicas.put(replica.getName(), replica);
}
}
}
}
List<Replica> replicas = new ArrayList<Replica>();
replicas.addAll(activeReplicas.values());
return replicas;
}
protected SocketProxy getProxyForReplica(Replica replica) throws Exception {
String replicaBaseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
assertNotNull(replicaBaseUrl);
URL baseUrl = new URL(replicaBaseUrl);
SocketProxy proxy = proxies.get(baseUrl.toURI());
if (proxy == null && !baseUrl.toExternalForm().endsWith("/")) {
baseUrl = new URL(baseUrl.toExternalForm() + "/");
proxy = proxies.get(baseUrl.toURI());
}
assertNotNull("No proxy found for " + baseUrl + "!", proxy);
return proxy;
}
protected void assertDocsExistInAllReplicas(List<Replica> notLeaders,
String testCollectionName, int firstDocId, int lastDocId)
throws Exception {
Replica leader =
cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1", 10000);
HttpSolrServer leaderSolr = getHttpSolrServer(leader, testCollectionName);
List<HttpSolrServer> replicas =
new ArrayList<HttpSolrServer>(notLeaders.size());
for (Replica r : notLeaders) {
replicas.add(getHttpSolrServer(r, testCollectionName));
}
try {
for (int d = firstDocId; d <= lastDocId; d++) {
String docId = String.valueOf(d);
assertDocExists(leaderSolr, testCollectionName, docId);
for (HttpSolrServer replicaSolr : replicas) {
assertDocExists(replicaSolr, testCollectionName, docId);
}
}
} finally {
if (leaderSolr != null) {
leaderSolr.shutdown();
}
for (HttpSolrServer replicaSolr : replicas) {
replicaSolr.shutdown();
}
}
}
protected HttpSolrServer getHttpSolrServer(Replica replica, String coll) throws Exception {
ZkCoreNodeProps zkProps = new ZkCoreNodeProps(replica);
String url = zkProps.getBaseUrl() + "/" + coll;
return new HttpSolrServer(url);
}
protected void sendDoc(int docId) throws Exception {
SolrInputDocument doc = new SolrInputDocument();
doc.addField(id, String.valueOf(docId));
doc.addField("a_t", "hello" + docId);
cloudClient.add(doc);
}
protected List<Replica> ensureAllReplicasAreActive(String testCollectionName, int rf, int maxWaitSecs) throws Exception {
long startMs = System.currentTimeMillis();
Map<String,Replica> notLeaders = new HashMap<String,Replica>();
ZkStateReader zkr = cloudClient.getZkStateReader();
ClusterState cs = zkr.getClusterState();
Collection<Slice> slices = cs.getActiveSlices(testCollectionName);
assertTrue(slices.size() == 1); // shards == 1
boolean allReplicasUp = false;
long waitMs = 0L;
long maxWaitMs = maxWaitSecs * 1000L;
Replica leader = null;
while (waitMs < maxWaitMs && !allReplicasUp) {
cs = zkr.getClusterState();
assertNotNull(cs);
for (Slice shard : cs.getActiveSlices(testCollectionName)) {
allReplicasUp = true; // assume true
Collection<Replica> replicas = shard.getReplicas();
assertTrue(replicas.size() == rf);
leader = shard.getLeader();
assertNotNull(leader);
// ensure all replicas are "active" and identify the non-leader replica
for (Replica replica : replicas) {
String replicaState = replica.getStr(ZkStateReader.STATE_PROP);
if (!ZkStateReader.ACTIVE.equals(replicaState)) {
log.info("Replica " + replica.getName() + " is currently " + replicaState);
allReplicasUp = false;
}
if (!leader.equals(replica))
notLeaders.put(replica.getName(), replica);
}
if (!allReplicasUp) {
try {
Thread.sleep(500L);
} catch (Exception ignoreMe) {}
waitMs += 500L;
}
}
} // end while
if (!allReplicasUp)
fail("Didn't see all replicas come up within " + maxWaitMs + " ms! ClusterState: " + cs);
if (notLeaders.isEmpty())
fail("Didn't isolate any replicas that are not the leader! ClusterState: " + cs);
long diffMs = (System.currentTimeMillis() - startMs);
log.info("Took " + diffMs + " ms to see all replicas become active.");
List<Replica> replicas = new ArrayList<Replica>();
replicas.addAll(notLeaders.values());
return replicas;
}
/**
* Query the real-time get handler for a specific doc by ID to verify it
* exists in the provided server.
*/
@SuppressWarnings("rawtypes")
protected void assertDocExists(HttpSolrServer solr, String coll, String docId) throws Exception {
QueryRequest qr = new QueryRequest(params("qt", "/get", "id", docId));
NamedList rsp = solr.request(qr);
String match =
JSONTestUtil.matchObj("/id", rsp.get("doc"), new Integer(docId));
assertTrue("Doc with id=" + docId + " not found in " + solr.getBaseURL()
+ " due to: " + match, match == null);
}
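// For reference (illustrative note, not part of the original patch): the request above
// is equivalent to an HTTP GET on <replicaBaseUrl>/<collection>/get?id=<docId>; the
// real-time get handler returns {"doc":{...}} when the id exists (even if it has not
// yet been committed) and {"doc":null} otherwise, which is what the match checks.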
protected JettySolrRunner getJettyOnPort(int port) {
JettySolrRunner theJetty = null;
for (JettySolrRunner jetty : jettys) {
if (port == jetty.getLocalPort()) {
theJetty = jetty;
break;
}
}
if (theJetty == null) {
if (controlJetty.getLocalPort() == port) {
theJetty = controlJetty;
}
}
if (theJetty == null)
fail("Not able to find JettySolrRunner for port: "+port);
return theJetty;
}
protected int getReplicaPort(Replica replica) {
String replicaNode = replica.getNodeName();
String tmp = replicaNode.substring(replicaNode.indexOf(':')+1);
if (tmp.indexOf('_') != -1)
tmp = tmp.substring(0,tmp.indexOf('_'));
return Integer.parseInt(tmp);
}
}

View File

@ -0,0 +1,406 @@
package org.apache.solr.cloud;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.SSLServerSocketFactory;
import javax.net.ssl.SSLSocketFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Kindly borrowed the idea and base implementation from the ActiveMQ project;
* useful for blocking traffic on a specified port.
*/
public class SocketProxy {
private static final transient Logger log = LoggerFactory.getLogger(SocketProxy.class);
public static final int ACCEPT_TIMEOUT_MILLIS = 100;
private URI proxyUrl;
private URI target;
private Acceptor acceptor;
private ServerSocket serverSocket;
private CountDownLatch closed = new CountDownLatch(1);
public List<Bridge> connections = new LinkedList<Bridge>();
private int listenPort = 0;
private int receiveBufferSize = -1;
private boolean pauseAtStart = false;
private int acceptBacklog = 50;
public SocketProxy() throws Exception {}
public SocketProxy(URI uri) throws Exception {
this(0, uri);
}
public SocketProxy(int port, URI uri) throws Exception {
listenPort = port;
target = uri;
open();
}
@Override
public String toString() {
return "SocketProxy: port=" + listenPort + "; target=" + target;
}
public void setReceiveBufferSize(int receiveBufferSize) {
this.receiveBufferSize = receiveBufferSize;
}
public void setTarget(URI tcpBrokerUri) {
target = tcpBrokerUri;
}
public void open() throws Exception {
serverSocket = createServerSocket(target);
serverSocket.setReuseAddress(true);
if (receiveBufferSize > 0) {
serverSocket.setReceiveBufferSize(receiveBufferSize);
}
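// First open(): bind to the requested listen port and derive the proxy URL from the
// target. On reopen() the proxy URL is already known, so rebind to the same port.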
if (proxyUrl == null) {
serverSocket.bind(new InetSocketAddress(listenPort), acceptBacklog);
proxyUrl = urlFromSocket(target, serverSocket);
} else {
serverSocket.bind(new InetSocketAddress(proxyUrl.getPort()));
}
acceptor = new Acceptor(serverSocket, target);
if (pauseAtStart) {
acceptor.pause();
}
new Thread(null, acceptor, "SocketProxy-Acceptor-"
+ serverSocket.getLocalPort()).start();
closed = new CountDownLatch(1);
}
private boolean isSsl(URI target) {
return "ssl".equals(target.getScheme());
}
private ServerSocket createServerSocket(URI target) throws Exception {
if (isSsl(target)) {
return SSLServerSocketFactory.getDefault().createServerSocket();
}
return new ServerSocket();
}
private Socket createSocket(URI target) throws Exception {
if (isSsl(target)) {
return SSLSocketFactory.getDefault().createSocket();
}
return new Socket();
}
public URI getUrl() {
return proxyUrl;
}
/*
* close all proxy connections and acceptor
*/
public void close() {
List<Bridge> connections;
synchronized (this.connections) {
connections = new ArrayList<Bridge>(this.connections);
}
log.warn("Closing " + connections.size()+" connections to: "+getUrl());
for (Bridge con : connections) {
closeConnection(con);
}
acceptor.close();
closed.countDown();
}
/*
* close all proxy receive connections, leaving acceptor open
*/
public void halfClose() {
List<Bridge> connections;
synchronized (this.connections) {
connections = new ArrayList<Bridge>(this.connections);
}
log.info("halfClose, numConnections=" + connections.size());
for (Bridge con : connections) {
halfCloseConnection(con);
}
}
public boolean waitUntilClosed(long timeoutSeconds)
throws InterruptedException {
return closed.await(timeoutSeconds, TimeUnit.SECONDS);
}
/*
* called after a close to restart the acceptor on the same port
*/
public void reopen() {
log.info("Re-opening connectivity to "+getUrl());
try {
open();
} catch (Exception e) {
log.debug("exception on reopen url:" + getUrl(), e);
}
}
/*
* pause accepting new connections and data transfer through existing proxy
* connections. All sockets remain open
*/
public void pause() {
synchronized (connections) {
log.info("pause, numConnections=" + connections.size());
acceptor.pause();
for (Bridge con : connections) {
con.pause();
}
}
}
/*
* continue after pause
*/
public void goOn() {
synchronized (connections) {
log.info("goOn, numConnections=" + connections.size());
for (Bridge con : connections) {
con.goOn();
}
}
acceptor.goOn();
}
private void closeConnection(Bridge c) {
try {
c.close();
} catch (Exception e) {
log.debug("exception on close of: " + c, e);
}
}
private void halfCloseConnection(Bridge c) {
try {
c.halfClose();
} catch (Exception e) {
log.debug("exception on half close of: " + c, e);
}
}
public boolean isPauseAtStart() {
return pauseAtStart;
}
public void setPauseAtStart(boolean pauseAtStart) {
this.pauseAtStart = pauseAtStart;
}
public int getAcceptBacklog() {
return acceptBacklog;
}
public void setAcceptBacklog(int acceptBacklog) {
this.acceptBacklog = acceptBacklog;
}
private URI urlFromSocket(URI uri, ServerSocket serverSocket)
throws Exception {
int listenPort = serverSocket.getLocalPort();
return new URI(uri.getScheme(), uri.getUserInfo(), uri.getHost(),
listenPort, uri.getPath(), uri.getQuery(), uri.getFragment());
}
public class Bridge {
private Socket receiveSocket;
private Socket sendSocket;
private Pump requestThread;
private Pump responseThread;
public Bridge(Socket socket, URI target) throws Exception {
receiveSocket = socket;
sendSocket = createSocket(target);
if (receiveBufferSize > 0) {
sendSocket.setReceiveBufferSize(receiveBufferSize);
}
sendSocket.connect(new InetSocketAddress(target.getHost(), target
.getPort()));
linkWithThreads(receiveSocket, sendSocket);
log.info("proxy connection " + sendSocket + ", receiveBufferSize="
+ sendSocket.getReceiveBufferSize());
}
public void goOn() {
responseThread.goOn();
requestThread.goOn();
}
public void pause() {
requestThread.pause();
responseThread.pause();
}
public void close() throws Exception {
synchronized (connections) {
connections.remove(this);
}
receiveSocket.close();
sendSocket.close();
}
public void halfClose() throws Exception {
receiveSocket.close();
}
private void linkWithThreads(Socket source, Socket dest) {
requestThread = new Pump(source, dest);
requestThread.start();
responseThread = new Pump(dest, source);
responseThread.start();
}
public class Pump extends Thread {
protected Socket src;
private Socket destination;
private AtomicReference<CountDownLatch> pause = new AtomicReference<CountDownLatch>();
public Pump(Socket source, Socket dest) {
super("SocketProxy-DataTransfer-" + source.getPort() + ":"
+ dest.getPort());
src = source;
destination = dest;
pause.set(new CountDownLatch(0));
}
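// pause()/goOn() implement a simple gate: pause() swaps in a fresh latch with count 1
// so the transfer loop blocks before its next write, and goOn() counts that latch down
// to release it. The latch starts at 0, so data flows freely until the first pause().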
public void pause() {
pause.set(new CountDownLatch(1));
}
public void goOn() {
pause.get().countDown();
}
public void run() {
byte[] buf = new byte[1024];
try {
InputStream in = src.getInputStream();
OutputStream out = destination.getOutputStream();
while (true) {
int len = in.read(buf);
if (len == -1) {
log.debug("read eof from:" + src);
break;
}
pause.get().await();
out.write(buf, 0, len);
}
} catch (Exception e) {
log.debug("read/write failed, reason: " + e.getLocalizedMessage());
try {
if (!receiveSocket.isClosed()) {
// for halfClose: on a read/write failure, closing our side ensures the
// remote end sees the close at the same time.
close();
}
} catch (Exception ignore) {}
}
}
}
}
public class Acceptor implements Runnable {
private ServerSocket socket;
private URI target;
private AtomicReference<CountDownLatch> pause = new AtomicReference<CountDownLatch>();
public Acceptor(ServerSocket serverSocket, URI uri) {
socket = serverSocket;
target = uri;
pause.set(new CountDownLatch(0));
try {
socket.setSoTimeout(ACCEPT_TIMEOUT_MILLIS);
} catch (SocketException e) {
e.printStackTrace();
}
}
public void pause() {
pause.set(new CountDownLatch(1));
}
public void goOn() {
pause.get().countDown();
}
public void run() {
try {
while (!socket.isClosed()) {
pause.get().await();
try {
Socket source = socket.accept();
pause.get().await();
if (receiveBufferSize > 0) {
source.setReceiveBufferSize(receiveBufferSize);
}
log.info("accepted " + source + ", receiveBufferSize:"
+ source.getReceiveBufferSize());
synchronized (connections) {
connections.add(new Bridge(source, target));
}
} catch (SocketTimeoutException expected) {}
}
} catch (Exception e) {
log.debug("acceptor: finished for reason: " + e.getLocalizedMessage());
}
}
public void close() {
try {
socket.close();
closed.countDown();
goOn();
} catch (IOException ignored) {}
}
}
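/*
 * Illustrative usage sketch (not part of the original patch; the target URL below is
 * a placeholder): put a proxy in front of an endpoint, point clients at the proxy,
 * and toggle connectivity to simulate a network partition.
 */
public static void usageSketch() throws Exception {
SocketProxy proxy = new SocketProxy(new URI("http://127.0.0.1:8983/solr"));
URI clientFacingUrl = proxy.getUrl(); // clients connect here instead of the real endpoint
log.info("Proxying " + clientFacingUrl + " -> " + "http://127.0.0.1:8983/solr");
proxy.close(); // drop existing connections and stop accepting new ones (partition)
proxy.reopen(); // restore connectivity on the same proxy port
}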
}

View File

@ -254,6 +254,13 @@ public class CoreAdminRequest extends SolrRequest
return params;
}
public String toString() {
if (action != null) {
return "WaitForState: "+getParams();
} else {
return super.toString();
}
}
}
public static class RequestRecovery extends CoreAdminRequest {