SOLR-12607: Minor refactorings

Replaced a few private instances with lambdas and extracted common code for retrying splits into a new method
This commit is contained in:
Shalin Shekhar Mangar 2018-08-16 16:07:05 +05:30
parent 94ecb0616a
commit 57b33c19a4
1 changed file with 110 additions and 139 deletions

View File

@ -49,7 +49,6 @@ import org.apache.solr.cloud.ChaosMonkey;
import org.apache.solr.cloud.StoppableIndexingThread; import org.apache.solr.cloud.StoppableIndexingThread;
import org.apache.solr.common.SolrDocument; import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.CollectionStateWatcher;
import org.apache.solr.common.cloud.CompositeIdRouter; import org.apache.solr.common.cloud.CompositeIdRouter;
import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter; import org.apache.solr.common.cloud.DocRouter;
@ -80,8 +79,8 @@ public class ShardSplitTest extends AbstractFullDistribZkTestBase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static final String SHARD1_0 = SHARD1 + "_0"; private static final String SHARD1_0 = SHARD1 + "_0";
public static final String SHARD1_1 = SHARD1 + "_1"; private static final String SHARD1_1 = SHARD1 + "_1";
public ShardSplitTest() { public ShardSplitTest() {
schemaString = "schema15.xml"; // we need a string id schemaString = "schema15.xml"; // we need a string id
@ -161,21 +160,18 @@ public class ShardSplitTest extends AbstractFullDistribZkTestBase {
waitForRecoveriesToFinish(collectionName, true); waitForRecoveriesToFinish(collectionName, true);
// let's wait to see parent shard become inactive // let's wait to see parent shard become inactive
CountDownLatch latch = new CountDownLatch(1); CountDownLatch latch = new CountDownLatch(1);
client.getZkStateReader().registerCollectionStateWatcher(collectionName, new CollectionStateWatcher() { client.getZkStateReader().registerCollectionStateWatcher(collectionName, (liveNodes, collectionState) -> {
@Override Slice parent = collectionState.getSlice(SHARD1);
public boolean onStateChanged(Set<String> liveNodes, DocCollection collectionState) { Slice slice10 = collectionState.getSlice(SHARD1_0);
Slice parent = collectionState.getSlice(SHARD1); Slice slice11 = collectionState.getSlice(SHARD1_1);
Slice slice10 = collectionState.getSlice(SHARD1_0); if (slice10 != null && slice11 != null &&
Slice slice11 = collectionState.getSlice(SHARD1_1); parent.getState() == Slice.State.INACTIVE &&
if (slice10 != null && slice11 != null && slice10.getState() == Slice.State.ACTIVE &&
parent.getState() == Slice.State.INACTIVE && slice11.getState() == Slice.State.ACTIVE) {
slice10.getState() == Slice.State.ACTIVE && latch.countDown();
slice11.getState() == Slice.State.ACTIVE) { return true; // removes the watch
latch.countDown();
return true; // removes the watch
}
return false;
} }
return false;
}); });
latch.await(1, TimeUnit.MINUTES); latch.await(1, TimeUnit.MINUTES);
if (latch.getCount() != 0) { if (latch.getCount() != 0) {
@ -211,22 +207,19 @@ public class ShardSplitTest extends AbstractFullDistribZkTestBase {
} }
if (state == RequestStatusState.COMPLETED) { if (state == RequestStatusState.COMPLETED) {
CountDownLatch newReplicaLatch = new CountDownLatch(1); CountDownLatch newReplicaLatch = new CountDownLatch(1);
client.getZkStateReader().registerCollectionStateWatcher(collectionName, new CollectionStateWatcher() { client.getZkStateReader().registerCollectionStateWatcher(collectionName, (liveNodes, collectionState) -> {
@Override if (liveNodes.size() != liveNodeCount) {
public boolean onStateChanged(Set<String> liveNodes, DocCollection collectionState) {
if (liveNodes.size() != liveNodeCount) {
return false;
}
Slice slice = collectionState.getSlice(SHARD1_0);
if (slice.getReplicas().size() == 2) {
if (!slice.getReplicas().stream().anyMatch(r -> r.getState() == Replica.State.RECOVERING)) {
// we see replicas and none of them are recovering
newReplicaLatch.countDown();
return true;
}
}
return false; return false;
} }
Slice slice = collectionState.getSlice(SHARD1_0);
if (slice.getReplicas().size() == 2) {
if (slice.getReplicas().stream().noneMatch(r -> r.getState() == Replica.State.RECOVERING)) {
// we see replicas and none of them are recovering
newReplicaLatch.countDown();
return true;
}
}
return false;
}); });
newReplicaLatch.await(30, TimeUnit.SECONDS); newReplicaLatch.await(30, TimeUnit.SECONDS);
// check consistency of sub-shard replica explicitly because checkShardConsistency methods doesn't // check consistency of sub-shard replica explicitly because checkShardConsistency methods doesn't
@ -422,41 +415,34 @@ public class ShardSplitTest extends AbstractFullDistribZkTestBase {
AtomicBoolean stop = new AtomicBoolean(); AtomicBoolean stop = new AtomicBoolean();
AtomicBoolean killed = new AtomicBoolean(false); AtomicBoolean killed = new AtomicBoolean(false);
Runnable monkey = new Runnable() { Runnable monkey = () -> {
@Override ZkStateReader zkStateReader = cloudClient.getZkStateReader();
public void run() { zkStateReader.registerCollectionStateWatcher(AbstractDistribZkTestBase.DEFAULT_COLLECTION, (liveNodes, collectionState) -> {
ZkStateReader zkStateReader = cloudClient.getZkStateReader(); if (stop.get()) {
zkStateReader.registerCollectionStateWatcher(AbstractDistribZkTestBase.DEFAULT_COLLECTION, new CollectionStateWatcher() { return true; // abort and remove the watch
@Override }
public boolean onStateChanged(Set<String> liveNodes, DocCollection collectionState) { Slice slice = collectionState.getSlice(SHARD1_0);
if (stop.get()) { if (slice != null && slice.getReplicas().size() > 1) {
return true; // abort and remove the watch // ensure that only one watcher invocation thread can kill!
if (killed.compareAndSet(false, true)) {
log.info("Monkey thread found 2 replicas for {} {}", AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1);
CloudJettyRunner cjetty = shardToLeaderJetty.get(SHARD1);
try {
Thread.sleep(1000 + random().nextInt(500));
ChaosMonkey.kill(cjetty);
stop.set(true);
return true;
} catch (Exception e) {
log.error("Monkey unable to kill jetty at port " + cjetty.jetty.getLocalPort(), e);
} }
Slice slice = collectionState.getSlice(SHARD1_0);
if (slice != null && slice.getReplicas().size() > 1) {
// ensure that only one watcher invocation thread can kill!
if (killed.compareAndSet(false, true)) {
log.info("Monkey thread found 2 replicas for {} {}", AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1);
CloudJettyRunner cjetty = shardToLeaderJetty.get(SHARD1);
try {
Thread.sleep(1000 + random().nextInt(500));
ChaosMonkey.kill(cjetty);
stop.set(true);
return true;
} catch (Exception e) {
log.error("Monkey unable to kill jetty at port " + cjetty.jetty.getLocalPort(), e);
}
}
}
log.info("Monkey thread found only one replica for {} {}", AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1);
return false;
} }
}); }
} log.info("Monkey thread found only one replica for {} {}", AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1);
return false;
});
}; };
Thread monkeyThread = null; Thread monkeyThread = new Thread(monkey);
monkeyThread = new Thread(monkey);
monkeyThread.start(); monkeyThread.start();
try { try {
CollectionAdminRequest.SplitShard splitShard = CollectionAdminRequest.splitShard(AbstractDistribZkTestBase.DEFAULT_COLLECTION); CollectionAdminRequest.SplitShard splitShard = CollectionAdminRequest.splitShard(AbstractDistribZkTestBase.DEFAULT_COLLECTION);
@ -497,29 +483,26 @@ public class ShardSplitTest extends AbstractFullDistribZkTestBase {
waitForRecoveriesToFinish(AbstractDistribZkTestBase.DEFAULT_COLLECTION, true); waitForRecoveriesToFinish(AbstractDistribZkTestBase.DEFAULT_COLLECTION, true);
// let's wait for the overseer to switch shard states // let's wait for the overseer to switch shard states
CountDownLatch latch = new CountDownLatch(1); CountDownLatch latch = new CountDownLatch(1);
cloudClient.getZkStateReader().registerCollectionStateWatcher(AbstractDistribZkTestBase.DEFAULT_COLLECTION, new CollectionStateWatcher() { cloudClient.getZkStateReader().registerCollectionStateWatcher(AbstractDistribZkTestBase.DEFAULT_COLLECTION, (liveNodes, collectionState) -> {
@Override Slice parent = collectionState.getSlice(SHARD1);
public boolean onStateChanged(Set<String> liveNodes, DocCollection collectionState) { Slice slice10 = collectionState.getSlice(SHARD1_0);
Slice parent = collectionState.getSlice(SHARD1); Slice slice11 = collectionState.getSlice(SHARD1_1);
Slice slice10 = collectionState.getSlice(SHARD1_0); if (slice10 != null && slice11 != null &&
Slice slice11 = collectionState.getSlice(SHARD1_1); parent.getState() == Slice.State.INACTIVE &&
if (slice10 != null && slice11 != null && slice10.getState() == Slice.State.ACTIVE &&
parent.getState() == Slice.State.INACTIVE && slice11.getState() == Slice.State.ACTIVE) {
slice10.getState() == Slice.State.ACTIVE && areSubShardsActive.set(true);
slice11.getState() == Slice.State.ACTIVE) { latch.countDown();
areSubShardsActive.set(true); return true; // removes the watch
latch.countDown(); } else if (slice10 != null && slice11 != null &&
return true; // removes the watch parent.getState() == Slice.State.ACTIVE &&
} else if (slice10 != null && slice11 != null && slice10.getState() == Slice.State.RECOVERY_FAILED &&
parent.getState() == Slice.State.ACTIVE && slice11.getState() == Slice.State.RECOVERY_FAILED) {
slice10.getState() == Slice.State.RECOVERY_FAILED && areSubShardsActive.set(false);
slice11.getState() == Slice.State.RECOVERY_FAILED) { latch.countDown();
areSubShardsActive.set(false); return true;
latch.countDown();
return true;
}
return false;
} }
return false;
}); });
latch.await(2, TimeUnit.MINUTES); latch.await(2, TimeUnit.MINUTES);
@ -660,37 +643,34 @@ public class ShardSplitTest extends AbstractFullDistribZkTestBase {
} }
commit(); commit();
Thread indexThread = new Thread() { Thread indexThread = new Thread(() -> {
@Override Random random = random();
public void run() { int max = atLeast(random, 401);
Random random = random(); int sleep = atLeast(random, 25);
int max = atLeast(random, 401); log.info("SHARDSPLITTEST: Going to add " + max + " number of docs at 1 doc per " + sleep + "ms");
int sleep = atLeast(random, 25); Set<String> deleted = new HashSet<>();
log.info("SHARDSPLITTEST: Going to add " + max + " number of docs at 1 doc per " + sleep + "ms"); for (int id = 101; id < max; id++) {
Set<String> deleted = new HashSet<>(); try {
for (int id = 101; id < max; id++) { indexAndUpdateCount(router, ranges, docCounts, String.valueOf(id), id, documentIds);
try { Thread.sleep(sleep);
indexAndUpdateCount(router, ranges, docCounts, String.valueOf(id), id, documentIds); if (usually(random)) {
Thread.sleep(sleep); String delId = String.valueOf(random.nextInt(id - 101 + 1) + 101);
if (usually(random)) { if (deleted.contains(delId)) continue;
String delId = String.valueOf(random.nextInt(id - 101 + 1) + 101); try {
if (deleted.contains(delId)) continue; deleteAndUpdateCount(router, ranges, docCounts, delId);
try { deleted.add(delId);
deleteAndUpdateCount(router, ranges, docCounts, delId); documentIds.remove(String.valueOf(delId));
deleted.add(delId); } catch (Exception e) {
documentIds.remove(String.valueOf(delId)); log.error("Exception while deleting docs", e);
} catch (Exception e) {
log.error("Exception while deleting docs", e);
}
} }
} catch (Exception e) {
log.error("Exception while adding doc id = " + id, e);
// do not select this id for deletion ever
deleted.add(String.valueOf(id));
} }
} catch (Exception e) {
log.error("Exception while adding doc id = " + id, e);
// do not select this id for deletion ever
deleted.add(String.valueOf(id));
} }
} }
}; });
indexThread.start(); indexThread.start();
try { try {
@ -776,20 +756,7 @@ public class ShardSplitTest extends AbstractFullDistribZkTestBase {
collectionClient.commit(); collectionClient.commit();
for (int i = 0; i < 3; i++) { trySplit(collectionName, null, SHARD1, 3);
try {
splitShard(collectionName, SHARD1, null, null, false);
break;
} catch (HttpSolrClient.RemoteSolrException e) {
if (e.code() != 500) {
throw e;
}
log.error("SPLITSHARD failed. " + (i < 2 ? " Retring split" : ""), e);
if (i == 2) {
fail("SPLITSHARD was not successful even after three tries");
}
}
}
waitForRecoveriesToFinish(collectionName, false); waitForRecoveriesToFinish(collectionName, false);
@ -858,20 +825,7 @@ public class ShardSplitTest extends AbstractFullDistribZkTestBase {
collectionClient.commit(); collectionClient.commit();
for (int i = 0; i < 3; i++) { trySplit(collectionName, splitKey, null, 3);
try {
splitShard(collectionName, null, null, splitKey, false);
break;
} catch (HttpSolrClient.RemoteSolrException e) {
if (e.code() != 500) {
throw e;
}
log.error("SPLITSHARD failed. " + (i < 2 ? " Retring split" : ""), e);
if (i == 2) {
fail("SPLITSHARD was not successful even after three tries");
}
}
}
waitForRecoveriesToFinish(collectionName, false); waitForRecoveriesToFinish(collectionName, false);
SolrQuery solrQuery = new SolrQuery("*:*"); SolrQuery solrQuery = new SolrQuery("*:*");
@ -886,6 +840,23 @@ public class ShardSplitTest extends AbstractFullDistribZkTestBase {
} }
} }
/**
 * Attempts a SPLITSHARD operation up to {@code maxTries} times, retrying only when the
 * server responds with an internal error (HTTP 500); any other remote error is propagated
 * to the caller immediately.
 *
 * @param collectionName the collection whose shard is being split
 * @param splitKey       route key identifying the shard to split, or null when shardId is given
 * @param shardId        explicit shard id to split, or null when splitKey is given
 * @param maxTries       maximum number of split attempts before failing the test
 * @throws SolrServerException on non-retryable SolrJ errors from splitShard
 * @throws IOException         on communication failures with the cluster
 */
private void trySplit(String collectionName, String splitKey, String shardId, int maxTries) throws SolrServerException, IOException {
  for (int i = 0; i < maxTries; i++) {
    try {
      splitShard(collectionName, shardId, null, splitKey, false);
      break; // split succeeded; stop retrying
    } catch (HttpSolrClient.RemoteSolrException e) {
      if (e.code() != 500) {
        throw e; // not a transient server-side failure; do not retry
      }
      log.error("SPLITSHARD failed. " + (i < maxTries - 1 ? " Retrying split" : ""), e);
      // Fail on the final attempt. The previous code compared against the hard-coded
      // literal 2 (the old inline retry count), which broke for any maxTries != 3:
      // larger values never triggered the failure, smaller values could not reach it.
      if (i == maxTries - 1) {
        fail("SPLITSHARD was not successful even after " + maxTries + " tries");
      }
    }
  }
}
protected void checkDocCountsAndShardStates(int[] docCounts, int numReplicas, Set<String> documentIds) throws Exception { protected void checkDocCountsAndShardStates(int[] docCounts, int numReplicas, Set<String> documentIds) throws Exception {
ClusterState clusterState = null; ClusterState clusterState = null;
Slice slice1_0 = null, slice1_1 = null; Slice slice1_0 = null, slice1_1 = null;