SOLR-12412: Leader should give up leadership when IndexWriter.tragedy occurs

Cao Manh Dat 2018-07-09 09:14:23 +07:00
parent 5f5e5dbfb5
commit 1197176110
8 changed files with 286 additions and 0 deletions

solr/CHANGES.txt

@@ -168,6 +168,8 @@ Other Changes
* SOLR-12527: factor out a test-framework/ConfigRequest class (Christine Poerschke)
* SOLR-12412: Leader should give up leadership when IndexWriter.tragedy occurs (Cao Manh Dat, Tomas Fernandez-Lobbe)
================== 7.4.0 ==================
Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.

solr/core/src/java/org/apache/solr/cloud/ZkController.java

@@ -88,6 +88,7 @@ import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.cloud.ZooKeeperException;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.IOUtils;
import org.apache.solr.common.util.ObjectReleaseTracker;
@@ -127,6 +128,7 @@ import static org.apache.solr.common.cloud.ZkStateReader.CORE_NODE_NAME_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.ELECTION_NODE_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.NODE_NAME_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
/**
@@ -229,6 +231,8 @@ public class ZkController {
private volatile boolean isClosed;
private final ConcurrentHashMap<String, Throwable> replicasMetTragicEvent = new ConcurrentHashMap<>();
@Deprecated
// keeps track of replicas that have been asked to recover by leaders running on this node
private final Map<String, String> replicasInLeaderInitiatedRecovery = new HashMap<String, String>();
@@ -592,6 +596,57 @@ public class ZkController {
assert ObjectReleaseTracker.release(this);
}
public void giveupLeadership(CoreDescriptor cd, Throwable tragicException) {
DocCollection dc = getClusterState().getCollectionOrNull(cd.getCollectionName());
if (dc == null) return;
Slice shard = dc.getSlice(cd.getCloudDescriptor().getShardId());
if (shard == null) return;
// if this replica is not a leader, it will be put in recovery state by the leader
if (shard.getReplica(cd.getCloudDescriptor().getCoreNodeName()) != shard.getLeader()) return;
int numActiveReplicas = shard.getReplicas(
rep -> rep.getState() == Replica.State.ACTIVE
&& rep.getType() != Type.PULL
&& getClusterState().getLiveNodes().contains(rep.getNodeName())
).size();
// the leader can at least still serve searches; give up leadership only if other replicas can take over
if (numActiveReplicas >= 2) {
String key = cd.getCollectionName() + ":" + cd.getCloudDescriptor().getCoreNodeName();
// TODO: handle the case where the delete-replica request fails
if (replicasMetTragicEvent.putIfAbsent(key, tragicException) == null) {
log.warn("Leader {} met tragic exception, giving up its leadership", key, tragicException);
try {
// by using the Overseer to remove the replica and add it back, the task is done in an async, fault-tolerant manner
Map<String,Object> props = new HashMap<>();
props.put(Overseer.QUEUE_OPERATION, "deletereplica");
props.put(COLLECTION_PROP, cd.getCollectionName());
props.put(SHARD_ID_PROP, shard.getName());
props.put(REPLICA_PROP, cd.getCloudDescriptor().getCoreNodeName());
getOverseerCollectionQueue().offer(Utils.toJSON(new ZkNodeProps(props)));
props.clear();
props.put(Overseer.QUEUE_OPERATION, "addreplica");
props.put(COLLECTION_PROP, cd.getCollectionName());
props.put(SHARD_ID_PROP, shard.getName());
props.put(ZkStateReader.REPLICA_TYPE, cd.getCloudDescriptor().getReplicaType().name().toUpperCase(Locale.ROOT));
props.put(CoreAdminParams.NODE, getNodeName());
getOverseerCollectionQueue().offer(Utils.toJSON(new ZkNodeProps(props)));
} catch (KeeperException e) {
log.info("Hit exception while giving up leadership for {}", key, e);
replicasMetTragicEvent.remove(key);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.info("Hit exception while giving up leadership for {}", key, e);
replicasMetTragicEvent.remove(key);
}
}
}
}
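For readers less familiar with raw Overseer queue messages: the two offers above correspond roughly to the SolrJ collection-API calls sketched below. This is an illustration only, assuming a SolrClient handle; the commit deliberately enqueues Overseer messages instead, so the work proceeds asynchronously even if the requesting node dies right after offering them.

import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;

// Hedged sketch: roughly-equivalent synchronous SolrJ calls. The
// collection/shard/coreNodeName/nodeName parameters mirror the props
// built above; none of this code is part of the commit itself.
void replaceReplica(SolrClient client, String collection, String shard,
                    String coreNodeName, String nodeName) throws Exception {
  CollectionAdminRequest.deleteReplica(collection, shard, coreNodeName)
      .process(client);
  CollectionAdminRequest.addReplicaToShard(collection, shard)
      .setNode(nodeName)
      .process(client);
}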
/**
* Returns true if config file exists
*/
@@ -1522,6 +1577,7 @@ public class ZkController {
final String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
final String collection = cd.getCloudDescriptor().getCollectionName();
getCollectionTerms(collection).remove(cd.getCloudDescriptor().getShardId(), cd);
replicasMetTragicEvent.remove(collection + ":" + coreNodeName);
if (Strings.isNullOrEmpty(collection)) {
log.error("No collection was specified.");

solr/core/src/java/org/apache/solr/core/CoreContainer.java

@@ -1806,6 +1806,22 @@ public class CoreContainer {
return false;
}
public void checkTragicException(SolrCore solrCore) {
Throwable tragicException = null;
try {
tragicException = solrCore.getSolrCoreState().getTragicException();
} catch (IOException e) {
// failed to open an IndexWriter
tragicException = e;
}
if (tragicException != null) {
if (isZooKeeperAware()) {
getZkController().giveupLeadership(solrCore.getCoreDescriptor(), tragicException);
}
}
}
}
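The call site below (RequestHandlerBase) wires this check into the request path. Any other component could invoke it the same way after a low-level failure; a hypothetical sketch, where core and cmd are assumed local variables rather than anything from this commit:

// Hypothetical call site: after a low-level indexing failure, ask the
// container whether the core's IndexWriter recorded a tragedy; in
// SolrCloud mode this triggers giveupLeadership(...) above.
try {
  core.getUpdateHandler().commit(cmd);
} catch (IOException e) {
  core.getCoreContainer().checkTragicException(core);
  throw e;
}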
class CloserThread extends Thread {

solr/core/src/java/org/apache/solr/handler/RequestHandlerBase.java

@@ -208,6 +208,9 @@ public abstract class RequestHandlerBase implements SolrRequestHandler, SolrInfo
}
}
} catch (Exception e) {
if (req.getCore() != null) {
req.getCore().getCoreContainer().checkTragicException(req.getCore());
}
boolean incrementErrors = true;
boolean isServerError = true;
if (e instanceof SolrException) {

solr/core/src/java/org/apache/solr/update/SolrCoreState.java

@@ -194,4 +194,9 @@ public abstract class SolrCoreState {
public abstract void setCdcrBootstrapCallable(Callable cdcrBootstrapCallable);
public Throwable getTragicException() throws IOException {
RefCounted<IndexWriter> ref = getIndexWriter(null);
if (ref == null) return null;
return ref.get().getTragicException();
}
}
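This helper simply surfaces Lucene's IndexWriter.getTragicException(). A minimal standalone sketch of that underlying API, in plain Lucene with no Solr involved: a healthy writer reports null, and the value only becomes non-null after an unrecoverable error has closed the writer.

import java.nio.file.Files;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;

public class TragicExceptionSketch {
  public static void main(String[] args) throws Exception {
    try (Directory dir = FSDirectory.open(Files.createTempDirectory("tragic-demo"));
         IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(new StandardAnalyzer()))) {
      writer.addDocument(new Document());
      writer.commit();
      // null while the writer is healthy; set to the fatal Throwable once
      // an unrecoverable error (e.g. corruption hit during a merge) occurs.
      System.out.println("tragedy: " + writer.getTragicException());
    }
  }
}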

solr/core/src/test-files/solr/configsets/cloud-minimal/conf/solrconfig.xml

@@ -44,5 +44,8 @@
</lst>
</requestHandler>
<indexConfig>
<mergeScheduler class="${solr.mscheduler:org.apache.lucene.index.ConcurrentMergeScheduler}"/>
</indexConfig>
</config>
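The ${solr.mscheduler:...} syntax is Solr's property substitution with a default value. In Java terms it behaves roughly like the line below (an analogy, not Solr's actual implementation); the test later sets this property to swap in a mock scheduler.

// Rough analogue of ${solr.mscheduler:org.apache.lucene.index.ConcurrentMergeScheduler}:
// use the system property when set, otherwise fall back to the default class name.
String schedulerClass = System.getProperty(
    "solr.mscheduler", "org.apache.lucene.index.ConcurrentMergeScheduler");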

solr/core/src/test/org/apache/solr/cloud/LeaderTragicEventTest.java

@@ -0,0 +1,166 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.cloud.ClusterStateUtil;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.DirectoryFactory;
import org.apache.solr.core.SolrCore;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class LeaderTragicEventTest extends SolrCloudTestCase {
private static final String COLLECTION = "collection1";
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@BeforeClass
public static void setupCluster() throws Exception {
System.setProperty("solr.mscheduler", "org.apache.solr.core.MockConcurrentMergeScheduler");
configureCluster(2)
.addConfig("config", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf"))
.configure();
cluster.getSolrClient().setDefaultCollection(COLLECTION);
}
@AfterClass
public static void cleanup() {
System.clearProperty("solr.mscheduler");
}
@Test
public void test() throws Exception {
CollectionAdminRequest
.createCollection(COLLECTION, "config", 1, 2)
.process(cluster.getSolrClient());
ClusterStateUtil.waitForAllActiveAndLiveReplicas(cluster.getSolrClient().getZkStateReader(), COLLECTION, 120000);
List<String> addedIds = new ArrayList<>();
Replica oldLeader = corruptLeader(addedIds);
waitForState("Timeout waiting for new replica to become leader", COLLECTION, (liveNodes, collectionState) -> {
Slice slice = collectionState.getSlice("shard1");
if (slice.getReplicas().size() != 2) return false;
if (slice.getLeader() == null) return false;
return !slice.getLeader().getName().equals(oldLeader.getName());
});
ClusterStateUtil.waitForAllActiveAndLiveReplicas(cluster.getSolrClient().getZkStateReader(), COLLECTION, 120000);
Slice shard = getCollectionState(COLLECTION).getSlice("shard1");
assertNotEquals(shard.getLeader().getNodeName(), oldLeader.getNodeName());
assertEquals(getNonLeader(shard).getNodeName(), oldLeader.getNodeName());
for (String id : addedIds) {
assertNotNull(cluster.getSolrClient().getById(COLLECTION, id));
}
log.info("Test succeeded. oldLeader: {} currentState: {}", oldLeader, getCollectionState(COLLECTION));
CollectionAdminRequest.deleteCollection(COLLECTION).process(cluster.getSolrClient());
}
private Replica corruptLeader(List<String> addedIds) throws IOException {
DocCollection dc = getCollectionState(COLLECTION);
Replica oldLeader = dc.getLeader("shard1");
CoreContainer leaderCC = cluster.getReplicaJetty(oldLeader).getCoreContainer();
SolrCore leaderCore = leaderCC.getCores().iterator().next();
MockDirectoryWrapper dir = (MockDirectoryWrapper) leaderCore.getDirectoryFactory().get(leaderCore.getIndexDir(), DirectoryFactory.DirContext.DEFAULT, leaderCore.getSolrConfig().indexConfig.lockType);
leaderCore.getDirectoryFactory().release(dir);
try (HttpSolrClient solrClient = new HttpSolrClient.Builder(dc.getLeader("shard1").getCoreUrl()).build()) {
for (int i = 0; i < 100; i++) {
new UpdateRequest()
.add("id", i + "")
.process(solrClient);
solrClient.commit();
addedIds.add(i + "");
for (String file : dir.listAll()) {
// skip index metadata so the index stays openable; corrupting regular
// data files surfaces later as a tragic merge exception instead
if (file.contains("segments_")) continue;
if (file.endsWith("si")) continue;
if (file.endsWith("fnm")) continue;
if (random().nextBoolean()) continue;
dir.corruptFiles(Collections.singleton(file));
}
}
} catch (Exception e) {
// Expected
}
return oldLeader;
}
private Replica getNonLeader(Slice slice) {
if (slice.getReplicas().size() <= 1) return null;
return slice.getReplicas(rep -> !rep.getName().equals(slice.getLeader().getName())).get(0);
}
@Test
public void testOtherReplicasAreNotActive() throws Exception {
int numReplicas = random().nextInt(2) + 1;
// giving up leadership should be a no-op when the leader is the only active replica in the shard
CollectionAdminRequest
.createCollection(COLLECTION, "config", 1, numReplicas)
.process(cluster.getSolrClient());
ClusterStateUtil.waitForAllActiveAndLiveReplicas(cluster.getSolrClient().getZkStateReader(), COLLECTION, 120000);
JettySolrRunner otherReplicaJetty = null;
if (numReplicas == 2) {
Slice shard = getCollectionState(COLLECTION).getSlice("shard1");
otherReplicaJetty = cluster.getReplicaJetty(getNonLeader(shard));
otherReplicaJetty.stop();
waitForState("Timeout waiting for replica to go down", COLLECTION, (liveNodes, collectionState) -> getNonLeader(collectionState.getSlice("shard1")).getState() != Replica.State.ACTIVE);
}
Replica oldLeader = corruptLeader(new ArrayList<>());
// TODO: find a better way to verify this than sleeping
Thread.sleep(5000);
Replica leader = getCollectionState(COLLECTION).getSlice("shard1").getLeader();
assertEquals(leader.getName(), oldLeader.getName());
if (otherReplicaJetty != null) {
// starting it back up won't help: this replica can't recover from the corrupted leader
otherReplicaJetty.start();
}
CollectionAdminRequest.deleteCollection(COLLECTION).process(cluster.getSolrClient());
}
}
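The corruption trick in corruptLeader relies on MockDirectoryWrapper.corruptFiles from the lucene-test-framework. A rough standalone sketch of that primitive under stated assumptions (RAMDirectory delegate, check-index-on-close disabled because the index is deliberately broken); exact failure modes vary because corruptFiles randomizes how it damages each file.

import java.util.Collections;
import java.util.Random;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.store.RAMDirectory;

public class CorruptFilesSketch {
  public static void main(String[] args) throws Exception {
    MockDirectoryWrapper dir = new MockDirectoryWrapper(new Random(42), new RAMDirectory());
    dir.setCheckIndexOnClose(false); // the index will be left deliberately broken
    try (IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(new StandardAnalyzer()))) {
      writer.addDocument(new Document());
      writer.commit();
      for (String file : dir.listAll()) {
        // leave commit metadata and the lock file alone, as the test above does
        if (file.contains("segments_") || file.equals("write.lock")) continue;
        dir.corruptFiles(Collections.singleton(file));
      }
    } catch (Exception expected) {
      // reads or merges over the damaged files are expected to fail
    }
  }
}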

solr/core/src/test/org/apache/solr/core/MockConcurrentMergeScheduler.java

@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.core;
import java.lang.invoke.MethodHandles;
import org.apache.lucene.index.ConcurrentMergeScheduler;
import org.apache.lucene.store.Directory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MockConcurrentMergeScheduler extends ConcurrentMergeScheduler {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@Override
protected void handleMergeException(Directory dir, Throwable exc) {
// swallow the exception
log.warn("Merge exception:", exc);
}
}
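In plain Lucene the equivalent hook is attached through IndexWriterConfig, as the short sketch below shows; Solr performs this wiring itself from the <mergeScheduler class="..."/> element in solrconfig.xml, which the test points at this mock through the solr.mscheduler system property.

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.solr.core.MockConcurrentMergeScheduler;

public class MergeSchedulerWiringSketch {
  // Sketch: attaching a custom merge scheduler in plain Lucene. Solr does the
  // equivalent from <mergeScheduler class="${solr.mscheduler:...}"/> at core load.
  static IndexWriterConfig config() {
    return new IndexWriterConfig(new StandardAnalyzer())
        .setMergeScheduler(new MockConcurrentMergeScheduler());
  }
}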