mirror of
https://github.com/apache/lucene.git
synced 2025-02-21 01:18:45 +00:00
SOLR-3755: Refactored ShardSplitTest and ChaosMonkeyShardSplitTest to share code.
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1467799 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
b1391bef1c
commit
f975a02658
@ -17,23 +17,8 @@ package org.apache.solr.cloud;
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.lucene.util.LuceneTestCase.Slow;
|
||||
import org.apache.solr.client.solrj.SolrQuery;
|
||||
import org.apache.solr.client.solrj.SolrRequest;
|
||||
import org.apache.solr.client.solrj.SolrServer;
|
||||
import org.apache.solr.client.solrj.SolrServerException;
|
||||
import org.apache.solr.client.solrj.impl.HttpSolrServer;
|
||||
import org.apache.solr.client.solrj.request.QueryRequest;
|
||||
import org.apache.solr.client.solrj.request.UpdateRequest;
|
||||
import org.apache.solr.client.solrj.response.QueryResponse;
|
||||
import org.apache.solr.common.SolrDocument;
|
||||
import org.apache.solr.common.SolrInputDocument;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
@ -42,84 +27,40 @@ import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.Slice;
|
||||
import org.apache.solr.common.cloud.SolrZkClient;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.params.CollectionParams;
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.apache.solr.common.util.Hash;
|
||||
import org.apache.solr.handler.admin.CollectionsHandler;
|
||||
import org.apache.solr.handler.component.HttpShardHandlerFactory;
|
||||
import org.apache.solr.update.DirectUpdateHandler2;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
* Test split phase that occurs when a Collection API split call is made.
|
||||
*/
|
||||
@Slow
|
||||
public class ChaosMonkeyShardSplitTest extends AbstractFullDistribZkTestBase {
|
||||
|
||||
public class ChaosMonkeyShardSplitTest extends ShardSplitTest {
|
||||
|
||||
static final int TIMEOUT = 10000;
|
||||
private AtomicInteger killCounter = new AtomicInteger();
|
||||
|
||||
@BeforeClass
|
||||
public static void beforeSuperClass() throws Exception {}
|
||||
|
||||
@AfterClass
|
||||
public static void afterSuperClass() {
|
||||
|
||||
}
|
||||
|
||||
|
||||
@Before
|
||||
@Override
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
useFactory(null);
|
||||
System.setProperty("numShards", Integer.toString(sliceCount));
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
if (VERBOSE || printLayoutOnTearDown) {
|
||||
super.printLayout();
|
||||
}
|
||||
if (controlClient != null) {
|
||||
controlClient.shutdown();
|
||||
}
|
||||
if (cloudClient != null) {
|
||||
cloudClient.shutdown();
|
||||
}
|
||||
if (controlClientCloud != null) {
|
||||
controlClientCloud.shutdown();
|
||||
}
|
||||
super.tearDown();
|
||||
|
||||
resetExceptionIgnores();
|
||||
System.clearProperty("zkHost");
|
||||
System.clearProperty("numShards");
|
||||
System.clearProperty("solr.xml.persist");
|
||||
|
||||
// insurance
|
||||
DirectUpdateHandler2.commitOnClose = true;
|
||||
}
|
||||
|
||||
public ChaosMonkeyShardSplitTest() {
|
||||
super();
|
||||
// fixShardCount = true;
|
||||
// sliceCount = 1;
|
||||
// shardCount = TEST_NIGHTLY ? 7 : 4;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void doTest() throws Exception {
|
||||
handle.clear();
|
||||
handle.put("QTime", SKIPVAL);
|
||||
handle.put("timestamp", SKIPVAL);
|
||||
|
||||
waitForThingsToLevelOut(15);
|
||||
printLayout();
|
||||
|
||||
ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
|
||||
DocRouter router = clusterState.getCollection(AbstractDistribZkTestBase.DEFAULT_COLLECTION).getRouter();
|
||||
@ -128,43 +69,26 @@ public class ChaosMonkeyShardSplitTest extends AbstractFullDistribZkTestBase {
|
||||
final List<DocRouter.Range> ranges = router.partitionRange(2, shard1Range);
|
||||
final int[] docCounts = new int[ranges.size()];
|
||||
int numReplicas = shard1.getReplicas().size();
|
||||
|
||||
Thread indexThread = null;
|
||||
OverseerRestarter killer = null;
|
||||
Thread killerThread = null;
|
||||
final SolrServer solrServer = clients.get(0);
|
||||
|
||||
|
||||
try {
|
||||
solrServer.deleteByQuery("*:*");
|
||||
for (int i = 0; i < 100; i++) {
|
||||
indexr("id", i);
|
||||
|
||||
// todo - hook in custom hashing
|
||||
byte[] bytes = String.valueOf(i).getBytes("UTF-8");
|
||||
int hash = Hash.murmurhash3_x86_32(bytes, 0, bytes.length, 0);
|
||||
for (int i2 = 0; i2 < ranges.size(); i2++) {
|
||||
DocRouter.Range range = ranges.get(i2);
|
||||
if (range.includes(hash)) docCounts[i2]++;
|
||||
}
|
||||
del("*:*");
|
||||
for (int id = 0; id < 100; id++) {
|
||||
indexAndUpdateCount(ranges, docCounts, id);
|
||||
}
|
||||
solrServer.commit();
|
||||
|
||||
waitForRecoveriesToFinish(false);
|
||||
|
||||
commit();
|
||||
|
||||
indexThread = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
for (int i = 101; i < 201; i++) {
|
||||
for (int id = 101; id < atLeast(401); id++) {
|
||||
try {
|
||||
indexr("id", i);
|
||||
|
||||
// todo - hook in custom hashing
|
||||
byte[] bytes = String.valueOf(i).getBytes("UTF-8");
|
||||
int hash = Hash.murmurhash3_x86_32(bytes, 0, bytes.length, 0);
|
||||
for (int i2 = 0; i2 < ranges.size(); i2++) {
|
||||
DocRouter.Range range = ranges.get(i2);
|
||||
if (range.includes(hash)) docCounts[i2]++;
|
||||
}
|
||||
Thread.sleep(100);
|
||||
indexAndUpdateCount(ranges, docCounts, id);
|
||||
Thread.sleep(atLeast(25));
|
||||
} catch (Exception e) {
|
||||
log.error("Exception while adding doc", e);
|
||||
}
|
||||
@ -172,68 +96,50 @@ public class ChaosMonkeyShardSplitTest extends AbstractFullDistribZkTestBase {
|
||||
}
|
||||
};
|
||||
indexThread.start();
|
||||
|
||||
|
||||
// kill the leader
|
||||
CloudJettyRunner leaderJetty = shardToLeaderJetty.get("shard1");
|
||||
log.info("Cluster State: "
|
||||
+ cloudClient.getZkStateReader().getClusterState());
|
||||
|
||||
chaosMonkey.killJetty(leaderJetty);
|
||||
|
||||
|
||||
Thread.sleep(2000);
|
||||
|
||||
|
||||
waitForThingsToLevelOut(90);
|
||||
|
||||
|
||||
Thread.sleep(1000);
|
||||
checkShardConsistency(false, true);
|
||||
|
||||
|
||||
CloudJettyRunner deadJetty = leaderJetty;
|
||||
|
||||
|
||||
// TODO: Check total docs ?
|
||||
// long cloudClientDocs = cloudClient.query(new
|
||||
// SolrQuery("*:*")).getResults().getNumFound();
|
||||
|
||||
|
||||
// Wait until new leader is elected
|
||||
while (deadJetty == leaderJetty) {
|
||||
updateMappingsFromZk(this.jettys, this.clients);
|
||||
leaderJetty = shardToLeaderJetty.get("shard1");
|
||||
}
|
||||
|
||||
|
||||
// bring back dead node
|
||||
ChaosMonkey.start(deadJetty.jetty); // he is not the leader anymore
|
||||
|
||||
|
||||
waitTillRecovered();
|
||||
|
||||
|
||||
// Kill the overseer
|
||||
// TODO: Actually kill the Overseer instance
|
||||
killer = new OverseerRestarter(zkServer.getZkAddress());
|
||||
killerThread = new Thread(killer);
|
||||
killerThread.start();
|
||||
killCounter.incrementAndGet();
|
||||
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
params.set("action",
|
||||
CollectionParams.CollectionAction.SPLITSHARD.toString());
|
||||
params.set("collection", "collection1");
|
||||
params.set("shard", "shard1");
|
||||
SolrRequest request = new QueryRequest(params);
|
||||
request.setPath("/admin/collections");
|
||||
|
||||
String baseUrl = ((HttpSolrServer) shardToJetty.get("shard1").get(0).client.solrClient)
|
||||
.getBaseURL();
|
||||
baseUrl = baseUrl.substring(0, baseUrl.length() - "collection1".length());
|
||||
|
||||
HttpSolrServer baseServer = new HttpSolrServer(baseUrl);
|
||||
baseServer.setConnectionTimeout(15000);
|
||||
baseServer.setSoTimeout((int) (CollectionsHandler.DEFAULT_ZK_TIMEOUT * 5));
|
||||
baseServer.request(request);
|
||||
|
||||
System.out.println("Layout after split: \n");
|
||||
|
||||
splitShard(SHARD1);
|
||||
|
||||
log.info("Layout after split: \n");
|
||||
printLayout();
|
||||
|
||||
// distributed commit on all shards
|
||||
|
||||
// distributed commit on all shards
|
||||
} finally {
|
||||
if(indexThread != null)
|
||||
if (indexThread != null)
|
||||
indexThread.join();
|
||||
if (solrServer != null)
|
||||
solrServer.commit();
|
||||
@ -244,106 +150,25 @@ public class ChaosMonkeyShardSplitTest extends AbstractFullDistribZkTestBase {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
SolrQuery query = new SolrQuery("*:*").setRows(0).setFields("id");
|
||||
query.set("distrib", false);
|
||||
|
||||
String shard1_0_url = cloudClient.getZkStateReader().getLeaderUrl(
|
||||
AbstractFullDistribZkTestBase.DEFAULT_COLLECTION, "shard1_0",
|
||||
DEFAULT_CONNECTION_TIMEOUT);
|
||||
HttpSolrServer shard1_0Server = new HttpSolrServer(shard1_0_url);
|
||||
QueryResponse response = shard1_0Server.query(query);
|
||||
long shard10Count = response.getResults().getNumFound();
|
||||
System.out.println("Resp: shard: shard1_0 url: " + shard1_0_url + "\n"
|
||||
+ response.getResponse());
|
||||
|
||||
String shard1_1_url = cloudClient.getZkStateReader().getLeaderUrl(
|
||||
AbstractFullDistribZkTestBase.DEFAULT_COLLECTION, "shard1_1",
|
||||
DEFAULT_CONNECTION_TIMEOUT);
|
||||
HttpSolrServer shard1_1Server = new HttpSolrServer(shard1_1_url);
|
||||
QueryResponse response2 = shard1_1Server.query(query);
|
||||
long shard11Count = response2.getResults().getNumFound();
|
||||
System.out.println("Resp: shard: shard1_1 url: " + shard1_1_url + "\n"
|
||||
+ response2.getResponse());
|
||||
|
||||
for (int i = 0; i < docCounts.length; i++) {
|
||||
int docCount = docCounts[i];
|
||||
System.out
|
||||
.println("Expected docCount for shard1_" + i + " = " + docCount);
|
||||
}
|
||||
|
||||
// DEBUGGING CODE
|
||||
log.info("Actual docCount for shard1_0 = {}", shard10Count);
|
||||
log.info("Actual docCount for shard1_1 = {}", shard11Count);
|
||||
Map<String, String> idVsVersion = new HashMap<String, String>();
|
||||
Map<String, SolrDocument> shard10Docs = new HashMap<String, SolrDocument>();
|
||||
Map<String, SolrDocument> shard11Docs = new HashMap<String, SolrDocument>();
|
||||
for (int i = 0; i < response.getResults().size(); i++) {
|
||||
SolrDocument document = response.getResults().get(i);
|
||||
idVsVersion.put(document.getFieldValue("id").toString(), document.getFieldValue("_version_").toString());
|
||||
SolrDocument old = shard10Docs.put(document.getFieldValue("id").toString(), document);
|
||||
if (old != null) {
|
||||
log.error("EXTRA: ID: " + document.getFieldValue("id") + " on shard1_0. Old version: " + old.getFieldValue("_version_") + " new version: " + document.getFieldValue("_version_"));
|
||||
}
|
||||
}
|
||||
for (int i = 0; i < response2.getResults().size(); i++) {
|
||||
SolrDocument document = response2.getResults().get(i);
|
||||
String value = document.getFieldValue("id").toString();
|
||||
String version = idVsVersion.get(value);
|
||||
if (version != null) {
|
||||
log.error("DUPLICATE: ID: " + value + " , shard1_0Version: " + version + " shard1_1Version:" + document.getFieldValue("_version_"));
|
||||
}
|
||||
SolrDocument old = shard11Docs.put(document.getFieldValue("id").toString(), document);
|
||||
if (old != null) {
|
||||
log.error("EXTRA: ID: " + document.getFieldValue("id") + " on shard1_1. Old version: " + old.getFieldValue("_version_") + " new version: " + document.getFieldValue("_version_"));
|
||||
}
|
||||
}
|
||||
// END DEBUGGING CODE
|
||||
|
||||
assertEquals("Wrong doc count on shard1_0", docCounts[0], shard10Count);
|
||||
assertEquals("Wrong doc count on shard1_1", docCounts[1], shard11Count);
|
||||
checkDocCountsAndShardStates(docCounts, numReplicas);
|
||||
|
||||
Slice slice1_0 = null, slice1_1 = null;
|
||||
int i = 0;
|
||||
for (i = 0; i < 10; i++) {
|
||||
ZkStateReader zkStateReader = cloudClient.getZkStateReader();
|
||||
zkStateReader.updateClusterState(true);
|
||||
clusterState = zkStateReader.getClusterState();
|
||||
slice1_0 = clusterState.getSlice(AbstractDistribZkTestBase.DEFAULT_COLLECTION, "shard1_0");
|
||||
slice1_1 = clusterState.getSlice(AbstractDistribZkTestBase.DEFAULT_COLLECTION, "shard1_1");
|
||||
if (Slice.ACTIVE.equals(slice1_0.getState()) && Slice.ACTIVE.equals(slice1_1.getState()))
|
||||
break;
|
||||
Thread.sleep(500);
|
||||
}
|
||||
|
||||
log.info("ShardSplitTest waited for {} ms for shard state to be set to active", i * 500);
|
||||
|
||||
assertNotNull("Cluster state does not contain shard1_0", slice1_0);
|
||||
assertNotNull("Cluster state does not contain shard1_0", slice1_1);
|
||||
assertEquals("shard1_0 is not active", Slice.ACTIVE, slice1_0.getState());
|
||||
assertEquals("shard1_1 is not active", Slice.ACTIVE, slice1_1.getState());
|
||||
assertEquals("Wrong number of replicas created for shard1_0", numReplicas, slice1_0.getReplicas().size());
|
||||
assertEquals("Wrong number of replicas created for shard1_1", numReplicas, slice1_1.getReplicas().size());
|
||||
|
||||
// todo - can't call waitForThingsToLevelOut because it looks for
|
||||
// jettys of all shards
|
||||
// and the new sub-shards don't have any.
|
||||
waitForRecoveriesToFinish(true);
|
||||
// waitForThingsToLevelOut(15);
|
||||
|
||||
// todo - more and better tests
|
||||
|
||||
}
|
||||
|
||||
|
||||
private class OverseerRestarter implements Runnable {
|
||||
SolrZkClient overseerClient = null;
|
||||
public volatile boolean run = true;
|
||||
private final String zkAddress;
|
||||
|
||||
|
||||
public OverseerRestarter(String zkAddress) {
|
||||
this.zkAddress = zkAddress;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
@ -379,7 +204,7 @@ public class ChaosMonkeyShardSplitTest extends AbstractFullDistribZkTestBase {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private void waitTillRecovered() throws Exception {
|
||||
for (int i = 0; i < 30; i++) {
|
||||
Thread.sleep(3000);
|
||||
@ -393,7 +218,7 @@ public class ChaosMonkeyShardSplitTest extends AbstractFullDistribZkTestBase {
|
||||
for (Replica replica : replicas) {
|
||||
if (!clusterState.liveNodesContain(replica.getNodeName())
|
||||
|| !replica.get(ZkStateReader.STATE_PROP).equals(
|
||||
ZkStateReader.ACTIVE)) {
|
||||
ZkStateReader.ACTIVE)) {
|
||||
allActive = false;
|
||||
break;
|
||||
}
|
||||
@ -405,26 +230,7 @@ public class ChaosMonkeyShardSplitTest extends AbstractFullDistribZkTestBase {
|
||||
printLayout();
|
||||
fail("timeout waiting to see recovered node");
|
||||
}
|
||||
|
||||
protected void indexDoc(List<CloudJettyRunner> skipServers, Object... fields)
|
||||
throws IOException, SolrServerException {
|
||||
SolrInputDocument doc = new SolrInputDocument();
|
||||
|
||||
addFields(doc, fields);
|
||||
addFields(doc, "rnd_b", true);
|
||||
|
||||
controlClient.add(doc);
|
||||
|
||||
UpdateRequest ureq = new UpdateRequest();
|
||||
ureq.add(doc);
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
for (CloudJettyRunner skip : skipServers) {
|
||||
params.add("test.distrib.skip.servers", skip.url + "/");
|
||||
}
|
||||
ureq.setParams(params);
|
||||
ureq.process(cloudClient);
|
||||
}
|
||||
|
||||
|
||||
// skip the randoms - they can deadlock...
|
||||
@Override
|
||||
protected void indexr(Object... fields) throws Exception {
|
||||
@ -433,10 +239,10 @@ public class ChaosMonkeyShardSplitTest extends AbstractFullDistribZkTestBase {
|
||||
addFields(doc, "rnd_b", true);
|
||||
indexDoc(doc);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Elects a new overseer
|
||||
*
|
||||
*
|
||||
* @return SolrZkClient
|
||||
*/
|
||||
private SolrZkClient electNewOverseer(String address) throws KeeperException,
|
||||
@ -444,7 +250,7 @@ public class ChaosMonkeyShardSplitTest extends AbstractFullDistribZkTestBase {
|
||||
SolrZkClient zkClient = new SolrZkClient(address, TIMEOUT);
|
||||
ZkStateReader reader = new ZkStateReader(zkClient);
|
||||
LeaderElector overseerElector = new LeaderElector(zkClient);
|
||||
|
||||
|
||||
// TODO: close Overseer
|
||||
Overseer overseer = new Overseer(
|
||||
new HttpShardHandlerFactory().getShardHandler(), "/admin/cores", reader);
|
||||
@ -455,5 +261,5 @@ public class ChaosMonkeyShardSplitTest extends AbstractFullDistribZkTestBase {
|
||||
overseerElector.joinElection(ec, false);
|
||||
return zkClient;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
@ -19,7 +19,7 @@ package org.apache.solr.cloud;
|
||||
|
||||
import org.apache.solr.client.solrj.SolrQuery;
|
||||
import org.apache.solr.client.solrj.SolrRequest;
|
||||
import org.apache.solr.client.solrj.SolrServer;
|
||||
import org.apache.solr.client.solrj.SolrServerException;
|
||||
import org.apache.solr.client.solrj.impl.HttpSolrServer;
|
||||
import org.apache.solr.client.solrj.request.QueryRequest;
|
||||
import org.apache.solr.client.solrj.response.QueryResponse;
|
||||
@ -34,9 +34,11 @@ import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.apache.solr.common.util.Hash;
|
||||
import org.apache.solr.handler.admin.CollectionsHandler;
|
||||
import org.apache.solr.update.DirectUpdateHandler2;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@ -46,13 +48,6 @@ public class ShardSplitTest extends BasicDistributedZkTest {
|
||||
public static final String SHARD1_0 = SHARD1 + "_0";
|
||||
public static final String SHARD1_1 = SHARD1 + "_1";
|
||||
|
||||
public ShardSplitTest() {
|
||||
super();
|
||||
// fixShardCount = true;
|
||||
// shardCount = 8;
|
||||
// sliceCount = 4;
|
||||
}
|
||||
|
||||
@Override
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
@ -93,7 +88,6 @@ public class ShardSplitTest extends BasicDistributedZkTest {
|
||||
@Override
|
||||
public void doTest() throws Exception {
|
||||
waitForThingsToLevelOut(15);
|
||||
printLayout();
|
||||
|
||||
ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
|
||||
DocRouter router = clusterState.getCollection(AbstractDistribZkTestBase.DEFAULT_COLLECTION).getRouter();
|
||||
@ -103,40 +97,19 @@ public class ShardSplitTest extends BasicDistributedZkTest {
|
||||
final int[] docCounts = new int[ranges.size()];
|
||||
int numReplicas = shard1.getReplicas().size();
|
||||
|
||||
final SolrServer solrServer = clients.get(0);
|
||||
solrServer.deleteByQuery("*:*");
|
||||
for (int i = 0; i < 100; i++) {
|
||||
indexr("id", i);
|
||||
|
||||
// todo - hook in custom hashing
|
||||
byte[] bytes = String.valueOf(i).getBytes("UTF-8");
|
||||
int hash = Hash.murmurhash3_x86_32(bytes, 0, bytes.length, 0);
|
||||
for (int i2 = 0; i2 < ranges.size(); i2++) {
|
||||
DocRouter.Range range = ranges.get(i2);
|
||||
if (range.includes(hash))
|
||||
docCounts[i2]++;
|
||||
}
|
||||
del("*:*");
|
||||
for (int id = 0; id < 100; id++) {
|
||||
indexAndUpdateCount(ranges, docCounts, id);
|
||||
}
|
||||
solrServer.commit();
|
||||
|
||||
waitForRecoveriesToFinish(false);
|
||||
commit();
|
||||
|
||||
Thread indexThread = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
for (int i = 101; i < 201; i++) {
|
||||
for (int id = 101; id < atLeast(401); id++) {
|
||||
try {
|
||||
indexr("id", i);
|
||||
|
||||
// todo - hook in custom hashing
|
||||
byte[] bytes = String.valueOf(i).getBytes("UTF-8");
|
||||
int hash = Hash.murmurhash3_x86_32(bytes, 0, bytes.length, 0);
|
||||
for (int i2 = 0; i2 < ranges.size(); i2++) {
|
||||
DocRouter.Range range = ranges.get(i2);
|
||||
if (range.includes(hash))
|
||||
docCounts[i2]++;
|
||||
}
|
||||
Thread.sleep(100);
|
||||
indexAndUpdateCount(ranges, docCounts, id);
|
||||
Thread.sleep(atLeast(25));
|
||||
} catch (Exception e) {
|
||||
log.error("Exception while adding doc", e);
|
||||
}
|
||||
@ -145,10 +118,71 @@ public class ShardSplitTest extends BasicDistributedZkTest {
|
||||
};
|
||||
indexThread.start();
|
||||
|
||||
splitShard(SHARD1);
|
||||
|
||||
log.info("Layout after split: \n");
|
||||
printLayout();
|
||||
|
||||
indexThread.join();
|
||||
|
||||
commit();
|
||||
|
||||
checkDocCountsAndShardStates(docCounts, numReplicas);
|
||||
|
||||
// todo can't call waitForThingsToLevelOut because it looks for jettys of all shards
|
||||
// and the new sub-shards don't have any.
|
||||
waitForRecoveriesToFinish(true);
|
||||
//waitForThingsToLevelOut(15);
|
||||
}
|
||||
|
||||
protected void checkDocCountsAndShardStates(int[] docCounts, int numReplicas) throws SolrServerException, KeeperException, InterruptedException {
|
||||
SolrQuery query = new SolrQuery("*:*").setRows(1000).setFields("id", "_version_");
|
||||
query.set("distrib", false);
|
||||
|
||||
ZkCoreNodeProps shard1_0 = getLeaderUrlFromZk(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1_0);
|
||||
HttpSolrServer shard1_0Server = new HttpSolrServer(shard1_0.getCoreUrl());
|
||||
QueryResponse response = shard1_0Server.query(query);
|
||||
long shard10Count = response.getResults().getNumFound();
|
||||
|
||||
ZkCoreNodeProps shard1_1 = getLeaderUrlFromZk(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1_1);
|
||||
HttpSolrServer shard1_1Server = new HttpSolrServer(shard1_1.getCoreUrl());
|
||||
QueryResponse response2 = shard1_1Server.query(query);
|
||||
long shard11Count = response2.getResults().getNumFound();
|
||||
|
||||
logDebugHelp(docCounts, response, shard10Count, response2, shard11Count);
|
||||
|
||||
assertEquals("Wrong doc count on shard1_0", docCounts[0], shard10Count);
|
||||
assertEquals("Wrong doc count on shard1_1", docCounts[1], shard11Count);
|
||||
|
||||
ClusterState clusterState = null;
|
||||
Slice slice1_0 = null, slice1_1 = null;
|
||||
int i = 0;
|
||||
for (i = 0; i < 10; i++) {
|
||||
ZkStateReader zkStateReader = cloudClient.getZkStateReader();
|
||||
zkStateReader.updateClusterState(true);
|
||||
clusterState = zkStateReader.getClusterState();
|
||||
slice1_0 = clusterState.getSlice(AbstractDistribZkTestBase.DEFAULT_COLLECTION, "shard1_0");
|
||||
slice1_1 = clusterState.getSlice(AbstractDistribZkTestBase.DEFAULT_COLLECTION, "shard1_1");
|
||||
if (Slice.ACTIVE.equals(slice1_0.getState()) && Slice.ACTIVE.equals(slice1_1.getState()))
|
||||
break;
|
||||
Thread.sleep(500);
|
||||
}
|
||||
|
||||
log.info("ShardSplitTest waited for {} ms for shard state to be set to active", i * 500);
|
||||
|
||||
assertNotNull("Cluster state does not contain shard1_0", slice1_0);
|
||||
assertNotNull("Cluster state does not contain shard1_0", slice1_1);
|
||||
assertEquals("shard1_0 is not active", Slice.ACTIVE, slice1_0.getState());
|
||||
assertEquals("shard1_1 is not active", Slice.ACTIVE, slice1_1.getState());
|
||||
assertEquals("Wrong number of replicas created for shard1_0", numReplicas, slice1_0.getReplicas().size());
|
||||
assertEquals("Wrong number of replicas created for shard1_1", numReplicas, slice1_1.getReplicas().size());
|
||||
}
|
||||
|
||||
protected void splitShard(String shardId) throws SolrServerException, IOException {
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
params.set("action", CollectionParams.CollectionAction.SPLITSHARD.toString());
|
||||
params.set("collection", "collection1");
|
||||
params.set("shard", SHARD1);
|
||||
params.set("shard", shardId);
|
||||
SolrRequest request = new QueryRequest(params);
|
||||
request.setPath("/admin/collections");
|
||||
|
||||
@ -160,35 +194,27 @@ public class ShardSplitTest extends BasicDistributedZkTest {
|
||||
baseServer.setConnectionTimeout(15000);
|
||||
baseServer.setSoTimeout((int) (CollectionsHandler.DEFAULT_ZK_TIMEOUT * 5));
|
||||
baseServer.request(request);
|
||||
}
|
||||
|
||||
log.info("Layout after split: \n");
|
||||
printLayout();
|
||||
protected void indexAndUpdateCount(List<DocRouter.Range> ranges, int[] docCounts, int id) throws Exception {
|
||||
indexr("id", id);
|
||||
|
||||
indexThread.join();
|
||||
|
||||
solrServer.commit(); // distributed commit on all shards
|
||||
|
||||
SolrQuery query = new SolrQuery("*:*").setRows(1000).setFields("id", "_version_");
|
||||
query.set("distrib", false);
|
||||
|
||||
ZkCoreNodeProps shard1_0 = getLeaderUrlFromZk(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1_0);
|
||||
HttpSolrServer shard1_0Server = new HttpSolrServer(shard1_0.getCoreUrl());
|
||||
QueryResponse response = shard1_0Server.query(query);
|
||||
long shard10Count = response.getResults().getNumFound();
|
||||
// log.info("Resp: shard: shard1_0 url: " + shard1_0.getCoreUrl() + "\n" + response.getResponse());
|
||||
|
||||
ZkCoreNodeProps shard1_1 = getLeaderUrlFromZk(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1_1);
|
||||
HttpSolrServer shard1_1Server = new HttpSolrServer(shard1_1.getCoreUrl());
|
||||
QueryResponse response2 = shard1_1Server.query(query);
|
||||
long shard11Count = response2.getResults().getNumFound();
|
||||
//log.info("Resp: shard: shard1_1 url: " + shard1_1.getCoreUrl() + "\n" + response.getResponse());
|
||||
// todo - hook in custom hashing
|
||||
byte[] bytes = String.valueOf(id).getBytes("UTF-8");
|
||||
int hash = Hash.murmurhash3_x86_32(bytes, 0, bytes.length, 0);
|
||||
for (int i = 0; i < ranges.size(); i++) {
|
||||
DocRouter.Range range = ranges.get(i);
|
||||
if (range.includes(hash))
|
||||
docCounts[i]++;
|
||||
}
|
||||
}
|
||||
|
||||
protected void logDebugHelp(int[] docCounts, QueryResponse response, long shard10Count, QueryResponse response2, long shard11Count) {
|
||||
for (int i = 0; i < docCounts.length; i++) {
|
||||
int docCount = docCounts[i];
|
||||
log.info("Expected docCount for shard1_{} = {}", i, docCount);
|
||||
}
|
||||
|
||||
// DEBUGGING CODE
|
||||
log.info("Actual docCount for shard1_0 = {}", shard10Count);
|
||||
log.info("Actual docCount for shard1_1 = {}", shard11Count);
|
||||
Map<String, String> idVsVersion = new HashMap<String, String>();
|
||||
@ -214,39 +240,6 @@ public class ShardSplitTest extends BasicDistributedZkTest {
|
||||
log.error("EXTRA: ID: " + document.getFieldValue("id") + " on shard1_1. Old version: " + old.getFieldValue("_version_") + " new version: " + document.getFieldValue("_version_"));
|
||||
}
|
||||
}
|
||||
// END DEBUGGING CODE
|
||||
|
||||
assertEquals("Wrong doc count on shard1_0", docCounts[0], shard10Count);
|
||||
assertEquals("Wrong doc count on shard1_1", docCounts[1], shard11Count);
|
||||
|
||||
Slice slice1_0 = null, slice1_1 = null;
|
||||
int i = 0;
|
||||
for (i = 0; i < 10; i++) {
|
||||
ZkStateReader zkStateReader = cloudClient.getZkStateReader();
|
||||
zkStateReader.updateClusterState(true);
|
||||
clusterState = zkStateReader.getClusterState();
|
||||
slice1_0 = clusterState.getSlice(AbstractDistribZkTestBase.DEFAULT_COLLECTION, "shard1_0");
|
||||
slice1_1 = clusterState.getSlice(AbstractDistribZkTestBase.DEFAULT_COLLECTION, "shard1_1");
|
||||
if (Slice.ACTIVE.equals(slice1_0.getState()) && Slice.ACTIVE.equals(slice1_1.getState()))
|
||||
break;
|
||||
Thread.sleep(500);
|
||||
}
|
||||
|
||||
log.info("ShardSplitTest waited for {} ms for shard state to be set to active", i * 500);
|
||||
|
||||
assertNotNull("Cluster state does not contain shard1_0", slice1_0);
|
||||
assertNotNull("Cluster state does not contain shard1_0", slice1_1);
|
||||
assertEquals("shard1_0 is not active", Slice.ACTIVE, slice1_0.getState());
|
||||
assertEquals("shard1_1 is not active", Slice.ACTIVE, slice1_1.getState());
|
||||
assertEquals("Wrong number of replicas created for shard1_0", numReplicas, slice1_0.getReplicas().size());
|
||||
assertEquals("Wrong number of replicas created for shard1_1", numReplicas, slice1_1.getReplicas().size());
|
||||
|
||||
// todo can't call waitForThingsToLevelOut because it looks for jettys of all shards
|
||||
// and the new sub-shards don't have any.
|
||||
waitForRecoveriesToFinish(true);
|
||||
//waitForThingsToLevelOut(15);
|
||||
|
||||
// todo - more and better tests
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user