SOLR-7245: Temporary ZK election or connection loss should not stall indexing due to LIR

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1668274 13f79535-47bb-0310-9956-ffa450edef68
Ramkumar Aiyengar, 2015-03-21 14:13:17 +00:00
commit f15330c32d (parent 54ba78e4de)
9 changed files with 62 additions and 43 deletions


@@ -195,6 +195,8 @@ New Features
   the JSON request.
   (yonik)

+* SOLR-7245: Temporary ZK election or connection loss should not stall indexing
+  due to leader initiated recovery (Ramkumar Aiyengar)

 Bug Fixes
 ----------------------


@@ -402,8 +402,9 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
             120,
             coreNodeName);
         zkController.ensureReplicaInLeaderInitiatedRecovery(
-            collection, shardId, coreNodeProps, false, coreNodeName);
+            collection, shardId, coreNodeProps, coreNodeName,
+            false /* forcePublishState */, true /* retryOnConnLoss */);

         ExecutorService executor = cc.getUpdateShardHandler().getUpdateExecutor();
         executor.execute(lirThread);
       }


@@ -226,8 +226,10 @@ public class LeaderInitiatedRecoveryThread extends Thread {
           // so its state cannot be trusted and it needs to be told to recover again ... and we keep looping here
           log.warn("Replica core={} coreNodeName={} set to active but the leader thinks it should be in recovery;"
               + " forcing it back to down state to re-run the leader-initiated recovery process; props: "+replicaProps.get(0), coreNeedingRecovery, replicaCoreNodeName);
-          // force republish state to "down"
-          zkController.ensureReplicaInLeaderInitiatedRecovery(collection, shardId, nodeProps, true, leaderCoreNodeName);
+          zkController.ensureReplicaInLeaderInitiatedRecovery(
+              collection, shardId, nodeProps, leaderCoreNodeName,
+              true /* forcePublishState */, true /* retryOnConnLoss */
+          );
         }
       }
       break;


@@ -1130,7 +1130,7 @@ public final class ZkController {
       if (ZkStateReader.ACTIVE.equals(state)) {
         // trying to become active, so leader-initiated state must be recovering
         if (ZkStateReader.RECOVERING.equals(lirState)) {
-          updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, ZkStateReader.ACTIVE, null);
+          updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, ZkStateReader.ACTIVE, null, true);
         } else if (ZkStateReader.DOWN.equals(lirState)) {
           throw new SolrException(ErrorCode.INVALID_STATE,
               "Cannot publish state of core '"+cd.getName()+"' as active without recovering first!");
@@ -1138,7 +1138,7 @@ public final class ZkController {
       } else if (ZkStateReader.RECOVERING.equals(state)) {
         // if it is currently DOWN, then trying to enter into recovering state is good
         if (ZkStateReader.DOWN.equals(lirState)) {
-          updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, ZkStateReader.RECOVERING, null);
+          updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, ZkStateReader.RECOVERING, null, true);
         }
       }
     }
@@ -1882,8 +1882,9 @@ public final class ZkController {
    *         false means the node is not live either, so no point in trying to send recovery commands
    *         to it.
    */
-  public boolean ensureReplicaInLeaderInitiatedRecovery(final String collection,
-      final String shardId, final ZkCoreNodeProps replicaCoreProps, boolean forcePublishState, String leaderCoreNodeName)
+  public boolean ensureReplicaInLeaderInitiatedRecovery(
+      final String collection, final String shardId, final ZkCoreNodeProps replicaCoreProps,
+      String leaderCoreNodeName, boolean forcePublishState, boolean retryOnConnLoss)
       throws KeeperException, InterruptedException
   {
     final String replicaUrl = replicaCoreProps.getCoreUrl();
@@ -1893,10 +1894,10 @@ public final class ZkController {
     if (shardId == null)
       throw new IllegalArgumentException("shard parameter cannot be null for starting leader-initiated recovery for replica: "+replicaUrl);
     if (replicaUrl == null)
       throw new IllegalArgumentException("replicaUrl parameter cannot be null for starting leader-initiated recovery");

     // First, determine if this replica is already in recovery handling
     // which is needed because there can be many concurrent errors flooding in
     // about the same replica having trouble and we only need to send the "needs"
@@ -1918,7 +1919,7 @@ public final class ZkController {
     // we only really need to try to send the recovery command if the node itself is "live"
     if (getZkStateReader().getClusterState().liveNodesContain(replicaNodeName)) {
       // create a znode that requires the replica needs to "ack" to verify it knows it was out-of-sync
-      updateLeaderInitiatedRecoveryState(collection, shardId, replicaCoreNodeName, ZkStateReader.DOWN, leaderCoreNodeName);
+      updateLeaderInitiatedRecoveryState(collection, shardId, replicaCoreNodeName, ZkStateReader.DOWN, leaderCoreNodeName, retryOnConnLoss);
       replicasInLeaderInitiatedRecovery.put(replicaUrl,
           getLeaderInitiatedRecoveryZnodePath(collection, shardId, replicaCoreNodeName));
       log.info("Put replica core={} coreNodeName={} on "+
@@ -2015,7 +2016,7 @@ public final class ZkController {
   }

   private void updateLeaderInitiatedRecoveryState(String collection, String shardId, String coreNodeName, String state,
-      String leaderCoreNodeName) {
+      String leaderCoreNodeName, boolean retryOnConnLoss) {
     if (collection == null || shardId == null || coreNodeName == null) {
       log.warn("Cannot set leader-initiated recovery state znode to "+state+" using: collection="+collection+
           "; shardId="+shardId+"; coreNodeName="+coreNodeName);


@@ -52,7 +52,6 @@ import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.CoreDescriptor;
 import org.apache.solr.handler.component.RealTimeGetComponent;
 import org.apache.solr.request.SolrQueryRequest;
-import org.apache.solr.request.SolrRequestInfo;
 import org.apache.solr.response.SolrQueryResponse;
 import org.apache.solr.schema.SchemaField;
 import org.apache.solr.update.AddUpdateCommand;
@@ -147,7 +146,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
       this.nodeErrorTracker = new HashMap<>(5);
       this.otherLeaderRf = new HashMap<>();
     }

     // gives the replication factor that was achieved for this request
     public int getAchievedRf() {
       // look across all shards to find the minimum achieved replication
@@ -286,7 +285,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
     returnVersions = req.getParams().getBool(UpdateParams.VERSIONS ,false);

     // TODO: better way to get the response, or pass back info to it?
-    SolrRequestInfo reqInfo = returnVersions ? SolrRequestInfo.getRequestInfo() : null;
+    // SolrRequestInfo reqInfo = returnVersions ? SolrRequestInfo.getRequestInfo() : null;

     this.req = req;
@@ -847,11 +846,19 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
     // before we go setting other replicas to down, make sure we're still the leader!
     String leaderCoreNodeName = null;
+    Exception getLeaderExc = null;
     try {
-      leaderCoreNodeName = zkController.getZkStateReader().getLeaderRetry(collection, shardId).getName();
+      Replica leader = zkController.getZkStateReader().getLeader(collection, shardId);
+      if (leader != null) {
+        leaderCoreNodeName = leader.getName();
+      }
     } catch (Exception exc) {
-      log.error("Failed to determine if " + cloudDesc.getCoreNodeName() + " is still the leader for " + collection +
-          " " + shardId + " before putting " + replicaUrl + " into leader-initiated recovery due to: " + exc);
+      getLeaderExc = exc;
+    }
+    if (leaderCoreNodeName == null) {
+      log.warn("Failed to determine if {} is still the leader for collection={} shardId={} " +
+          "before putting {} into leader-initiated recovery",
+          cloudDesc.getCoreNodeName(), collection, shardId, replicaUrl, getLeaderExc);
     }

     List<ZkCoreNodeProps> myReplicas = zkController.getZkStateReader().getReplicaProps(collection,
@@ -873,8 +880,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
           zkController.ensureReplicaInLeaderInitiatedRecovery(collection,
               shardId,
               stdNode.getNodeProps(),
-              false,
-              leaderCoreNodeName);
+              leaderCoreNodeName,
+              false /* forcePublishState */,
+              false /* retryOnConnLoss */
+          );

           // we want to try more than once, ~10 minutes
           if (sendRecoveryCommand) {
@@ -909,7 +918,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
           continue; // the replica is already in recovery handling or is not live

         Throwable rootCause = SolrException.getRootCause(error.e);
-        log.error("Setting up to try to start recovery on replica " + replicaUrl + " after: " + rootCause);
+        log.error("Setting up to try to start recovery on replica {}", replicaUrl, rootCause);

         // try to send the recovery command to the downed replica in a background thread
         CoreContainer coreContainer = req.getCore().getCoreDescriptor().getCoreContainer();
@@ -1591,7 +1600,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
     if (!zkEnabled || req.getParams().getBool(COMMIT_END_POINT, false) || singleLeader) {
       doLocalCommit(cmd);
-    } else if (zkEnabled) {
+    } else {
       ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
       if (!req.getParams().getBool(COMMIT_END_POINT, false)) {
         params.set(COMMIT_END_POINT, true);
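Taken together, the call sites this commit touches split cleanly by thread context: the leader-election context and the LIR thread run in the background and can afford to block and retry, while DistributedUpdateProcessor runs on the thread servicing an update request and now fails fast. A sketch collecting the three call shapes side by side (the wrapper class and variable names are illustrative; the argument patterns match the hunks above):

    import org.apache.solr.cloud.ZkController;
    import org.apache.solr.common.cloud.ZkCoreNodeProps;
    import org.apache.zookeeper.KeeperException;

    // Illustrative wrapper; each call mirrors one call site in this commit.
    class LirCallSites {
      void summarize(ZkController zk, String coll, String shard, ZkCoreNodeProps props,
          String leaderCoreNodeName) throws KeeperException, InterruptedException {
        // ShardLeaderElectionContext: background election thread, safe to block
        zk.ensureReplicaInLeaderInitiatedRecovery(coll, shard, props, leaderCoreNodeName,
            false /* forcePublishState */, true /* retryOnConnLoss */);
        // LeaderInitiatedRecoveryThread: background thread forcing state back to "down"
        zk.ensureReplicaInLeaderInitiatedRecovery(coll, shard, props, leaderCoreNodeName,
            true /* forcePublishState */, true /* retryOnConnLoss */);
        // DistributedUpdateProcessor: request thread, must not stall indexing
        zk.ensureReplicaInLeaderInitiatedRecovery(coll, shard, props, leaderCoreNodeName,
            false /* forcePublishState */, false /* retryOnConnLoss */);
      }
    }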


@@ -25,7 +25,6 @@ import org.apache.solr.common.util.NamedList;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.request.LocalSolrQueryRequest;
 import org.apache.solr.request.SolrQueryRequest;
-import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -106,7 +105,10 @@ public class BasicZkTest extends AbstractZkTestCase {
     int zkPort = zkServer.getPort();
     zkServer.shutdown();

+    // document indexing shouldn't stop immediately after a ZK disconnect
+    assertU(adoc("id", "201"));
+
     Thread.sleep(300);

     // try a reconnect from disconnect
@@ -174,9 +176,4 @@ public class BasicZkTest extends AbstractZkTestCase {
     req.setParams(params);
     return req;
   }
-
-  @AfterClass
-  public static void afterClass() {
-
-  }
 }


@@ -17,7 +17,6 @@ package org.apache.solr.cloud;
  * limitations under the License.
  */

-import org.apache.http.NoHttpResponseException;
 import org.apache.lucene.util.LuceneTestCase.Slow;
 import org.apache.solr.JSONTestUtil;
 import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
@@ -26,7 +25,6 @@ import org.apache.solr.client.solrj.embedded.JettySolrRunner;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.QueryRequest;
-import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.client.solrj.response.CollectionAdminResponse;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrInputDocument;
@@ -146,7 +144,7 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
     String replicaUrl = replicaCoreNodeProps.getCoreUrl();
     assertTrue(!zkController.isReplicaInRecoveryHandling(replicaUrl));
-    assertTrue(zkController.ensureReplicaInLeaderInitiatedRecovery(testCollectionName, shardId, replicaCoreNodeProps, false, leader.getName()));
+    assertTrue(zkController.ensureReplicaInLeaderInitiatedRecovery(testCollectionName, shardId, replicaCoreNodeProps, leader.getName(), false, true));
     assertTrue(zkController.isReplicaInRecoveryHandling(replicaUrl));

     Map<String,Object> lirStateMap = zkController.getLeaderInitiatedRecoveryStateObject(testCollectionName, shardId, notLeader.getName());
     assertNotNull(lirStateMap);


@@ -278,7 +278,7 @@ public class ZkControllerTest extends SolrTestCaseJ4 {
       try {
         // this method doesn't throw exception when node isn't leader
         zkController.ensureReplicaInLeaderInitiatedRecovery("c1", "shard1",
-            new ZkCoreNodeProps(replica), false, "non_existent_leader");
+            new ZkCoreNodeProps(replica), "non_existent_leader", false, false);
         fail("ZkController should not write LIR state for node which is not leader");
       } catch (Exception e) {
         assertNull("ZkController should not write LIR state for node which is not leader",


@@ -102,6 +102,8 @@ public class ZkStateReader implements Closeable {
   protected volatile ClusterState clusterState;

   private static final long SOLRCLOUD_UPDATE_DELAY = Long.parseLong(System.getProperty("solrcloud.update.delay", "5000"));
+  private static final int GET_LEADER_RETRY_INTERVAL_MS = 50;
+  private static final int GET_LEADER_RETRY_DEFAULT_TIMEOUT = 4000;

   public static final String LEADER_ELECT_ZKNODE = "leader_elect";
@@ -642,12 +644,22 @@ public class ZkStateReader implements Closeable {
         shard, timeout));
     return props.getCoreUrl();
   }

+  public Replica getLeader(String collection, String shard) throws InterruptedException {
+    if (clusterState != null) {
+      Replica replica = clusterState.getLeader(collection, shard);
+      if (replica != null && getClusterState().liveNodesContain(replica.getNodeName())) {
+        return replica;
+      }
+    }
+    return null;
+  }

   /**
    * Get shard leader properties, with retry if none exist.
    */
   public Replica getLeaderRetry(String collection, String shard) throws InterruptedException {
-    return getLeaderRetry(collection, shard, 4000);
+    return getLeaderRetry(collection, shard, GET_LEADER_RETRY_DEFAULT_TIMEOUT);
   }

   /**
@@ -655,14 +667,11 @@ public class ZkStateReader implements Closeable {
    */
   public Replica getLeaderRetry(String collection, String shard, int timeout) throws InterruptedException {
     long timeoutAt = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS);
-    while (System.nanoTime() < timeoutAt && !closed) {
-      if (clusterState != null) {
-        Replica replica = clusterState.getLeader(collection, shard);
-        if (replica != null && getClusterState().liveNodesContain(replica.getNodeName())) {
-          return replica;
-        }
-      }
-      Thread.sleep(50);
+    while (true) {
+      Replica leader = getLeader(collection, shard);
+      if (leader != null) return leader;
+      if (System.nanoTime() >= timeoutAt || closed) break;
+      Thread.sleep(GET_LEADER_RETRY_INTERVAL_MS);
     }
     throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "No registered leader was found after waiting for "
         + timeout + "ms " + ", collection: " + collection + " slice: " + shard);