mirror of https://github.com/apache/lucene.git
SOLR-2799: Update CloudState incrementally rather than always reading the data at each zk node.
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1177360 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
02736c0976
commit
d4b4aefabb
|
@ -91,6 +91,8 @@ New Features
|
|||
Additional Work:
|
||||
SOLR-2324: SolrCloud solr.xml parameters are not persisted by CoreContainer.
|
||||
(Massimo Schiavon, Mark Miller)
|
||||
SOLR-2799: Update CloudState incrementally rather than always reading the data at each zk
|
||||
node. (Jamie Johnson via Mark Miller)
|
||||
|
||||
* SOLR-1729: Evaluation of NOW for date math is done only once per request for
|
||||
consistency, and is also propagated to shards in distributed search.
|
||||
|
|
|
@ -18,6 +18,7 @@ package org.apache.solr.cloud;
|
|||
*/
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
|
@ -26,12 +27,14 @@ import org.apache.solr.common.cloud.CloudState;
|
|||
import org.apache.solr.common.cloud.Slice;
|
||||
import org.apache.solr.common.cloud.SolrZkClient;
|
||||
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.core.CoreContainer;
|
||||
import org.apache.solr.core.CoreContainer.Initializer;
|
||||
import org.apache.solr.core.CoreDescriptor;
|
||||
import org.apache.solr.core.SolrConfig;
|
||||
import org.apache.solr.core.SolrCore;
|
||||
import org.apache.zookeeper.CreateMode;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -46,6 +49,17 @@ public class CloudStateUpdateTest extends SolrTestCaseJ4 {
|
|||
|
||||
private static final boolean VERBOSE = false;
|
||||
|
||||
private static final String URL1 = "http://localhost:3133/solr/core0";
|
||||
private static final String URL3 = "http://localhost:3133/solr/core1";
|
||||
private static final String URL2 = "http://localhost:3123/solr/core1";
|
||||
private static final String URL4 = "http://localhost:3123/solr/core4";
|
||||
private static final String SHARD4 = "localhost:3123_solr_core4";
|
||||
private static final String SHARD3 = "localhost:3123_solr_core3";
|
||||
private static final String SHARD2 = "localhost:3123_solr_core2";
|
||||
private static final String SHARD1 = "localhost:3123_solr_core1";
|
||||
|
||||
private static final int TIMEOUT = 10000;
|
||||
|
||||
protected ZkTestServer zkServer;
|
||||
|
||||
protected String zkDir;
|
||||
|
@ -123,6 +137,70 @@ public class CloudStateUpdateTest extends SolrTestCaseJ4 {
|
|||
log.info("####SETUP_END " + getName());
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIncrementalUpdate() throws Exception {
|
||||
System.setProperty("CLOUD_UPDATE_DELAY", "1");
|
||||
String zkDir = dataDir.getAbsolutePath() + File.separator
|
||||
+ "zookeeper/server1/data";
|
||||
ZkTestServer server = null;
|
||||
SolrZkClient zkClient = null;
|
||||
ZkController zkController = null;
|
||||
|
||||
server = new ZkTestServer(zkDir);
|
||||
server.run();
|
||||
try {
|
||||
AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
|
||||
AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
|
||||
|
||||
zkClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
|
||||
String shardsPath1 = "/collections/collection1/shards/shardid1";
|
||||
String shardsPath2 = "/collections/collection1/shards/shardid2";
|
||||
zkClient.makePath(shardsPath1);
|
||||
zkClient.makePath(shardsPath2);
|
||||
|
||||
addShardToZk(zkClient, shardsPath1, SHARD1, URL1);
|
||||
addShardToZk(zkClient, shardsPath1, SHARD2, URL2);
|
||||
addShardToZk(zkClient, shardsPath2, SHARD3, URL3);
|
||||
|
||||
removeShardFromZk(server.getZkAddress(), zkClient, shardsPath1);
|
||||
|
||||
zkController = new ZkController(server.getZkAddress(), TIMEOUT, 1000,
|
||||
"localhost", "8983", "solr");
|
||||
|
||||
zkController.getZkStateReader().updateCloudState(true);
|
||||
CloudState cloudInfo = zkController.getCloudState();
|
||||
Map<String,Slice> slices = cloudInfo.getSlices("collection1");
|
||||
assertFalse(slices.containsKey("shardid1"));
|
||||
|
||||
zkClient.makePath(shardsPath1);
|
||||
addShardToZk(zkClient, shardsPath1, SHARD1, URL1);
|
||||
|
||||
zkController.getZkStateReader().updateCloudState(true);
|
||||
cloudInfo = zkController.getCloudState();
|
||||
slices = cloudInfo.getSlices("collection1");
|
||||
assertTrue(slices.containsKey("shardid1"));
|
||||
|
||||
updateUrl(zkClient, shardsPath1, SHARD1, "fake");
|
||||
|
||||
addShardToZk(zkClient, shardsPath2, SHARD4, URL4);
|
||||
|
||||
zkController.getZkStateReader().updateCloudState(true);
|
||||
cloudInfo = zkController.getCloudState();
|
||||
String url = cloudInfo.getSlices("collection1").get("shardid1").getShards().get(SHARD1).get("url");
|
||||
|
||||
// because of incremental update, we don't expect to find the new 'fake'
|
||||
// url - instead we should still
|
||||
// be using the original url - the correct way to update this would be to
|
||||
// remove the whole node and readd it
|
||||
assertEquals(URL1, url);
|
||||
|
||||
} finally {
|
||||
server.shutdown();
|
||||
zkClient.close();
|
||||
zkController.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCoreRegistration() throws Exception {
|
||||
|
@ -237,6 +315,37 @@ public class CloudStateUpdateTest extends SolrTestCaseJ4 {
|
|||
SolrConfig.severeErrors.clear();
|
||||
}
|
||||
|
||||
private void addShardToZk(SolrZkClient zkClient, String shardsPath,
|
||||
String zkNodeName, String url) throws IOException,
|
||||
KeeperException, InterruptedException {
|
||||
|
||||
ZkNodeProps props = new ZkNodeProps();
|
||||
props.put(ZkStateReader.URL_PROP, url);
|
||||
props.put(ZkStateReader.NODE_NAME, zkNodeName);
|
||||
byte[] bytes = props.store();
|
||||
|
||||
zkClient
|
||||
.create(shardsPath + "/" + zkNodeName, bytes, CreateMode.PERSISTENT);
|
||||
}
|
||||
|
||||
private void updateUrl(SolrZkClient zkClient, String shardsPath,
|
||||
String zkNodeName, String url) throws IOException,
|
||||
KeeperException, InterruptedException {
|
||||
|
||||
ZkNodeProps props = new ZkNodeProps();
|
||||
props.put(ZkStateReader.URL_PROP, url);
|
||||
props.put(ZkStateReader.NODE_NAME, zkNodeName);
|
||||
byte[] bytes = props.store();
|
||||
|
||||
zkClient
|
||||
.setData(shardsPath + "/" + zkNodeName, bytes);
|
||||
}
|
||||
|
||||
private void removeShardFromZk(String zkHost, SolrZkClient zkClient, String shardsPath) throws Exception {
|
||||
|
||||
AbstractZkTestCase.tryCleanPath(zkHost, shardsPath);
|
||||
}
|
||||
|
||||
private void printLayout(String zkHost) throws Exception {
|
||||
SolrZkClient zkClient = new SolrZkClient(
|
||||
zkHost, AbstractZkTestCase.TIMEOUT);
|
||||
|
|
|
@ -84,8 +84,16 @@ public class CloudState {
|
|||
}
|
||||
Map<String,Slice> slices = new HashMap<String,Slice>();
|
||||
for (String shardIdZkPath : shardIdNames) {
|
||||
Slice oldSlice = null;
|
||||
if (oldCloudState.getCollectionStates().containsKey(collection)
|
||||
&& oldCloudState.getCollectionStates().get(collection)
|
||||
.containsKey(shardIdZkPath)) {
|
||||
oldSlice = oldCloudState.getCollectionStates().get(collection)
|
||||
.get(shardIdZkPath);
|
||||
}
|
||||
|
||||
Map<String,ZkNodeProps> shardsMap = readShards(zkClient, shardIdPaths
|
||||
+ "/" + shardIdZkPath);
|
||||
+ "/" + shardIdZkPath, oldSlice);
|
||||
Slice slice = new Slice(shardIdZkPath, shardsMap);
|
||||
slices.put(shardIdZkPath, slice);
|
||||
}
|
||||
|
@ -108,7 +116,7 @@ public class CloudState {
|
|||
* @throws InterruptedException
|
||||
* @throws IOException
|
||||
*/
|
||||
private static Map<String,ZkNodeProps> readShards(SolrZkClient zkClient, String shardsZkPath)
|
||||
private static Map<String,ZkNodeProps> readShards(SolrZkClient zkClient, String shardsZkPath, Slice oldSlice)
|
||||
throws KeeperException, InterruptedException, IOException {
|
||||
|
||||
Map<String,ZkNodeProps> shardNameToProps = new HashMap<String,ZkNodeProps>();
|
||||
|
@ -120,12 +128,18 @@ public class CloudState {
|
|||
|
||||
List<String> shardZkPaths = zkClient.getChildren(shardsZkPath, null);
|
||||
|
||||
for(String shardPath : shardZkPaths) {
|
||||
byte[] data = zkClient.getData(shardsZkPath + "/" + shardPath, null,
|
||||
null);
|
||||
for (String shardPath : shardZkPaths) {
|
||||
ZkNodeProps props;
|
||||
if (oldSlice != null && oldSlice.getShards().containsKey(shardPath)) {
|
||||
props = oldSlice.getShards().get(shardPath);
|
||||
} else {
|
||||
byte[] data = zkClient.getData(shardsZkPath + "/" + shardPath, null,
|
||||
null);
|
||||
|
||||
props = new ZkNodeProps();
|
||||
props.load(data);
|
||||
}
|
||||
|
||||
ZkNodeProps props = new ZkNodeProps();
|
||||
props.load(data);
|
||||
shardNameToProps.put(shardPath, props);
|
||||
}
|
||||
|
||||
|
|
|
@ -117,8 +117,6 @@ public class ZkStateReader {
|
|||
// load and publish a new CollectionInfo
|
||||
private synchronized void updateCloudState(boolean immediate, final boolean onlyLiveNodes) throws KeeperException, InterruptedException,
|
||||
IOException {
|
||||
|
||||
// TODO: - possibly: incremental update rather than reread everything
|
||||
|
||||
// build immutable CloudInfo
|
||||
|
||||
|
|
Loading…
Reference in New Issue