From 57b33c19a4a8d8fe675c190fd72144c317ed43be Mon Sep 17 00:00:00 2001 From: Shalin Shekhar Mangar Date: Thu, 16 Aug 2018 16:07:05 +0530 Subject: [PATCH] SOLR-12607: Minor refactorings Replaced a few private instances with lambdas and extracted common code for retrying splits into a new method --- .../cloud/api/collections/ShardSplitTest.java | 249 ++++++++---------- 1 file changed, 110 insertions(+), 139 deletions(-) diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java index 2b48fbe10b8..6619ee519c3 100644 --- a/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java @@ -49,7 +49,6 @@ import org.apache.solr.cloud.ChaosMonkey; import org.apache.solr.cloud.StoppableIndexingThread; import org.apache.solr.common.SolrDocument; import org.apache.solr.common.cloud.ClusterState; -import org.apache.solr.common.cloud.CollectionStateWatcher; import org.apache.solr.common.cloud.CompositeIdRouter; import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.DocRouter; @@ -80,8 +79,8 @@ public class ShardSplitTest extends AbstractFullDistribZkTestBase { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - public static final String SHARD1_0 = SHARD1 + "_0"; - public static final String SHARD1_1 = SHARD1 + "_1"; + private static final String SHARD1_0 = SHARD1 + "_0"; + private static final String SHARD1_1 = SHARD1 + "_1"; public ShardSplitTest() { schemaString = "schema15.xml"; // we need a string id @@ -161,21 +160,18 @@ public class ShardSplitTest extends AbstractFullDistribZkTestBase { waitForRecoveriesToFinish(collectionName, true); // let's wait to see parent shard become inactive CountDownLatch latch = new CountDownLatch(1); - 
client.getZkStateReader().registerCollectionStateWatcher(collectionName, new CollectionStateWatcher() { - @Override - public boolean onStateChanged(Set liveNodes, DocCollection collectionState) { - Slice parent = collectionState.getSlice(SHARD1); - Slice slice10 = collectionState.getSlice(SHARD1_0); - Slice slice11 = collectionState.getSlice(SHARD1_1); - if (slice10 != null && slice11 != null && - parent.getState() == Slice.State.INACTIVE && - slice10.getState() == Slice.State.ACTIVE && - slice11.getState() == Slice.State.ACTIVE) { - latch.countDown(); - return true; // removes the watch - } - return false; + client.getZkStateReader().registerCollectionStateWatcher(collectionName, (liveNodes, collectionState) -> { + Slice parent = collectionState.getSlice(SHARD1); + Slice slice10 = collectionState.getSlice(SHARD1_0); + Slice slice11 = collectionState.getSlice(SHARD1_1); + if (slice10 != null && slice11 != null && + parent.getState() == Slice.State.INACTIVE && + slice10.getState() == Slice.State.ACTIVE && + slice11.getState() == Slice.State.ACTIVE) { + latch.countDown(); + return true; // removes the watch } + return false; }); latch.await(1, TimeUnit.MINUTES); if (latch.getCount() != 0) { @@ -211,22 +207,19 @@ public class ShardSplitTest extends AbstractFullDistribZkTestBase { } if (state == RequestStatusState.COMPLETED) { CountDownLatch newReplicaLatch = new CountDownLatch(1); - client.getZkStateReader().registerCollectionStateWatcher(collectionName, new CollectionStateWatcher() { - @Override - public boolean onStateChanged(Set liveNodes, DocCollection collectionState) { - if (liveNodes.size() != liveNodeCount) { - return false; - } - Slice slice = collectionState.getSlice(SHARD1_0); - if (slice.getReplicas().size() == 2) { - if (!slice.getReplicas().stream().anyMatch(r -> r.getState() == Replica.State.RECOVERING)) { - // we see replicas and none of them are recovering - newReplicaLatch.countDown(); - return true; - } - } + 
client.getZkStateReader().registerCollectionStateWatcher(collectionName, (liveNodes, collectionState) -> { + if (liveNodes.size() != liveNodeCount) { return false; } + Slice slice = collectionState.getSlice(SHARD1_0); + if (slice.getReplicas().size() == 2) { + if (slice.getReplicas().stream().noneMatch(r -> r.getState() == Replica.State.RECOVERING)) { + // we see replicas and none of them are recovering + newReplicaLatch.countDown(); + return true; + } + } + return false; }); newReplicaLatch.await(30, TimeUnit.SECONDS); // check consistency of sub-shard replica explicitly because checkShardConsistency methods doesn't @@ -422,41 +415,34 @@ public class ShardSplitTest extends AbstractFullDistribZkTestBase { AtomicBoolean stop = new AtomicBoolean(); AtomicBoolean killed = new AtomicBoolean(false); - Runnable monkey = new Runnable() { - @Override - public void run() { - ZkStateReader zkStateReader = cloudClient.getZkStateReader(); - zkStateReader.registerCollectionStateWatcher(AbstractDistribZkTestBase.DEFAULT_COLLECTION, new CollectionStateWatcher() { - @Override - public boolean onStateChanged(Set liveNodes, DocCollection collectionState) { - if (stop.get()) { - return true; // abort and remove the watch + Runnable monkey = () -> { + ZkStateReader zkStateReader = cloudClient.getZkStateReader(); + zkStateReader.registerCollectionStateWatcher(AbstractDistribZkTestBase.DEFAULT_COLLECTION, (liveNodes, collectionState) -> { + if (stop.get()) { + return true; // abort and remove the watch + } + Slice slice = collectionState.getSlice(SHARD1_0); + if (slice != null && slice.getReplicas().size() > 1) { + // ensure that only one watcher invocation thread can kill! 
+ if (killed.compareAndSet(false, true)) { + log.info("Monkey thread found 2 replicas for {} {}", AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1); + CloudJettyRunner cjetty = shardToLeaderJetty.get(SHARD1); + try { + Thread.sleep(1000 + random().nextInt(500)); + ChaosMonkey.kill(cjetty); + stop.set(true); + return true; + } catch (Exception e) { + log.error("Monkey unable to kill jetty at port " + cjetty.jetty.getLocalPort(), e); } - Slice slice = collectionState.getSlice(SHARD1_0); - if (slice != null && slice.getReplicas().size() > 1) { - // ensure that only one watcher invocation thread can kill! - if (killed.compareAndSet(false, true)) { - log.info("Monkey thread found 2 replicas for {} {}", AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1); - CloudJettyRunner cjetty = shardToLeaderJetty.get(SHARD1); - try { - Thread.sleep(1000 + random().nextInt(500)); - ChaosMonkey.kill(cjetty); - stop.set(true); - return true; - } catch (Exception e) { - log.error("Monkey unable to kill jetty at port " + cjetty.jetty.getLocalPort(), e); - } - } - } - log.info("Monkey thread found only one replica for {} {}", AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1); - return false; } - }); - } + } + log.info("Monkey thread found only one replica for {} {}", AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1); + return false; + }); }; - Thread monkeyThread = null; - monkeyThread = new Thread(monkey); + Thread monkeyThread = new Thread(monkey); monkeyThread.start(); try { CollectionAdminRequest.SplitShard splitShard = CollectionAdminRequest.splitShard(AbstractDistribZkTestBase.DEFAULT_COLLECTION); @@ -497,29 +483,26 @@ public class ShardSplitTest extends AbstractFullDistribZkTestBase { waitForRecoveriesToFinish(AbstractDistribZkTestBase.DEFAULT_COLLECTION, true); // let's wait for the overseer to switch shard states CountDownLatch latch = new CountDownLatch(1); - cloudClient.getZkStateReader().registerCollectionStateWatcher(AbstractDistribZkTestBase.DEFAULT_COLLECTION, 
new CollectionStateWatcher() { - @Override - public boolean onStateChanged(Set liveNodes, DocCollection collectionState) { - Slice parent = collectionState.getSlice(SHARD1); - Slice slice10 = collectionState.getSlice(SHARD1_0); - Slice slice11 = collectionState.getSlice(SHARD1_1); - if (slice10 != null && slice11 != null && - parent.getState() == Slice.State.INACTIVE && - slice10.getState() == Slice.State.ACTIVE && - slice11.getState() == Slice.State.ACTIVE) { - areSubShardsActive.set(true); - latch.countDown(); - return true; // removes the watch - } else if (slice10 != null && slice11 != null && - parent.getState() == Slice.State.ACTIVE && - slice10.getState() == Slice.State.RECOVERY_FAILED && - slice11.getState() == Slice.State.RECOVERY_FAILED) { - areSubShardsActive.set(false); - latch.countDown(); - return true; - } - return false; + cloudClient.getZkStateReader().registerCollectionStateWatcher(AbstractDistribZkTestBase.DEFAULT_COLLECTION, (liveNodes, collectionState) -> { + Slice parent = collectionState.getSlice(SHARD1); + Slice slice10 = collectionState.getSlice(SHARD1_0); + Slice slice11 = collectionState.getSlice(SHARD1_1); + if (slice10 != null && slice11 != null && + parent.getState() == Slice.State.INACTIVE && + slice10.getState() == Slice.State.ACTIVE && + slice11.getState() == Slice.State.ACTIVE) { + areSubShardsActive.set(true); + latch.countDown(); + return true; // removes the watch + } else if (slice10 != null && slice11 != null && + parent.getState() == Slice.State.ACTIVE && + slice10.getState() == Slice.State.RECOVERY_FAILED && + slice11.getState() == Slice.State.RECOVERY_FAILED) { + areSubShardsActive.set(false); + latch.countDown(); + return true; } + return false; }); latch.await(2, TimeUnit.MINUTES); @@ -660,37 +643,34 @@ public class ShardSplitTest extends AbstractFullDistribZkTestBase { } commit(); - Thread indexThread = new Thread() { - @Override - public void run() { - Random random = random(); - int max = atLeast(random, 401); - int 
sleep = atLeast(random, 25); - log.info("SHARDSPLITTEST: Going to add " + max + " number of docs at 1 doc per " + sleep + "ms"); - Set deleted = new HashSet<>(); - for (int id = 101; id < max; id++) { - try { - indexAndUpdateCount(router, ranges, docCounts, String.valueOf(id), id, documentIds); - Thread.sleep(sleep); - if (usually(random)) { - String delId = String.valueOf(random.nextInt(id - 101 + 1) + 101); - if (deleted.contains(delId)) continue; - try { - deleteAndUpdateCount(router, ranges, docCounts, delId); - deleted.add(delId); - documentIds.remove(String.valueOf(delId)); - } catch (Exception e) { - log.error("Exception while deleting docs", e); - } + Thread indexThread = new Thread(() -> { + Random random = random(); + int max = atLeast(random, 401); + int sleep = atLeast(random, 25); + log.info("SHARDSPLITTEST: Going to add " + max + " number of docs at 1 doc per " + sleep + "ms"); + Set deleted = new HashSet<>(); + for (int id = 101; id < max; id++) { + try { + indexAndUpdateCount(router, ranges, docCounts, String.valueOf(id), id, documentIds); + Thread.sleep(sleep); + if (usually(random)) { + String delId = String.valueOf(random.nextInt(id - 101 + 1) + 101); + if (deleted.contains(delId)) continue; + try { + deleteAndUpdateCount(router, ranges, docCounts, delId); + deleted.add(delId); + documentIds.remove(String.valueOf(delId)); + } catch (Exception e) { + log.error("Exception while deleting docs", e); } - } catch (Exception e) { - log.error("Exception while adding doc id = " + id, e); - // do not select this id for deletion ever - deleted.add(String.valueOf(id)); } + } catch (Exception e) { + log.error("Exception while adding doc id = " + id, e); + // do not select this id for deletion ever + deleted.add(String.valueOf(id)); } } - }; + }); indexThread.start(); try { @@ -776,20 +756,7 @@ public class ShardSplitTest extends AbstractFullDistribZkTestBase { collectionClient.commit(); - for (int i = 0; i < 3; i++) { - try { - splitShard(collectionName, 
SHARD1, null, null, false); - break; - } catch (HttpSolrClient.RemoteSolrException e) { - if (e.code() != 500) { - throw e; - } - log.error("SPLITSHARD failed. " + (i < 2 ? " Retring split" : ""), e); - if (i == 2) { - fail("SPLITSHARD was not successful even after three tries"); - } - } - } + trySplit(collectionName, null, SHARD1, 3); waitForRecoveriesToFinish(collectionName, false); @@ -858,20 +825,7 @@ public class ShardSplitTest extends AbstractFullDistribZkTestBase { collectionClient.commit(); - for (int i = 0; i < 3; i++) { - try { - splitShard(collectionName, null, null, splitKey, false); - break; - } catch (HttpSolrClient.RemoteSolrException e) { - if (e.code() != 500) { - throw e; - } - log.error("SPLITSHARD failed. " + (i < 2 ? " Retring split" : ""), e); - if (i == 2) { - fail("SPLITSHARD was not successful even after three tries"); - } - } - } + trySplit(collectionName, splitKey, null, 3); waitForRecoveriesToFinish(collectionName, false); SolrQuery solrQuery = new SolrQuery("*:*"); @@ -886,6 +840,23 @@ public class ShardSplitTest extends AbstractFullDistribZkTestBase { } } + private void trySplit(String collectionName, String splitKey, String shardId, int maxTries) throws SolrServerException, IOException { + for (int i = 0; i < maxTries; i++) { + try { + splitShard(collectionName, shardId, null, splitKey, false); + break; + } catch (HttpSolrClient.RemoteSolrException e) { + if (e.code() != 500) { + throw e; + } + log.error("SPLITSHARD failed. " + (i < maxTries - 1 ? " Retrying split" : ""), e); + if (i == maxTries - 1) { + fail("SPLITSHARD was not successful even after " + maxTries + " tries"); + } + } + } + } + protected void checkDocCountsAndShardStates(int[] docCounts, int numReplicas, Set documentIds) throws Exception { ClusterState clusterState = null; Slice slice1_0 = null, slice1_1 = null;