From 119717611094c755b271db6e7a8614fe9406bb5e Mon Sep 17 00:00:00 2001 From: Cao Manh Dat Date: Mon, 9 Jul 2018 09:14:23 +0700 Subject: [PATCH] SOLR-12412: Leader should give up leadership when IndexWriter.tragedy occur --- solr/CHANGES.txt | 2 + .../org/apache/solr/cloud/ZkController.java | 56 ++++++ .../org/apache/solr/core/CoreContainer.java | 16 ++ .../solr/handler/RequestHandlerBase.java | 3 + .../org/apache/solr/update/SolrCoreState.java | 5 + .../cloud-minimal/conf/solrconfig.xml | 3 + .../solr/cloud/LeaderTragicEventTest.java | 166 ++++++++++++++++++ .../core/MockConcurrentMergeScheduler.java | 35 ++++ 8 files changed, 286 insertions(+) create mode 100644 solr/core/src/test/org/apache/solr/cloud/LeaderTragicEventTest.java create mode 100644 solr/test-framework/src/java/org/apache/solr/core/MockConcurrentMergeScheduler.java diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 025b6b0f877..e0a3b431a6e 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -168,6 +168,8 @@ Other Changes * SOLR-12527: factor out a test-framework/ConfigRequest class (Christine Poerschke) +* SOLR-12412: Leader should give up leadership when IndexWriter.tragedy occur (Cao Manh Dat, Tomas Fernandez-Lobbe) + ================== 7.4.0 ================== Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release. diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java index 367a1316c00..831827731bc 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java @@ -88,6 +88,7 @@ import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.cloud.ZooKeeperException; import org.apache.solr.common.params.CollectionParams; import org.apache.solr.common.params.CommonParams; +import org.apache.solr.common.params.CoreAdminParams; import org.apache.solr.common.params.SolrParams; import org.apache.solr.common.util.IOUtils; import org.apache.solr.common.util.ObjectReleaseTracker; @@ -127,6 +128,7 @@ import static org.apache.solr.common.cloud.ZkStateReader.CORE_NODE_NAME_PROP; import static org.apache.solr.common.cloud.ZkStateReader.ELECTION_NODE_PROP; import static org.apache.solr.common.cloud.ZkStateReader.NODE_NAME_PROP; import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP; +import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP; import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP; /** @@ -229,6 +231,8 @@ public class ZkController { private volatile boolean isClosed; + private final ConcurrentHashMap replicasMetTragicEvent = new ConcurrentHashMap<>(); + @Deprecated // keeps track of replicas that have been asked to recover by leaders running on this node private final Map replicasInLeaderInitiatedRecovery = new HashMap(); @@ -592,6 +596,57 @@ public class ZkController { assert ObjectReleaseTracker.release(this); } + public void giveupLeadership(CoreDescriptor cd, Throwable tragicException) { + DocCollection dc = getClusterState().getCollectionOrNull(cd.getCollectionName()); + if (dc == null) return; + + Slice shard = dc.getSlice(cd.getCloudDescriptor().getShardId()); + if (shard == null) return; + + // if this replica is not a leader, it will be put in recovery state by the leader + if (shard.getReplica(cd.getCloudDescriptor().getCoreNodeName()) != shard.getLeader()) return; + + int numActiveReplicas = shard.getReplicas( + rep -> rep.getState() == Replica.State.ACTIVE + && rep.getType() != Type.PULL + && getClusterState().getLiveNodes().contains(rep.getNodeName()) + ).size(); + + // at least the leader still be able to search, we should give up leadership if other replicas can take over + if (numActiveReplicas >= 2) { + String key = cd.getCollectionName() + ":" + cd.getCloudDescriptor().getCoreNodeName(); + //TODO better handling the case when delete replica was failed + if (replicasMetTragicEvent.putIfAbsent(key, tragicException) == null) { + log.warn("Leader {} met tragic exception, give up its leadership", key, tragicException); + try { + // by using Overseer to remove and add replica back, we can do the task in an async/robust manner + Map props = new HashMap<>(); + props.put(Overseer.QUEUE_OPERATION, "deletereplica"); + props.put(COLLECTION_PROP, cd.getCollectionName()); + props.put(SHARD_ID_PROP, shard.getName()); + props.put(REPLICA_PROP, cd.getCloudDescriptor().getCoreNodeName()); + getOverseerCollectionQueue().offer(Utils.toJSON(new ZkNodeProps(props))); + + props.clear(); + props.put(Overseer.QUEUE_OPERATION, "addreplica"); + props.put(COLLECTION_PROP, cd.getCollectionName()); + props.put(SHARD_ID_PROP, shard.getName()); + props.put(ZkStateReader.REPLICA_TYPE, cd.getCloudDescriptor().getReplicaType().name().toUpperCase(Locale.ROOT)); + props.put(CoreAdminParams.NODE, getNodeName()); + getOverseerCollectionQueue().offer(Utils.toJSON(new ZkNodeProps(props))); + } catch (KeeperException e) { + log.info("Met exception on give up leadership for {}", key, e); + replicasMetTragicEvent.remove(key); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.info("Met exception on give up leadership for {}", key, e); + replicasMetTragicEvent.remove(key); + } + } + } + } + + /** * Returns true if config file exists */ @@ -1522,6 +1577,7 @@ public class ZkController { final String coreNodeName = cd.getCloudDescriptor().getCoreNodeName(); final String collection = cd.getCloudDescriptor().getCollectionName(); getCollectionTerms(collection).remove(cd.getCloudDescriptor().getShardId(), cd); + replicasMetTragicEvent.remove(collection+":"+coreNodeName); if (Strings.isNullOrEmpty(collection)) { log.error("No collection was specified."); diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java index 37a660fdc41..17f1cdb37fb 100644 --- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java +++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java @@ -1806,6 +1806,22 @@ public class CoreContainer { return false; } + public void checkTragicException(SolrCore solrCore) { + Throwable tragicException = null; + try { + tragicException = solrCore.getSolrCoreState().getTragicException(); + } catch (IOException e) { + // failed to open an indexWriter + tragicException = e; + } + + if (tragicException != null) { + if (isZooKeeperAware()) { + getZkController().giveupLeadership(solrCore.getCoreDescriptor(), tragicException); + } + } + } + } class CloserThread extends Thread { diff --git a/solr/core/src/java/org/apache/solr/handler/RequestHandlerBase.java b/solr/core/src/java/org/apache/solr/handler/RequestHandlerBase.java index 28b91e77590..af8d3be7062 100644 --- a/solr/core/src/java/org/apache/solr/handler/RequestHandlerBase.java +++ b/solr/core/src/java/org/apache/solr/handler/RequestHandlerBase.java @@ -208,6 +208,9 @@ public abstract class RequestHandlerBase implements SolrRequestHandler, SolrInfo } } } catch (Exception e) { + if (req.getCore() != null) { + req.getCore().getCoreContainer().checkTragicException(req.getCore()); + } boolean incrementErrors = true; boolean isServerError = true; if (e instanceof SolrException) { diff --git a/solr/core/src/java/org/apache/solr/update/SolrCoreState.java b/solr/core/src/java/org/apache/solr/update/SolrCoreState.java index 9da25161a54..64f6bc6b76b 100644 --- a/solr/core/src/java/org/apache/solr/update/SolrCoreState.java +++ b/solr/core/src/java/org/apache/solr/update/SolrCoreState.java @@ -194,4 +194,9 @@ public abstract class SolrCoreState { public abstract void setCdcrBootstrapCallable(Callable cdcrBootstrapCallable); + public Throwable getTragicException() throws IOException { + RefCounted ref = getIndexWriter(null); + if (ref == null) return null; + return ref.get().getTragicException(); + } } diff --git a/solr/core/src/test-files/solr/configsets/cloud-minimal/conf/solrconfig.xml b/solr/core/src/test-files/solr/configsets/cloud-minimal/conf/solrconfig.xml index 8da7d2847e9..f6718427632 100644 --- a/solr/core/src/test-files/solr/configsets/cloud-minimal/conf/solrconfig.xml +++ b/solr/core/src/test-files/solr/configsets/cloud-minimal/conf/solrconfig.xml @@ -44,5 +44,8 @@ + + + diff --git a/solr/core/src/test/org/apache/solr/cloud/LeaderTragicEventTest.java b/solr/core/src/test/org/apache/solr/cloud/LeaderTragicEventTest.java new file mode 100644 index 00000000000..c838effc991 --- /dev/null +++ b/solr/core/src/test/org/apache/solr/cloud/LeaderTragicEventTest.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.cloud; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.lucene.store.MockDirectoryWrapper; +import org.apache.solr.client.solrj.embedded.JettySolrRunner; +import org.apache.solr.client.solrj.impl.HttpSolrClient; +import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.common.cloud.ClusterStateUtil; +import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.Slice; +import org.apache.solr.core.CoreContainer; +import org.apache.solr.core.DirectoryFactory; +import org.apache.solr.core.SolrCore; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LeaderTragicEventTest extends SolrCloudTestCase { + + private static final String COLLECTION = "collection1"; + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + @BeforeClass + public static void setupCluster() throws Exception { + System.setProperty("solr.mscheduler", "org.apache.solr.core.MockConcurrentMergeScheduler"); + + configureCluster(2) + .addConfig("config", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf")) + .configure(); + + cluster.getSolrClient().setDefaultCollection(COLLECTION); + } + + @AfterClass + public static void cleanup() { + System.clearProperty("solr.mscheduler"); + } + + + @Test + public void test() throws Exception { + CollectionAdminRequest + .createCollection(COLLECTION, "config", 1, 2) + .process(cluster.getSolrClient()); + ClusterStateUtil.waitForAllActiveAndLiveReplicas(cluster.getSolrClient().getZkStateReader(), COLLECTION, 120000); + + List addedIds = new ArrayList<>(); + Replica oldLeader = corruptLeader(addedIds); + + waitForState("Timeout waiting for new replica become leader", COLLECTION, (liveNodes, collectionState) -> { + Slice slice = collectionState.getSlice("shard1"); + + if (slice.getReplicas().size() != 2) return false; + if (slice.getLeader().getName().equals(oldLeader.getName())) return false; + + return true; + }); + ClusterStateUtil.waitForAllActiveAndLiveReplicas(cluster.getSolrClient().getZkStateReader(), COLLECTION, 120000); + Slice shard = getCollectionState(COLLECTION).getSlice("shard1"); + assertNotSame(shard.getLeader().getNodeName(), oldLeader.getNodeName()); + assertEquals(getNonLeader(shard).getNodeName(), oldLeader.getNodeName()); + + for (String id : addedIds) { + assertNotNull(cluster.getSolrClient().getById(COLLECTION,id)); + } + log.info("The test success oldLeader:{} currentState:{}", oldLeader, getCollectionState(COLLECTION)); + + CollectionAdminRequest.deleteCollection(COLLECTION).process(cluster.getSolrClient()); + } + + private Replica corruptLeader(List addedIds) throws IOException { + DocCollection dc = getCollectionState(COLLECTION); + Replica oldLeader = dc.getLeader("shard1"); + CoreContainer leaderCC = cluster.getReplicaJetty(oldLeader).getCoreContainer(); + SolrCore leaderCore = leaderCC.getCores().iterator().next(); + MockDirectoryWrapper dir = (MockDirectoryWrapper) leaderCore.getDirectoryFactory().get(leaderCore.getIndexDir(), DirectoryFactory.DirContext.DEFAULT, leaderCore.getSolrConfig().indexConfig.lockType); + leaderCore.getDirectoryFactory().release(dir); + + try (HttpSolrClient solrClient = new HttpSolrClient.Builder(dc.getLeader("shard1").getCoreUrl()).build()) { + for (int i = 0; i < 100; i++) { + new UpdateRequest() + .add("id", i + "") + .process(solrClient); + solrClient.commit(); + addedIds.add(i + ""); + + for (String file : dir.listAll()) { + if (file.contains("segments_")) continue; + if (file.endsWith("si")) continue; + if (file.endsWith("fnm")) continue; + if (random().nextBoolean()) continue; + + dir.corruptFiles(Collections.singleton(file)); + } + } + } catch (Exception e) { + // Expected + } + return oldLeader; + } + + private Replica getNonLeader(Slice slice) { + if (slice.getReplicas().size() <= 1) return null; + return slice.getReplicas(rep -> !rep.getName().equals(slice.getLeader().getName())).get(0); + } + + @Test + public void testOtherReplicasAreNotActive() throws Exception { + int numReplicas = random().nextInt(2) + 1; + // won't do anything if leader is the only one active replica in the shard + CollectionAdminRequest + .createCollection(COLLECTION, "config", 1, numReplicas) + .process(cluster.getSolrClient()); + ClusterStateUtil.waitForAllActiveAndLiveReplicas(cluster.getSolrClient().getZkStateReader(), COLLECTION, 120000); + + JettySolrRunner otherReplicaJetty = null; + if (numReplicas == 2) { + Slice shard = getCollectionState(COLLECTION).getSlice("shard1"); + otherReplicaJetty = cluster.getReplicaJetty(getNonLeader(shard)); + otherReplicaJetty.stop(); + waitForState("Timeout waiting for replica get down", COLLECTION, (liveNodes, collectionState) -> getNonLeader(collectionState.getSlice("shard1")).getState() != Replica.State.ACTIVE); + } + + Replica oldLeader = corruptLeader(new ArrayList<>()); + + //TODO better way to test this + Thread.sleep(5000); + Replica leader = getCollectionState(COLLECTION).getSlice("shard1").getLeader(); + assertEquals(leader.getName(), oldLeader.getName()); + + if (otherReplicaJetty != null) { + // won't be able to do anything here, since this replica can't recovery from the leader + otherReplicaJetty.start(); + } + + CollectionAdminRequest.deleteCollection(COLLECTION).process(cluster.getSolrClient()); + } + + +} diff --git a/solr/test-framework/src/java/org/apache/solr/core/MockConcurrentMergeScheduler.java b/solr/test-framework/src/java/org/apache/solr/core/MockConcurrentMergeScheduler.java new file mode 100644 index 00000000000..ba66aa3056a --- /dev/null +++ b/solr/test-framework/src/java/org/apache/solr/core/MockConcurrentMergeScheduler.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.core; + +import java.lang.invoke.MethodHandles; + +import org.apache.lucene.index.ConcurrentMergeScheduler; +import org.apache.lucene.store.Directory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MockConcurrentMergeScheduler extends ConcurrentMergeScheduler { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + @Override + protected void handleMergeException(Directory dir, Throwable exc) { + // swallow the exception + log.warn("Merge exception:", exc); + } +}