mirror of https://github.com/apache/lucene.git
SOLR-4414: Add 'state' to shards (default to 'active') and read/write them to ZooKeeper
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1447336 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
edba3cc309
commit
be3ee82ece
|
@ -178,6 +178,9 @@ Other Changes
|
|||
|
||||
* SOLR-4384: Make post.jar report timing information (Upayavira via janhoy)
|
||||
|
||||
* SOLR-4414: Add 'state' to shards (default to 'active') and read/write them to
|
||||
ZooKeeper (Anshum Gupta via shalin)
|
||||
|
||||
================== 4.1.0 ==================
|
||||
|
||||
Versions of Major Components
|
||||
|
|
|
@ -254,7 +254,7 @@ public class ClusterStateUpdateTest extends SolrTestCaseJ4 {
|
|||
System.clearProperty("solrcloud.update.delay");
|
||||
}
|
||||
|
||||
private void printLayout(String zkHost) throws Exception {
|
||||
static void printLayout(String zkHost) throws Exception {
|
||||
SolrZkClient zkClient = new SolrZkClient(
|
||||
zkHost, AbstractZkTestCase.TIMEOUT);
|
||||
zkClient.printLayoutToStdOut();
|
||||
|
|
|
@ -0,0 +1,58 @@
|
|||
package org.apache.solr.cloud;
|
||||
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to You under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
|
||||
import org.apache.solr.SolrTestCaseJ4;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.apache.solr.common.cloud.DocRouter;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.Slice;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
public class SliceStateTest extends SolrTestCaseJ4 {
|
||||
@Test
|
||||
public void testDefaultSliceState() throws Exception {
|
||||
Map<String, DocCollection> collectionStates = new HashMap<String, DocCollection>();
|
||||
Set<String> liveNodes = new HashSet<String>();
|
||||
liveNodes.add("node1");
|
||||
|
||||
Map<String, Slice> slices = new HashMap<String, Slice>();
|
||||
Map<String, Replica> sliceToProps = new HashMap<String, Replica>();
|
||||
Map<String, Object> props = new HashMap<String, Object>();
|
||||
|
||||
Replica replica = new Replica("node1", props);
|
||||
sliceToProps.put("node1", replica);
|
||||
Slice slice = new Slice("shard1", sliceToProps, null);
|
||||
assertEquals("Default state not set to active", Slice.ACTIVE, slice.getState());
|
||||
slices.put("shard1", slice);
|
||||
collectionStates.put("collection1", new DocCollection("collection1", slices, null, DocRouter.DEFAULT));
|
||||
|
||||
ClusterState clusterState = new ClusterState(liveNodes, collectionStates);
|
||||
byte[] bytes = ZkStateReader.toJSON(clusterState);
|
||||
ClusterState loadedClusterState = ClusterState.load(null, bytes, liveNodes);
|
||||
|
||||
assertEquals("Default state not set to active", "active", loadedClusterState.getSlice("collection1", "shard1").getState());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,203 @@
|
|||
package org.apache.solr.cloud;
|
||||
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import org.apache.lucene.util.LuceneTestCase.Slow;
|
||||
import org.apache.solr.SolrTestCaseJ4;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.apache.solr.common.cloud.DocRouter;
|
||||
import org.apache.solr.common.cloud.ImplicitDocRouter;
|
||||
import org.apache.solr.common.cloud.Slice;
|
||||
import org.apache.solr.common.cloud.SolrZkClient;
|
||||
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.core.CoreContainer;
|
||||
import org.apache.solr.core.CoreContainer.Initializer;
|
||||
import org.apache.zookeeper.CreateMode;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
|
||||
@Slow
|
||||
public class SliceStateUpdateTest extends SolrTestCaseJ4 {
|
||||
protected static Logger log = LoggerFactory
|
||||
.getLogger(AbstractZkTestCase.class);
|
||||
|
||||
private static final boolean VERBOSE = false;
|
||||
|
||||
protected ZkTestServer zkServer;
|
||||
|
||||
protected String zkDir;
|
||||
|
||||
private CoreContainer container1;
|
||||
|
||||
private CoreContainer container2;
|
||||
|
||||
private File dataDir1;
|
||||
|
||||
private File dataDir2;
|
||||
|
||||
private File dataDir3;
|
||||
|
||||
private Initializer init2;
|
||||
|
||||
@BeforeClass
|
||||
public static void beforeClass() {
|
||||
System.setProperty("solrcloud.skip.autorecovery", "true");
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void afterClass() throws InterruptedException {
|
||||
System.clearProperty("solrcloud.skip.autorecovery");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
createTempDir();
|
||||
System.setProperty("zkClientTimeout", "3000");
|
||||
|
||||
zkDir = dataDir.getAbsolutePath() + File.separator
|
||||
+ "zookeeper/server1/data";
|
||||
zkServer = new ZkTestServer(zkDir);
|
||||
zkServer.run();
|
||||
System.setProperty("zkHost", zkServer.getZkAddress());
|
||||
AbstractZkTestCase.buildZooKeeper(zkServer.getZkHost(), zkServer
|
||||
.getZkAddress(), "solrconfig.xml", "schema.xml");
|
||||
|
||||
log.info("####SETUP_START " + getTestName());
|
||||
Map<String, Object> props2 = new HashMap<String, Object>();
|
||||
props2.put("configName", "conf1");
|
||||
|
||||
ZkNodeProps zkProps2 = new ZkNodeProps(props2);
|
||||
|
||||
SolrZkClient zkClient = new SolrZkClient(zkServer.getZkAddress(),
|
||||
AbstractZkTestCase.TIMEOUT);
|
||||
zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/testcore",
|
||||
ZkStateReader.toJSON(zkProps2), CreateMode.PERSISTENT, true);
|
||||
zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/testcore/shards",
|
||||
CreateMode.PERSISTENT, true);
|
||||
zkClient.close();
|
||||
|
||||
dataDir1 = new File(dataDir + File.separator + "data1");
|
||||
dataDir1.mkdirs();
|
||||
|
||||
dataDir2 = new File(dataDir + File.separator + "data2");
|
||||
dataDir2.mkdirs();
|
||||
|
||||
dataDir3 = new File(dataDir + File.separator + "data3");
|
||||
dataDir3.mkdirs();
|
||||
|
||||
// set some system properties for use by tests
|
||||
System.setProperty("solr.test.sys.prop1", "propone");
|
||||
System.setProperty("solr.test.sys.prop2", "proptwo");
|
||||
|
||||
System.setProperty("solr.solr.home", TEST_HOME());
|
||||
System.setProperty("hostPort", "1661");
|
||||
CoreContainer.Initializer init1 = new CoreContainer.Initializer();
|
||||
System.setProperty("solr.data.dir", SliceStateUpdateTest.this.dataDir1.getAbsolutePath());
|
||||
container1 = init1.initialize();
|
||||
|
||||
System.clearProperty("hostPort");
|
||||
|
||||
System.setProperty("hostPort", "1662");
|
||||
init2 = new CoreContainer.Initializer();
|
||||
System.setProperty("solr.data.dir", SliceStateUpdateTest.this.dataDir2.getAbsolutePath());
|
||||
container2 = init2.initialize();
|
||||
System.clearProperty("hostPort");
|
||||
|
||||
System.clearProperty("solr.solr.home");
|
||||
|
||||
log.info("####SETUP_END " + getTestName());
|
||||
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testSliceStateUpdate() throws Exception {
|
||||
System.setProperty("solrcloud.update.delay", "1");
|
||||
|
||||
/* Get ClusterState, update slice state and publish it to Zookeeper */
|
||||
|
||||
ClusterState clusterState = container1.getZkController().getClusterState();
|
||||
Map<String, DocCollection> collectionStates =
|
||||
new LinkedHashMap<String, DocCollection>(clusterState.getCollectionStates());
|
||||
|
||||
Map<String, Slice> slicesMap = clusterState.getSlicesMap("collection1");
|
||||
Map<String, Object> props = new HashMap<String, Object>(1);
|
||||
Slice slice = slicesMap.get("shard1");
|
||||
Map<String, Object> prop = slice.getProperties();
|
||||
prop.put("state", "inactive");
|
||||
Slice newSlice = new Slice(slice.getName(), slice.getReplicasMap(), prop);
|
||||
slicesMap.put(newSlice.getName(), newSlice);
|
||||
props.put(DocCollection.DOC_ROUTER, ImplicitDocRouter.NAME);
|
||||
|
||||
DocCollection coll = new DocCollection("collection1", slicesMap, props, DocRouter.DEFAULT);
|
||||
collectionStates.put("collection1", coll);
|
||||
SolrZkClient zkClient = new SolrZkClient(zkServer.getZkAddress(),
|
||||
AbstractZkTestCase.TIMEOUT);
|
||||
|
||||
ClusterState newState = new ClusterState(clusterState.getLiveNodes(), collectionStates);
|
||||
zkClient.setData(ZkStateReader.CLUSTER_STATE,
|
||||
ZkStateReader.toJSON(newState), true);
|
||||
zkClient.close();
|
||||
|
||||
/* Read state from another container and confirm the change */
|
||||
ZkController zkController2 = container2.getZkController();
|
||||
ClusterState clusterState2 = null;
|
||||
Map<String, Slice> slices = null;
|
||||
for (int i = 75; i > 0; i--) {
|
||||
clusterState2 = zkController2.getClusterState();
|
||||
slices = clusterState2.getAllSlicesMap("collection1");
|
||||
if (slices != null && slices.containsKey("shard1")
|
||||
&& slices.get("shard1").getState().equals("inactive")) {
|
||||
break;
|
||||
}
|
||||
Thread.sleep(500);
|
||||
}
|
||||
|
||||
assertNotNull(slices);
|
||||
|
||||
assertEquals("shard1", slices.get("shard1").getName());
|
||||
assertEquals("inactive", slices.get("shard1").getState());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void tearDown() throws Exception {
|
||||
if (VERBOSE) {
|
||||
ClusterStateUpdateTest.printLayout(zkServer.getZkHost());
|
||||
}
|
||||
container1.shutdown();
|
||||
container2.shutdown();
|
||||
|
||||
zkServer.shutdown();
|
||||
super.tearDown();
|
||||
System.clearProperty("zkClientTimeout");
|
||||
System.clearProperty("zkHost");
|
||||
System.clearProperty("hostPort");
|
||||
System.clearProperty("solrcloud.update.delay");
|
||||
}
|
||||
}
|
|
@ -118,6 +118,12 @@ public class ClusterState implements JSONWriter.Writable {
|
|||
return coll.getSlicesMap();
|
||||
}
|
||||
|
||||
public Map<String, Slice> getAllSlicesMap(String collection) {
|
||||
DocCollection coll = collectionStates.get(collection);
|
||||
if (coll == null) return null;
|
||||
return coll.getAllSlicesMap();
|
||||
}
|
||||
|
||||
public Collection<Slice> getSlices(String collection) {
|
||||
DocCollection coll = collectionStates.get(collection);
|
||||
if (coll == null) return null;
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.apache.noggit.JSONWriter;
|
|||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
|
||||
|
@ -35,6 +36,7 @@ public class DocCollection extends ZkNodeProps {
|
|||
|
||||
private final String name;
|
||||
private final Map<String, Slice> slices;
|
||||
private final Map<String, Slice> allSlices;
|
||||
private final DocRouter router;
|
||||
|
||||
/**
|
||||
|
@ -45,7 +47,17 @@ public class DocCollection extends ZkNodeProps {
|
|||
public DocCollection(String name, Map<String, Slice> slices, Map<String, Object> props, DocRouter router) {
|
||||
super( props==null ? Collections.<String,Object>emptyMap() : props);
|
||||
this.name = name;
|
||||
this.slices = slices;
|
||||
|
||||
this.allSlices = slices;
|
||||
this.slices = new HashMap<String, Slice>();
|
||||
|
||||
Iterator<Map.Entry<String, Slice>> iter = slices.entrySet().iterator();
|
||||
|
||||
while (iter.hasNext()) {
|
||||
Map.Entry<String, Slice> slice = iter.next();
|
||||
if (slice.getValue().getState().equals(Slice.ACTIVE))
|
||||
this.slices.put(slice.getKey(), slice.getValue());
|
||||
}
|
||||
this.router = router;
|
||||
|
||||
assert name != null && slices != null;
|
||||
|
@ -60,23 +72,38 @@ public class DocCollection extends ZkNodeProps {
|
|||
}
|
||||
|
||||
public Slice getSlice(String sliceName) {
|
||||
return slices.get(sliceName);
|
||||
return allSlices.get(sliceName);
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the list of slices for this collection.
|
||||
* Gets the list of active slices for this collection.
|
||||
*/
|
||||
public Collection<Slice> getSlices() {
|
||||
return slices.values();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Get the map of slices (sliceName->Slice) for this collection.
|
||||
* Return the list of all slices for this collection.
|
||||
*/
|
||||
public Collection<Slice> getAllSlices() {
|
||||
return allSlices.values();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the map of active slices (sliceName->Slice) for this collection.
|
||||
*/
|
||||
public Map<String, Slice> getSlicesMap() {
|
||||
return slices;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the map of all slices (sliceName->Slice) for this collection.
|
||||
*/
|
||||
public Map<String, Slice> getAllSlicesMap() {
|
||||
return allSlices;
|
||||
}
|
||||
|
||||
public DocRouter getRouter() {
|
||||
return router;
|
||||
}
|
||||
|
@ -88,9 +115,9 @@ public class DocCollection extends ZkNodeProps {
|
|||
|
||||
@Override
|
||||
public void write(JSONWriter jsonWriter) {
|
||||
LinkedHashMap<String,Object> all = new LinkedHashMap<String,Object>(slices.size()+1);
|
||||
LinkedHashMap<String, Object> all = new LinkedHashMap<String, Object>(allSlices.size() + 1);
|
||||
all.putAll(propMap);
|
||||
all.put(SHARDS, slices);
|
||||
all.put(SHARDS, allSlices);
|
||||
jsonWriter.write(all);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -31,13 +31,16 @@ import java.util.Map;
|
|||
public class Slice extends ZkNodeProps {
|
||||
public static String REPLICAS = "replicas";
|
||||
public static String RANGE = "range";
|
||||
public static String STATE = "state";
|
||||
public static String LEADER = "leader"; // FUTURE: do we want to record the leader as a slice property in the JSON (as opposed to isLeader as a replica property?)
|
||||
public static String ACTIVE = "active";
|
||||
|
||||
private final String name;
|
||||
private final DocRouter.Range range;
|
||||
private final Integer replicationFactor; // FUTURE: optional per-slice override of the collection replicationFactor
|
||||
private final Map<String,Replica> replicas;
|
||||
private final Replica leader;
|
||||
private final String state;
|
||||
|
||||
/**
|
||||
* @param name The name of the slice
|
||||
|
@ -49,6 +52,12 @@ public class Slice extends ZkNodeProps {
|
|||
this.name = name;
|
||||
|
||||
Object rangeObj = propMap.get(RANGE);
|
||||
if (propMap.containsKey(STATE))
|
||||
state = (String) propMap.get(STATE);
|
||||
else {
|
||||
state = ACTIVE; //Default to ACTIVE
|
||||
propMap.put(STATE, this.state);
|
||||
}
|
||||
DocRouter.Range tmpRange = null;
|
||||
if (rangeObj instanceof DocRouter.Range) {
|
||||
tmpRange = (DocRouter.Range)rangeObj;
|
||||
|
@ -135,6 +144,10 @@ public class Slice extends ZkNodeProps {
|
|||
return range;
|
||||
}
|
||||
|
||||
public String getState() {
|
||||
return state;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return name + ':' + JSONUtil.toJSON(propMap);
|
||||
|
|
Loading…
Reference in New Issue