mirror of https://github.com/apache/lucene.git
SOLR-3088: create shard placeholders
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1242639 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
7c51126b86
commit
9332f2e027
|
@ -71,7 +71,6 @@ public class Overseer implements NodeStateChangeListener, ShardLeaderListener {
|
|||
}
|
||||
}
|
||||
|
||||
public static final String ASSIGNMENTS_NODE = "/node_assignments";
|
||||
public static final String STATES_NODE = "/node_states";
|
||||
private static Logger log = LoggerFactory.getLogger(Overseer.class);
|
||||
|
||||
|
@ -189,7 +188,13 @@ public class Overseer implements NodeStateChangeListener, ShardLeaderListener {
|
|||
private CloudState updateState(CloudState state, String nodeName, CoreState coreState) throws KeeperException, InterruptedException {
|
||||
String collection = coreState.getCollectionName();
|
||||
String zkCoreNodeName = coreState.getCoreNodeName();
|
||||
|
||||
|
||||
//collection does not yet exist, create placeholders if num shards is specified
|
||||
if (!state.getCollections().contains(coreState.getCollectionName())
|
||||
&& coreState.getNumShards() != null) {
|
||||
state = createCollection(state, collection, coreState.getNumShards());
|
||||
}
|
||||
|
||||
// use the provided non null shardId
|
||||
String shardId = coreState.getProperties().get(ZkStateReader.SHARD_ID_PROP);
|
||||
if(shardId==null) {
|
||||
|
@ -220,6 +225,19 @@ public class Overseer implements NodeStateChangeListener, ShardLeaderListener {
|
|||
return newCloudState;
|
||||
}
|
||||
|
||||
private CloudState createCollection(CloudState state, String collectionName, int numShards) {
|
||||
Map<String, Map<String, Slice>> newStates = new LinkedHashMap<String,Map<String, Slice>>();
|
||||
Map<String, Slice> newSlices = new LinkedHashMap<String,Slice>();
|
||||
newStates.putAll(state.getCollectionStates());
|
||||
for (int i = 0; i < numShards; i++) {
|
||||
final String sliceName = "shard" + (i+1);
|
||||
newSlices.put(sliceName, new Slice(sliceName, Collections.EMPTY_MAP));
|
||||
}
|
||||
newStates.put(collectionName, newSlices);
|
||||
CloudState newCloudState = new CloudState(state.getLiveNodes(), newStates);
|
||||
return newCloudState;
|
||||
}
|
||||
|
||||
/*
|
||||
* Return an already assigned id or null if not assigned
|
||||
*/
|
||||
|
|
|
@ -222,9 +222,14 @@ public class FullSolrCloudTest extends AbstractDistributedZkTestCase {
|
|||
protected void createServers(int numServers) throws Exception {
|
||||
|
||||
System.setProperty("collection", "control_collection");
|
||||
String numShards = System.getProperty(ZkStateReader.NUM_SHARDS_PROP);
|
||||
System.clearProperty(ZkStateReader.NUM_SHARDS_PROP);
|
||||
controlJetty = createJetty(testDir, testDir + "/control/data",
|
||||
"control_shard");
|
||||
System.clearProperty("collection");
|
||||
if(numShards != null) {
|
||||
System.setProperty(ZkStateReader.NUM_SHARDS_PROP, numShards);
|
||||
}
|
||||
controlClient = createNewSolrServer(controlJetty.getLocalPort());
|
||||
|
||||
createJettys(numServers, true);
|
||||
|
|
|
@ -156,8 +156,8 @@ public class OverseerTest extends SolrTestCaseJ4 {
|
|||
assertEquals("shard2", ids[4]);
|
||||
assertEquals("shard3", ids[5]);
|
||||
|
||||
waitForSliceCount(reader, "collection1", 3);
|
||||
|
||||
waitForCollections(reader, "collection1");
|
||||
|
||||
//make sure leaders are in cloud state
|
||||
assertNotNull(reader.getLeaderUrl("collection1", "shard1", 15000));
|
||||
assertNotNull(reader.getLeaderUrl("collection1", "shard2", 15000));
|
||||
|
@ -345,20 +345,6 @@ public class OverseerTest extends SolrTestCaseJ4 {
|
|||
System.clearProperty(ZkStateReader.NUM_SHARDS_PROP);
|
||||
}
|
||||
|
||||
//wait until i slices for collection have appeared
|
||||
private void waitForSliceCount(ZkStateReader stateReader, String collection, int i) throws InterruptedException, KeeperException {
|
||||
waitForCollections(stateReader, collection);
|
||||
int maxIterations = 200;
|
||||
while (0 < maxIterations--) {
|
||||
CloudState state = stateReader.getCloudState();
|
||||
Map<String,Slice> sliceMap = state.getSlices(collection);
|
||||
if (sliceMap != null && sliceMap.keySet().size() == i) {
|
||||
return;
|
||||
}
|
||||
Thread.sleep(100);
|
||||
}
|
||||
}
|
||||
|
||||
//wait until collections are available
|
||||
private void waitForCollections(ZkStateReader stateReader, String... collections) throws InterruptedException, KeeperException {
|
||||
int maxIterations = 100;
|
||||
|
@ -425,12 +411,10 @@ public class OverseerTest extends SolrTestCaseJ4 {
|
|||
throw ke;
|
||||
}
|
||||
}
|
||||
//publish node state (recovering)
|
||||
zkClient.setData(nodePath, ZkStateReader.toJSON(new CoreState[]{state}), true);
|
||||
|
||||
//wait overseer assignment
|
||||
waitForSliceCount(reader, "collection1", 1);
|
||||
|
||||
zkClient.setData(nodePath, ZkStateReader.toJSON(new CoreState[]{state}), true);
|
||||
waitForCollections(reader, "collection1");
|
||||
|
||||
assertEquals(reader.getCloudState().toString(), ZkStateReader.RECOVERING,
|
||||
reader.getCloudState().getSlice("collection1", "shard1").getShards()
|
||||
.get("node1_core1").get(ZkStateReader.STATE_PROP));
|
||||
|
@ -446,7 +430,6 @@ public class OverseerTest extends SolrTestCaseJ4 {
|
|||
verifyStatus(reader, ZkStateReader.ACTIVE);
|
||||
|
||||
} finally {
|
||||
System.clearProperty(ZkStateReader.NUM_SHARDS_PROP);
|
||||
|
||||
if (zkClient != null) {
|
||||
zkClient.close();
|
||||
|
@ -466,9 +449,12 @@ public class OverseerTest extends SolrTestCaseJ4 {
|
|||
int maxIterations = 100;
|
||||
String coreState = null;
|
||||
while(maxIterations-->0) {
|
||||
coreState = reader.getCloudState().getSlice("collection1", "shard1").getShards().get("node1_core1").get(ZkStateReader.STATE_PROP);
|
||||
if(coreState.equals(expectedState)) {
|
||||
return;
|
||||
Slice slice = reader.getCloudState().getSlice("collection1", "shard1");
|
||||
if(slice!=null) {
|
||||
coreState = slice.getShards().get("node1_core1").get(ZkStateReader.STATE_PROP);
|
||||
if(coreState.equals(expectedState)) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
Thread.sleep(50);
|
||||
}
|
||||
|
@ -479,7 +465,6 @@ public class OverseerTest extends SolrTestCaseJ4 {
|
|||
public void testOverseerFailure() throws Exception {
|
||||
String zkDir = dataDir.getAbsolutePath() + File.separator
|
||||
+ "zookeeper/server1/data";
|
||||
|
||||
ZkTestServer server = new ZkTestServer(zkDir);
|
||||
|
||||
SolrZkClient controllerClient = null;
|
||||
|
@ -501,11 +486,10 @@ public class OverseerTest extends SolrTestCaseJ4 {
|
|||
|
||||
overseerClient = electNewOverseer(server.getZkAddress());
|
||||
|
||||
Thread.sleep(1000);
|
||||
mockController.publishState("core1", ZkStateReader.RECOVERING, 1);
|
||||
|
||||
// wait overseer assignment
|
||||
waitForSliceCount(reader, "collection1", 1);
|
||||
|
||||
|
||||
waitForCollections(reader, "collection1");
|
||||
verifyStatus(reader, ZkStateReader.RECOVERING);
|
||||
|
||||
int version = getCloudStateVersion(controllerClient);
|
||||
|
@ -556,8 +540,6 @@ public class OverseerTest extends SolrTestCaseJ4 {
|
|||
String zkDir = dataDir.getAbsolutePath() + File.separator
|
||||
+ "zookeeper/server1/data";
|
||||
|
||||
System.setProperty(ZkStateReader.NUM_SHARDS_PROP, "2");
|
||||
|
||||
ZkTestServer server = new ZkTestServer(zkDir);
|
||||
|
||||
SolrZkClient controllerClient = null;
|
||||
|
@ -582,8 +564,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
|
|||
|
||||
mockController.publishState("core1", ZkStateReader.RECOVERING, 1);
|
||||
|
||||
// wait overseer assignment
|
||||
waitForSliceCount(reader, "collection1", 1);
|
||||
waitForCollections(reader, "collection1");
|
||||
|
||||
verifyStatus(reader, ZkStateReader.RECOVERING);
|
||||
|
||||
|
@ -598,10 +579,70 @@ public class OverseerTest extends SolrTestCaseJ4 {
|
|||
|
||||
reader.updateCloudState(true);
|
||||
CloudState state = reader.getCloudState();
|
||||
assertEquals("more than 1 shard id was assigned to same core", 1, state.getSlices("collection1").size());
|
||||
|
||||
int numFound = 0;
|
||||
for (Map<String,Slice> collection : state.getCollectionStates().values()) {
|
||||
for (Slice slice : collection.values()) {
|
||||
if (slice.getShards().get("node1_core1") != null) {
|
||||
numFound++;
|
||||
}
|
||||
}
|
||||
}
|
||||
assertEquals("Shard was found in more than 1 times in CloudState", 1,
|
||||
numFound);
|
||||
|
||||
} finally {
|
||||
System.clearProperty(ZkStateReader.NUM_SHARDS_PROP);
|
||||
if (overseerClient != null) {
|
||||
overseerClient.close();
|
||||
}
|
||||
if (mockController != null) {
|
||||
mockController.close();
|
||||
}
|
||||
|
||||
if (controllerClient != null) {
|
||||
controllerClient.close();
|
||||
}
|
||||
if (reader != null) {
|
||||
reader.close();
|
||||
}
|
||||
server.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPlaceholders() throws Exception {
|
||||
String zkDir = dataDir.getAbsolutePath() + File.separator
|
||||
+ "zookeeper/server1/data";
|
||||
|
||||
ZkTestServer server = new ZkTestServer(zkDir);
|
||||
|
||||
SolrZkClient controllerClient = null;
|
||||
SolrZkClient overseerClient = null;
|
||||
ZkStateReader reader = null;
|
||||
MockZKController mockController = null;
|
||||
|
||||
try {
|
||||
server.run();
|
||||
controllerClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
|
||||
|
||||
AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
|
||||
AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
|
||||
controllerClient.makePath(ZkStateReader.LIVE_NODES_ZKNODE, true);
|
||||
|
||||
reader = new ZkStateReader(controllerClient);
|
||||
reader.createClusterStateWatchersAndUpdate();
|
||||
|
||||
mockController = new MockZKController(server.getZkAddress(), "node1");
|
||||
|
||||
overseerClient = electNewOverseer(server.getZkAddress());
|
||||
|
||||
mockController.publishState("core1", ZkStateReader.RECOVERING, 12);
|
||||
|
||||
waitForCollections(reader, "collection1");
|
||||
|
||||
assertEquals("Slicecount does not match", 12, reader.getCloudState().getSlices("collection1").size());
|
||||
|
||||
} finally {
|
||||
if (overseerClient != null) {
|
||||
overseerClient.close();
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue