SOLR-6554: Speed up overseer operations

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1642437 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Shalin Shekhar Mangar 2014-11-29 15:53:11 +00:00
parent 9d35397ad5
commit 2b0ecdbf47
21 changed files with 1654 additions and 891 deletions

View File

@ -315,6 +315,11 @@ Optimizations
* SOLR-6603: LBHttpSolrServer - lazily allocate skipped-zombie-servers list.
(Christine Poerschke via shalin)
* SOLR-6554: Speed up overseer operations avoiding cluster state reads from
zookeeper at the start of each loop and instead relying on local state and
compare-and-set writes. This change also adds batching for consecutive messages
belonging to the same collection with stateFormat=2. (shalin)
Other Changes
----------------------

View File

@ -1,6 +1,7 @@
package org.apache.solr.cloud;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.Replica;
@ -148,7 +149,7 @@ class ShardLeaderElectionContextBase extends ElectionContext {
assert shardId != null;
ZkNodeProps m = ZkNodeProps.fromKeyVals(Overseer.QUEUE_OPERATION,
Overseer.OverseerAction.LEADER.toLower(), ZkStateReader.SHARD_ID_PROP, shardId,
OverseerAction.LEADER.toLower(), ZkStateReader.SHARD_ID_PROP, shardId,
ZkStateReader.COLLECTION_PROP, collection, ZkStateReader.BASE_URL_PROP,
leaderProps.getProperties().get(ZkStateReader.BASE_URL_PROP),
ZkStateReader.CORE_NAME_PROP,
@ -201,7 +202,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
String coreName = leaderProps.getStr(ZkStateReader.CORE_NAME_PROP);
// clear the leader in clusterstate
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, Overseer.OverseerAction.LEADER.toLower(),
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.LEADER.toLower(),
ZkStateReader.SHARD_ID_PROP, shardId, ZkStateReader.COLLECTION_PROP,
collection);
Overseer.getInQueue(zkClient).offer(ZkStateReader.toJSON(m));

File diff suppressed because it is too large Load Diff

View File

@ -68,6 +68,8 @@ import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.cloud.Assign.Node;
import org.apache.solr.cloud.DistributedQueue.QueueEvent;
import org.apache.solr.cloud.Overseer.LeaderStatus;
import org.apache.solr.cloud.overseer.ClusterStateMutator;
import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.Aliases;
@ -467,7 +469,7 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
}
//now ask the current leader to QUIT , so that the designate can takeover
Overseer.getInQueue(zkStateReader.getZkClient()).offer(
ZkStateReader.toJSON(new ZkNodeProps(Overseer.QUEUE_OPERATION, Overseer.OverseerAction.QUIT.toLower(),
ZkStateReader.toJSON(new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.QUIT.toLower(),
"id",getLeaderId(zkStateReader.getZkClient()))));
}
@ -698,7 +700,7 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
SolrZkClient zkClient = zkStateReader.getZkClient();
DistributedQueue inQueue = Overseer.getInQueue(zkClient);
Map<String, Object> propMap = new HashMap<>();
propMap.put(Overseer.QUEUE_OPERATION, Overseer.OverseerAction.LEADER.toLower());
propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.LEADER.toLower());
propMap.put(COLLECTION_PROP, collectionName);
propMap.put(SHARD_ID_PROP, shardId);
propMap.put(BASE_URL_PROP, baseURL);
@ -1148,7 +1150,7 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
private void deleteCoreNode(String collectionName, String replicaName, Replica replica, String core) throws KeeperException, InterruptedException {
ZkNodeProps m = new ZkNodeProps(
Overseer.QUEUE_OPERATION, Overseer.OverseerAction.DELETECORE.toLower(),
Overseer.QUEUE_OPERATION, OverseerAction.DELETECORE.toLower(),
ZkStateReader.CORE_NAME_PROP, core,
ZkStateReader.NODE_NAME_PROP, replica.getStr(ZkStateReader.NODE_NAME_PROP),
ZkStateReader.COLLECTION_PROP, collectionName,
@ -1760,7 +1762,7 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
log.info("Replication factor is 1 so switching shard states");
DistributedQueue inQueue = Overseer.getInQueue(zkStateReader.getZkClient());
Map<String, Object> propMap = new HashMap<>();
propMap.put(Overseer.QUEUE_OPERATION, Overseer.OverseerAction.UPDATESHARDSTATE.toLower());
propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
propMap.put(slice, Slice.INACTIVE);
for (String subSlice : subSlices) {
propMap.put(subSlice, Slice.ACTIVE);
@ -1772,7 +1774,7 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
log.info("Requesting shard state be set to 'recovery'");
DistributedQueue inQueue = Overseer.getInQueue(zkStateReader.getZkClient());
Map<String, Object> propMap = new HashMap<>();
propMap.put(Overseer.QUEUE_OPERATION, Overseer.OverseerAction.UPDATESHARDSTATE.toLower());
propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
for (String subSlice : subSlices) {
propMap.put(subSlice, Slice.RECOVERY);
}
@ -2062,7 +2064,7 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
completeAsyncRequest(asyncId, requestMap, results);
ZkNodeProps m = new ZkNodeProps(
Overseer.QUEUE_OPERATION, Overseer.OverseerAction.ADDROUTINGRULE.toLower(),
Overseer.QUEUE_OPERATION, OverseerAction.ADDROUTINGRULE.toLower(),
COLLECTION_PROP, sourceCollection.getName(),
SHARD_ID_PROP, sourceSlice.getName(),
"routeKey", SolrIndexSplitter.getRouteKey(splitKey) + "!",
@ -2315,10 +2317,10 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
String router = message.getStr("router.name", DocRouter.DEFAULT_NAME);
List<String> shardNames = new ArrayList<>();
if(ImplicitDocRouter.NAME.equals(router)){
Overseer.getShardNames(shardNames, message.getStr("shards",null));
ClusterStateMutator.getShardNames(shardNames, message.getStr("shards", null));
numSlices = shardNames.size();
} else {
Overseer.getShardNames(numSlices,shardNames);
ClusterStateMutator.getShardNames(numSlices, shardNames);
}
if (numSlices == null ) {

View File

@ -46,6 +46,8 @@ import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.request.CoreAdminRequest.WaitForState;
import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.cloud.overseer.SliceMutator;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.BeforeReconnect;
@ -838,7 +840,7 @@ public final class ZkController {
boolean joinAtHead = false;
Replica replica = zkStateReader.getClusterState().getReplica(desc.getCloudDescriptor().getCollectionName(), coreZkNodeName);
if (replica != null) {
joinAtHead = replica.getBool(Overseer.preferredLeaderProp, false);
joinAtHead = replica.getBool(SliceMutator.PREFERRED_LEADER_PROP, false);
}
joinElection(desc, afterExpiration, joinAtHead);
} catch (InterruptedException e) {
@ -1214,7 +1216,7 @@ public final class ZkController {
}
if (removeWatch) zkStateReader.removeZKWatch(collection);
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
Overseer.OverseerAction.DELETECORE.toLower(), ZkStateReader.CORE_NAME_PROP, coreName,
OverseerAction.DELETECORE.toLower(), ZkStateReader.CORE_NAME_PROP, coreName,
ZkStateReader.NODE_NAME_PROP, getNodeName(),
ZkStateReader.COLLECTION_PROP, cloudDescriptor.getCollectionName(),
ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName);

View File

@ -0,0 +1,188 @@
package org.apache.solr.cloud.overseer;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.solr.cloud.OverseerCollectionProcessor;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.ImplicitDocRouter;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.util.Collections.singletonMap;
public class ClusterStateMutator {
private static Logger log = LoggerFactory.getLogger(ClusterStateMutator.class);
protected final ZkStateReader zkStateReader;
public ClusterStateMutator(ZkStateReader zkStateReader) {
this.zkStateReader = zkStateReader;
}
public ZkWriteCommand createCollection(ClusterState clusterState, ZkNodeProps message) {
String cName = message.getStr("name");
log.info("building a new cName: " + cName);
if (clusterState.hasCollection(cName)) {
log.warn("Collection {} already exists. exit", cName);
return ZkStateWriter.NO_OP;
}
ArrayList<String> shards = new ArrayList<>();
if (ImplicitDocRouter.NAME.equals(message.getStr("router.name", DocRouter.DEFAULT_NAME))) {
getShardNames(shards, message.getStr("shards", DocRouter.DEFAULT_NAME));
} else {
int numShards = message.getInt(ZkStateReader.NUM_SHARDS_PROP, -1);
if (numShards < 1)
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "numShards is a required parameter for 'compositeId' router");
getShardNames(numShards, shards);
}
Map<String, Object> routerSpec = DocRouter.getRouterSpec(message);
String routerName = routerSpec.get("name") == null ? DocRouter.DEFAULT_NAME : (String) routerSpec.get("name");
DocRouter router = DocRouter.getDocRouter(routerName);
List<DocRouter.Range> ranges = router.partitionRange(shards.size(), router.fullRange());
Map<String, Slice> newSlices = new LinkedHashMap<>();
for (int i = 0; i < shards.size(); i++) {
String sliceName = shards.get(i);
Map<String, Object> sliceProps = new LinkedHashMap<>(1);
sliceProps.put(Slice.RANGE, ranges == null ? null : ranges.get(i));
newSlices.put(sliceName, new Slice(sliceName, null, sliceProps));
}
Map<String, Object> collectionProps = new HashMap<>();
for (Map.Entry<String, Object> e : OverseerCollectionProcessor.COLL_PROPS.entrySet()) {
Object val = message.get(e.getKey());
if (val == null) {
val = OverseerCollectionProcessor.COLL_PROPS.get(e.getKey());
}
if (val != null) collectionProps.put(e.getKey(), val);
}
collectionProps.put(DocCollection.DOC_ROUTER, routerSpec);
if (message.getStr("fromApi") == null) {
collectionProps.put("autoCreated", "true");
}
String znode = message.getInt(DocCollection.STATE_FORMAT, 1) == 1 ? null
: ZkStateReader.getCollectionPath(cName);
DocCollection newCollection = new DocCollection(cName,
newSlices, collectionProps, router, -1, znode);
return new ZkWriteCommand(cName, newCollection);
}
public ZkWriteCommand deleteCollection(ClusterState clusterState, ZkNodeProps message) {
final String collection = message.getStr("name");
if (!CollectionMutator.checkKeyExistence(message, "name")) return ZkStateWriter.NO_OP;
DocCollection coll = clusterState.getCollectionOrNull(collection);
if (coll == null) return ZkStateWriter.NO_OP;
return new ZkWriteCommand(coll.getName(), null);
}
public static ClusterState newState(ClusterState state, String name, DocCollection collection) {
ClusterState newClusterState = null;
if (collection == null) {
newClusterState = state.copyWith(singletonMap(name, (DocCollection) null));
} else {
newClusterState = state.copyWith(singletonMap(name, collection));
}
return newClusterState;
}
public static void getShardNames(Integer numShards, List<String> shardNames) {
if (numShards == null)
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "numShards" + " is a required param");
for (int i = 0; i < numShards; i++) {
final String sliceName = "shard" + (i + 1);
shardNames.add(sliceName);
}
}
public static void getShardNames(List<String> shardNames, String shards) {
if (shards == null)
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "shards" + " is a required param");
for (String s : shards.split(",")) {
if (s == null || s.trim().isEmpty()) continue;
shardNames.add(s.trim());
}
if (shardNames.isEmpty())
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "shards" + " is a required param");
}
/*
* Return an already assigned id or null if not assigned
*/
public static String getAssignedId(final ClusterState state, final String nodeName,
final ZkNodeProps coreState) {
Collection<Slice> slices = state.getSlices(coreState.getStr(ZkStateReader.COLLECTION_PROP));
if (slices != null) {
for (Slice slice : slices) {
if (slice.getReplicasMap().get(nodeName) != null) {
return slice.getName();
}
}
}
return null;
}
public static String getAssignedCoreNodeName(ClusterState state, ZkNodeProps message) {
Collection<Slice> slices = state.getSlices(message.getStr(ZkStateReader.COLLECTION_PROP));
if (slices != null) {
for (Slice slice : slices) {
for (Replica replica : slice.getReplicas()) {
String nodeName = replica.getStr(ZkStateReader.NODE_NAME_PROP);
String core = replica.getStr(ZkStateReader.CORE_NAME_PROP);
String msgNodeName = message.getStr(ZkStateReader.NODE_NAME_PROP);
String msgCore = message.getStr(ZkStateReader.CORE_NAME_PROP);
if (nodeName.equals(msgNodeName) && core.equals(msgCore)) {
return replica.getName();
}
}
}
}
return null;
}
}

View File

@ -0,0 +1,119 @@
package org.apache.solr.cloud.overseer;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.ImplicitDocRouter;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CollectionMutator {
private static Logger log = LoggerFactory.getLogger(CollectionMutator.class);
protected final ZkStateReader zkStateReader;
public CollectionMutator(ZkStateReader zkStateReader) {
this.zkStateReader = zkStateReader;
}
public ZkWriteCommand createShard(final ClusterState clusterState, ZkNodeProps message) {
String collectionName = message.getStr(ZkStateReader.COLLECTION_PROP);
if (!checkCollectionKeyExistence(message)) return ZkStateWriter.NO_OP;
String shardId = message.getStr(ZkStateReader.SHARD_ID_PROP);
Slice slice = clusterState.getSlice(collectionName, shardId);
if (slice == null) {
Map<String, Replica> replicas = Collections.EMPTY_MAP;
Map<String, Object> sliceProps = new HashMap<>();
String shardRange = message.getStr(ZkStateReader.SHARD_RANGE_PROP);
String shardState = message.getStr(ZkStateReader.SHARD_STATE_PROP);
String shardParent = message.getStr(ZkStateReader.SHARD_PARENT_PROP);
sliceProps.put(Slice.RANGE, shardRange);
sliceProps.put(Slice.STATE, shardState);
if (shardParent != null) {
sliceProps.put(Slice.PARENT, shardParent);
}
DocCollection collection = updateSlice(collectionName,
clusterState.getCollection(collectionName), new Slice(shardId, replicas, sliceProps));
return new ZkWriteCommand(collectionName, collection);
} else {
log.error("Unable to create Shard: " + shardId + " because it already exists in collection: " + collectionName);
return ZkStateWriter.NO_OP;
}
}
public ZkWriteCommand deleteShard(final ClusterState clusterState, ZkNodeProps message) {
final String sliceId = message.getStr(ZkStateReader.SHARD_ID_PROP);
final String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
if (!checkCollectionKeyExistence(message)) return ZkStateWriter.NO_OP;
log.info("Removing collection: " + collection + " shard: " + sliceId + " from clusterstate");
DocCollection coll = clusterState.getCollection(collection);
Map<String, Slice> newSlices = new LinkedHashMap<>(coll.getSlicesMap());
newSlices.remove(sliceId);
DocCollection newCollection = coll.copyWithSlices(newSlices);
return new ZkWriteCommand(collection, newCollection);
}
public static DocCollection updateSlice(String collectionName, DocCollection collection, Slice slice) {
DocCollection newCollection = null;
Map<String, Slice> slices;
if (collection == null) {
// when updateSlice is called on a collection that doesn't exist, it's currently when a core is publishing itself
// without explicitly creating a collection. In this current case, we assume custom sharding with an "implicit" router.
slices = new LinkedHashMap<>(1);
slices.put(slice.getName(), slice);
Map<String, Object> props = new HashMap<>(1);
props.put(DocCollection.DOC_ROUTER, ZkNodeProps.makeMap("name", ImplicitDocRouter.NAME));
newCollection = new DocCollection(collectionName, slices, props, new ImplicitDocRouter());
} else {
slices = new LinkedHashMap<>(collection.getSlicesMap()); // make a shallow copy
slices.put(slice.getName(), slice);
newCollection = collection.copyWithSlices(slices);
}
return newCollection;
}
static boolean checkCollectionKeyExistence(ZkNodeProps message) {
return checkKeyExistence(message, ZkStateReader.COLLECTION_PROP);
}
static boolean checkKeyExistence(ZkNodeProps message, String key) {
String value = message.getStr(key);
if (value == null || value.trim().length() == 0) {
log.error("Skipping invalid Overseer message because it has no " + key + " specified: " + message);
return false;
}
return true;
}
}

View File

@ -0,0 +1,57 @@
package org.apache.solr.cloud.overseer;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.Locale;
import org.apache.solr.cloud.Overseer;
/**
* Enum of actions supported by the overseer only.
*
* There are other actions supported which are public and defined
* in {@link org.apache.solr.common.params.CollectionParams.CollectionAction}
*/
public enum OverseerAction {
LEADER,
DELETECORE,
ADDROUTINGRULE,
REMOVEROUTINGRULE,
UPDATESHARDSTATE,
STATE,
QUIT;
public static OverseerAction get(String p) {
if (p != null) {
try {
return OverseerAction.valueOf(p.toUpperCase(Locale.ROOT));
} catch (Exception ex) {
}
}
return null;
}
public boolean isEqual(String s) {
return s != null && toString().equals(s.toUpperCase(Locale.ROOT));
}
public String toLower() {
return toString().toLowerCase(Locale.ROOT);
}
}

View File

@ -0,0 +1,422 @@
package org.apache.solr.cloud.overseer;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang.StringUtils;
import org.apache.solr.cloud.Assign;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.OverseerCollectionProcessor;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.cloud.OverseerCollectionProcessor.COLL_PROP_PREFIX;
import static org.apache.solr.cloud.overseer.CollectionMutator.checkCollectionKeyExistence;
import static org.apache.solr.cloud.overseer.CollectionMutator.checkKeyExistence;
public class ReplicaMutator {
private static Logger log = LoggerFactory.getLogger(ReplicaMutator.class);
protected final ZkStateReader zkStateReader;
public ReplicaMutator(ZkStateReader reader) {
this.zkStateReader = reader;
}
protected Replica setProperty(Replica replica, String key, String value) {
assert key != null;
assert value != null;
if (StringUtils.equalsIgnoreCase(replica.getStr(key), value))
return replica; // already the value we're going to set
Map<String, Object> replicaProps = new LinkedHashMap<>(replica.getProperties());
replicaProps.put(key, value);
return new Replica(replica.getName(), replicaProps);
}
protected Replica unsetProperty(Replica replica, String key) {
assert key != null;
if (!replica.containsKey(key)) return replica;
Map<String, Object> replicaProps = new LinkedHashMap<>(replica.getProperties());
replicaProps.remove(key);
return new Replica(replica.getName(), replicaProps);
}
protected Replica setLeader(Replica replica) {
return setProperty(replica, ZkStateReader.LEADER_PROP, "true");
}
protected Replica unsetLeader(Replica replica) {
return unsetProperty(replica, ZkStateReader.LEADER_PROP);
}
protected Replica setState(Replica replica, String state) {
assert state != null;
return setProperty(replica, ZkStateReader.STATE_PROP, state);
}
public ZkWriteCommand addReplicaProperty(ClusterState clusterState, ZkNodeProps message) {
if (checkKeyExistence(message, ZkStateReader.COLLECTION_PROP) == false ||
checkKeyExistence(message, ZkStateReader.SHARD_ID_PROP) == false ||
checkKeyExistence(message, ZkStateReader.REPLICA_PROP) == false ||
checkKeyExistence(message, ZkStateReader.PROPERTY_PROP) == false ||
checkKeyExistence(message, ZkStateReader.PROPERTY_VALUE_PROP) == false) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"Overseer SETREPLICAPROPERTY requires " +
ZkStateReader.COLLECTION_PROP + " and " + ZkStateReader.SHARD_ID_PROP + " and " +
ZkStateReader.REPLICA_PROP + " and " + ZkStateReader.PROPERTY_PROP + " and " +
ZkStateReader.PROPERTY_VALUE_PROP + " no action taken.");
}
String collectionName = message.getStr(ZkStateReader.COLLECTION_PROP);
String sliceName = message.getStr(ZkStateReader.SHARD_ID_PROP);
String replicaName = message.getStr(ZkStateReader.REPLICA_PROP);
String property = message.getStr(ZkStateReader.PROPERTY_PROP).toLowerCase(Locale.ROOT);
if (StringUtils.startsWith(property, COLL_PROP_PREFIX) == false) {
property = OverseerCollectionProcessor.COLL_PROP_PREFIX + property;
}
property = property.toLowerCase(Locale.ROOT);
String propVal = message.getStr(ZkStateReader.PROPERTY_VALUE_PROP);
String shardUnique = message.getStr(OverseerCollectionProcessor.SHARD_UNIQUE);
boolean isUnique = false;
if (SliceMutator.SLICE_UNIQUE_BOOLEAN_PROPERTIES.contains(property)) {
if (StringUtils.isNotBlank(shardUnique) && Boolean.parseBoolean(shardUnique) == false) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Overseer SETREPLICAPROPERTY for " +
property + " cannot have " + OverseerCollectionProcessor.SHARD_UNIQUE + " set to anything other than" +
"'true'. No action taken");
}
isUnique = true;
} else {
isUnique = Boolean.parseBoolean(shardUnique);
}
Replica replica = clusterState.getReplica(collectionName, replicaName);
if (replica == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Could not find collection/slice/replica " +
collectionName + "/" + sliceName + "/" + replicaName + " no action taken.");
}
log.info("Setting property " + property + " with value: " + propVal +
" for collection: " + collectionName + ". Full message: " + message);
if (StringUtils.equalsIgnoreCase(replica.getStr(property), propVal)) return ZkStateWriter.NO_OP; // already the value we're going to set
// OK, there's no way we won't change the cluster state now
Map<String,Replica> replicas = clusterState.getSlice(collectionName, sliceName).getReplicasCopy();
if (isUnique == false) {
replicas.get(replicaName).getProperties().put(property, propVal);
} else { // Set prop for this replica, but remove it for all others.
for (Replica rep : replicas.values()) {
if (rep.getName().equalsIgnoreCase(replicaName)) {
rep.getProperties().put(property, propVal);
} else {
rep.getProperties().remove(property);
}
}
}
Slice newSlice = new Slice(sliceName, replicas, clusterState.getSlice(collectionName, sliceName).shallowCopy());
DocCollection newCollection = CollectionMutator.updateSlice(collectionName, clusterState.getCollection(collectionName),
newSlice);
return new ZkWriteCommand(collectionName, newCollection);
}
public ZkWriteCommand removeReplicaProperty(ClusterState clusterState, ZkNodeProps message) {
if (checkKeyExistence(message, ZkStateReader.COLLECTION_PROP) == false ||
checkKeyExistence(message, ZkStateReader.SHARD_ID_PROP) == false ||
checkKeyExistence(message, ZkStateReader.REPLICA_PROP) == false ||
checkKeyExistence(message, ZkStateReader.PROPERTY_PROP) == false) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"Overseer DELETEREPLICAPROPERTY requires " +
ZkStateReader.COLLECTION_PROP + " and " + ZkStateReader.SHARD_ID_PROP + " and " +
ZkStateReader.REPLICA_PROP + " and " + ZkStateReader.PROPERTY_PROP + " no action taken.");
}
String collectionName = message.getStr(ZkStateReader.COLLECTION_PROP);
String sliceName = message.getStr(ZkStateReader.SHARD_ID_PROP);
String replicaName = message.getStr(ZkStateReader.REPLICA_PROP);
String property = message.getStr(ZkStateReader.PROPERTY_PROP).toLowerCase(Locale.ROOT);
if (StringUtils.startsWith(property, COLL_PROP_PREFIX) == false) {
property = OverseerCollectionProcessor.COLL_PROP_PREFIX + property;
}
Replica replica = clusterState.getReplica(collectionName, replicaName);
if (replica == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Could not find collection/slice/replica " +
collectionName + "/" + sliceName + "/" + replicaName + " no action taken.");
}
log.info("Deleting property " + property + " for collection: " + collectionName +
" slice " + sliceName + " replica " + replicaName + ". Full message: " + message);
String curProp = replica.getStr(property);
if (curProp == null) return ZkStateWriter.NO_OP; // not there anyway, nothing to do.
log.info("Deleting property " + property + " for collection: " + collectionName +
" slice " + sliceName + " replica " + replicaName + ". Full message: " + message);
DocCollection collection = clusterState.getCollection(collectionName);
Slice slice = collection.getSlice(sliceName);
DocCollection newCollection = SliceMutator.updateReplica(collection,
slice, replicaName, unsetProperty(replica, property));
return new ZkWriteCommand(collectionName, newCollection);
}
public ZkWriteCommand setState(ClusterState clusterState, ZkNodeProps message) {
if (Overseer.isLegacy(zkStateReader.getClusterProps())) {
return updateState(clusterState, message);
} else {
return updateStateNew(clusterState, message);
}
}
protected ZkWriteCommand updateState(final ClusterState prevState, ZkNodeProps message) {
final String cName = message.getStr(ZkStateReader.COLLECTION_PROP);
if (!checkCollectionKeyExistence(message)) return ZkStateWriter.NO_OP;
Integer numShards = message.getInt(ZkStateReader.NUM_SHARDS_PROP, null);
log.info("Update state numShards={} message={}", numShards, message);
List<String> shardNames = new ArrayList<>();
ZkWriteCommand writeCommand = null;
ClusterState newState = null;
//collection does not yet exist, create placeholders if num shards is specified
boolean collectionExists = prevState.hasCollection(cName);
if (!collectionExists && numShards != null) {
ClusterStateMutator.getShardNames(numShards, shardNames);
Map<String, Object> createMsg = ZkNodeProps.makeMap("name", cName);
createMsg.putAll(message.getProperties());
writeCommand = new ClusterStateMutator(zkStateReader).createCollection(prevState, new ZkNodeProps(createMsg));
DocCollection collection = writeCommand.collection;
newState = ClusterStateMutator.newState(prevState, cName, collection);
}
return updateState(newState != null ? newState : prevState,
message, cName, numShards, collectionExists);
}
private ZkWriteCommand updateState(final ClusterState prevState, ZkNodeProps message, String collectionName, Integer numShards, boolean collectionExists) {
String sliceName = message.getStr(ZkStateReader.SHARD_ID_PROP);
String coreNodeName = message.getStr(ZkStateReader.CORE_NODE_NAME_PROP);
if (coreNodeName == null) {
coreNodeName = ClusterStateMutator.getAssignedCoreNodeName(prevState, message);
if (coreNodeName != null) {
log.info("node=" + coreNodeName + " is already registered");
} else {
// if coreNodeName is null, auto assign one
coreNodeName = Assign.assignNode(collectionName, prevState);
}
message.getProperties().put(ZkStateReader.CORE_NODE_NAME_PROP,
coreNodeName);
}
// use the provided non null shardId
if (sliceName == null) {
//get shardId from ClusterState
sliceName = ClusterStateMutator.getAssignedId(prevState, coreNodeName, message);
if (sliceName != null) {
log.info("shard=" + sliceName + " is already registered");
}
}
if (sliceName == null) {
//request new shardId
if (collectionExists) {
// use existing numShards
numShards = prevState.getCollection(collectionName).getSlices().size();
log.info("Collection already exists with " + ZkStateReader.NUM_SHARDS_PROP + "=" + numShards);
}
sliceName = Assign.assignShard(collectionName, prevState, numShards);
log.info("Assigning new node to shard shard=" + sliceName);
}
Slice slice = prevState.getSlice(collectionName, sliceName);
Map<String, Object> replicaProps = new LinkedHashMap<>();
replicaProps.putAll(message.getProperties());
if (slice != null) {
Replica oldReplica = slice.getReplicasMap().get(coreNodeName);
if (oldReplica != null) {
if (oldReplica.containsKey(ZkStateReader.LEADER_PROP)) {
replicaProps.put(ZkStateReader.LEADER_PROP, oldReplica.get(ZkStateReader.LEADER_PROP));
}
// Move custom props over.
for (Map.Entry<String, Object> ent : oldReplica.getProperties().entrySet()) {
if (ent.getKey().startsWith(COLL_PROP_PREFIX)) {
replicaProps.put(ent.getKey(), ent.getValue());
}
}
}
}
// we don't put these in the clusterstate
replicaProps.remove(ZkStateReader.NUM_SHARDS_PROP);
replicaProps.remove(ZkStateReader.CORE_NODE_NAME_PROP);
replicaProps.remove(ZkStateReader.SHARD_ID_PROP);
replicaProps.remove(ZkStateReader.COLLECTION_PROP);
replicaProps.remove(Overseer.QUEUE_OPERATION);
// remove any props with null values
Set<Map.Entry<String, Object>> entrySet = replicaProps.entrySet();
List<String> removeKeys = new ArrayList<>();
for (Map.Entry<String, Object> entry : entrySet) {
if (entry.getValue() == null) {
removeKeys.add(entry.getKey());
}
}
for (String removeKey : removeKeys) {
replicaProps.remove(removeKey);
}
replicaProps.remove(ZkStateReader.CORE_NODE_NAME_PROP);
// remove shard specific properties
String shardRange = (String) replicaProps.remove(ZkStateReader.SHARD_RANGE_PROP);
String shardState = (String) replicaProps.remove(ZkStateReader.SHARD_STATE_PROP);
String shardParent = (String) replicaProps.remove(ZkStateReader.SHARD_PARENT_PROP);
Replica replica = new Replica(coreNodeName, replicaProps);
Map<String, Object> sliceProps = null;
Map<String, Replica> replicas;
DocCollection collection = prevState.getCollectionOrNull(collectionName);
if (slice != null) {
collection = prevState.getCollection(collectionName);
collection = checkAndCompleteShardSplit(prevState, collection, coreNodeName, sliceName, replicaProps);
// get the current slice again because it may have been updated due to checkAndCompleteShardSplit method
slice = collection.getSlice(sliceName);
sliceProps = slice.getProperties();
replicas = slice.getReplicasCopy();
} else {
replicas = new HashMap<>(1);
sliceProps = new HashMap<>();
sliceProps.put(Slice.RANGE, shardRange);
sliceProps.put(Slice.STATE, shardState);
sliceProps.put(Slice.PARENT, shardParent);
}
replicas.put(replica.getName(), replica);
slice = new Slice(sliceName, replicas, sliceProps);
DocCollection newCollection = CollectionMutator.updateSlice(collectionName, collection, slice);
return new ZkWriteCommand(collectionName, newCollection);
}
/**
* Handles non-legacy state updates
*/
protected ZkWriteCommand updateStateNew(ClusterState clusterState, final ZkNodeProps message) {
String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
if (!checkCollectionKeyExistence(message)) return ZkStateWriter.NO_OP;
String sliceName = message.getStr(ZkStateReader.SHARD_ID_PROP);
if (collection == null || sliceName == null) {
log.error("Invalid collection and slice {}", message);
return ZkStateWriter.NO_OP;
}
Slice slice = clusterState.getSlice(collection, sliceName);
if (slice == null) {
log.error("No such slice exists {}", message);
return ZkStateWriter.NO_OP;
}
return updateState(clusterState, message);
}
private DocCollection checkAndCompleteShardSplit(ClusterState prevState, DocCollection collection, String coreNodeName, String sliceName, Map<String, Object> replicaProps) {
Slice slice = collection.getSlice(sliceName);
Map<String, Object> sliceProps = slice.getProperties();
String sliceState = slice.getState();
if (Slice.RECOVERY.equals(sliceState)) {
log.info("Shard: {} is in recovery state", sliceName);
// is this replica active?
if (ZkStateReader.ACTIVE.equals(replicaProps.get(ZkStateReader.STATE_PROP))) {
log.info("Shard: {} is in recovery state and coreNodeName: {} is active", sliceName, coreNodeName);
// are all other replicas also active?
boolean allActive = true;
for (Map.Entry<String, Replica> entry : slice.getReplicasMap().entrySet()) {
if (coreNodeName.equals(entry.getKey())) continue;
if (!Slice.ACTIVE.equals(entry.getValue().getStr(Slice.STATE))) {
allActive = false;
break;
}
}
if (allActive) {
log.info("Shard: {} - all replicas are active. Finding status of fellow sub-shards", sliceName);
// find out about other sub shards
Map<String, Slice> allSlicesCopy = new HashMap<>(collection.getSlicesMap());
List<Slice> subShardSlices = new ArrayList<>();
outer:
for (Map.Entry<String, Slice> entry : allSlicesCopy.entrySet()) {
if (sliceName.equals(entry.getKey()))
continue;
Slice otherSlice = entry.getValue();
if (Slice.RECOVERY.equals(otherSlice.getState())) {
if (slice.getParent() != null && slice.getParent().equals(otherSlice.getParent())) {
log.info("Shard: {} - Fellow sub-shard: {} found", sliceName, otherSlice.getName());
// this is a fellow sub shard so check if all replicas are active
for (Map.Entry<String, Replica> sliceEntry : otherSlice.getReplicasMap().entrySet()) {
if (!ZkStateReader.ACTIVE.equals(sliceEntry.getValue().getStr(ZkStateReader.STATE_PROP))) {
allActive = false;
break outer;
}
}
log.info("Shard: {} - Fellow sub-shard: {} has all replicas active", sliceName, otherSlice.getName());
subShardSlices.add(otherSlice);
}
}
}
if (allActive) {
// hurray, all sub shard replicas are active
log.info("Shard: {} - All replicas across all fellow sub-shards are now ACTIVE. Preparing to switch shard states.", sliceName);
String parentSliceName = (String) sliceProps.remove(Slice.PARENT);
Map<String, Object> propMap = new HashMap<>();
propMap.put(Overseer.QUEUE_OPERATION, "updateshardstate");
propMap.put(parentSliceName, Slice.INACTIVE);
propMap.put(sliceName, Slice.ACTIVE);
for (Slice subShardSlice : subShardSlices) {
propMap.put(subShardSlice.getName(), Slice.ACTIVE);
}
propMap.put(ZkStateReader.COLLECTION_PROP, collection.getName());
ZkNodeProps m = new ZkNodeProps(propMap);
return new SliceMutator(zkStateReader).updateShardState(prevState, m).collection;
}
}
}
}
return collection;
}
}

View File

@ -0,0 +1,281 @@
package org.apache.solr.cloud.overseer;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import com.google.common.collect.ImmutableSet;
import org.apache.solr.cloud.Assign;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.RoutingRule;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.cloud.OverseerCollectionProcessor.COLL_PROP_PREFIX;
import static org.apache.solr.cloud.overseer.CollectionMutator.checkCollectionKeyExistence;
import static org.apache.solr.common.cloud.ZkNodeProps.makeMap;
public class SliceMutator {
private static Logger log = LoggerFactory.getLogger(SliceMutator.class);
public static final String PREFERRED_LEADER_PROP = COLL_PROP_PREFIX + "preferredleader";
public static final Set<String> SLICE_UNIQUE_BOOLEAN_PROPERTIES = ImmutableSet.of(PREFERRED_LEADER_PROP);
protected final ZkStateReader zkStateReader;
public SliceMutator(ZkStateReader zkStateReader) {
this.zkStateReader = zkStateReader;
}
public ZkWriteCommand addReplica(ClusterState clusterState, ZkNodeProps message) {
log.info("createReplica() {} ", message);
String coll = message.getStr(ZkStateReader.COLLECTION_PROP);
if (!checkCollectionKeyExistence(message)) return ZkStateWriter.NO_OP;
String slice = message.getStr(ZkStateReader.SHARD_ID_PROP);
DocCollection collection = clusterState.getCollection(coll);
Slice sl = collection.getSlice(slice);
if (sl == null) {
log.error("Invalid Collection/Slice {}/{} ", coll, slice);
return ZkStateWriter.NO_OP;
}
String coreNodeName = Assign.assignNode(coll, clusterState);
Replica replica = new Replica(coreNodeName,
makeMap(
ZkStateReader.CORE_NAME_PROP, message.getStr(ZkStateReader.CORE_NAME_PROP),
ZkStateReader.BASE_URL_PROP, message.getStr(ZkStateReader.BASE_URL_PROP),
ZkStateReader.STATE_PROP, message.getStr(ZkStateReader.STATE_PROP)));
return new ZkWriteCommand(coll, updateReplica(collection, sl, replica.getName(), replica));
}
public ZkWriteCommand removeReplica(ClusterState clusterState, ZkNodeProps message) {
final String cnn = message.getStr(ZkStateReader.CORE_NODE_NAME_PROP);
final String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
if (!checkCollectionKeyExistence(message)) return ZkStateWriter.NO_OP;
DocCollection coll = clusterState.getCollectionOrNull(collection);
if (coll == null) {
// make sure we delete the zk nodes for this collection just to be safe
return new ZkWriteCommand(collection, null);
}
Map<String, Slice> newSlices = new LinkedHashMap<>();
boolean lastSlice = false;
for (Slice slice : coll.getSlices()) {
Replica replica = slice.getReplica(cnn);
if (replica != null) {
Map<String, Replica> newReplicas = slice.getReplicasCopy();
newReplicas.remove(cnn);
// TODO TODO TODO!!! if there are no replicas left for the slice, and the slice has no hash range, remove it
// if (newReplicas.size() == 0 && slice.getRange() == null) {
// if there are no replicas left for the slice remove it
if (newReplicas.size() == 0) {
slice = null;
lastSlice = true;
} else {
slice = new Slice(slice.getName(), newReplicas, slice.getProperties());
}
}
if (slice != null) {
newSlices.put(slice.getName(), slice);
}
}
if (lastSlice) {
// remove all empty pre allocated slices
for (Slice slice : coll.getSlices()) {
if (slice.getReplicas().size() == 0) {
newSlices.remove(slice.getName());
}
}
}
// if there are no slices left in the collection, remove it?
if (newSlices.size() == 0) {
return new ClusterStateMutator(zkStateReader).deleteCollection(clusterState,
new ZkNodeProps(ZkNodeProps.makeMap("name", collection)));
} else {
return new ZkWriteCommand(collection, coll.copyWithSlices(newSlices));
}
}
public ZkWriteCommand setShardLeader(ClusterState clusterState, ZkNodeProps message) {
StringBuilder sb = new StringBuilder();
String baseUrl = message.getStr(ZkStateReader.BASE_URL_PROP);
String coreName = message.getStr(ZkStateReader.CORE_NAME_PROP);
sb.append(baseUrl);
if (baseUrl != null && !baseUrl.endsWith("/")) sb.append("/");
sb.append(coreName == null ? "" : coreName);
if (!(sb.substring(sb.length() - 1).equals("/"))) sb.append("/");
String leaderUrl = sb.length() > 0 ? sb.toString() : null;
String collectionName = message.getStr(ZkStateReader.COLLECTION_PROP);
String sliceName = message.getStr(ZkStateReader.SHARD_ID_PROP);
DocCollection coll = clusterState.getCollectionOrNull(collectionName);
if (coll == null) {
log.error("Could not mark shard leader for non existing collection:" + collectionName);
return ZkStateWriter.NO_OP;
}
Map<String, Slice> slices = coll.getSlicesMap();
Slice slice = slices.get(sliceName);
Replica oldLeader = slice.getLeader();
final Map<String, Replica> newReplicas = new LinkedHashMap<>();
for (Replica replica : slice.getReplicas()) {
// TODO: this should only be calculated once and cached somewhere?
String coreURL = ZkCoreNodeProps.getCoreUrl(replica.getStr(ZkStateReader.BASE_URL_PROP), replica.getStr(ZkStateReader.CORE_NAME_PROP));
if (replica == oldLeader && !coreURL.equals(leaderUrl)) {
replica = new ReplicaMutator(zkStateReader).unsetLeader(replica);
} else if (coreURL.equals(leaderUrl)) {
replica = new ReplicaMutator(zkStateReader).setLeader(replica);
}
newReplicas.put(replica.getName(), replica);
}
Map<String, Object> newSliceProps = slice.shallowCopy();
newSliceProps.put(Slice.REPLICAS, newReplicas);
slice = new Slice(slice.getName(), newReplicas, slice.getProperties());
return new ZkWriteCommand(collectionName, CollectionMutator.updateSlice(collectionName, coll, slice));
}
public ZkWriteCommand updateShardState(ClusterState clusterState, ZkNodeProps message) {
String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
if (!checkCollectionKeyExistence(message)) return ZkStateWriter.NO_OP;
log.info("Update shard state invoked for collection: " + collection + " with message: " + message);
Map<String, Slice> slicesCopy = new LinkedHashMap<>(clusterState.getSlicesMap(collection));
for (String key : message.keySet()) {
if (ZkStateReader.COLLECTION_PROP.equals(key)) continue;
if (Overseer.QUEUE_OPERATION.equals(key)) continue;
Slice slice = clusterState.getSlice(collection, key);
if (slice == null) {
throw new RuntimeException("Overseer.updateShardState unknown collection: " + collection + " slice: " + key);
}
log.info("Update shard state " + key + " to " + message.getStr(key));
Map<String, Object> props = slice.shallowCopy();
if (Slice.RECOVERY.equals(props.get(Slice.STATE)) && Slice.ACTIVE.equals(message.getStr(key))) {
props.remove(Slice.PARENT);
}
props.put(Slice.STATE, message.getStr(key));
Slice newSlice = new Slice(slice.getName(), slice.getReplicasCopy(), props);
slicesCopy.put(slice.getName(), newSlice);
}
return new ZkWriteCommand(collection, clusterState.getCollection(collection).copyWithSlices(slicesCopy));
}
public ZkWriteCommand addRoutingRule(final ClusterState clusterState, ZkNodeProps message) {
String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
if (!checkCollectionKeyExistence(message)) return ZkStateWriter.NO_OP;
String shard = message.getStr(ZkStateReader.SHARD_ID_PROP);
String routeKey = message.getStr("routeKey");
String range = message.getStr("range");
String targetCollection = message.getStr("targetCollection");
String targetShard = message.getStr("targetShard");
String expireAt = message.getStr("expireAt");
Slice slice = clusterState.getSlice(collection, shard);
if (slice == null) {
throw new RuntimeException("Overseer.addRoutingRule unknown collection: " + collection + " slice:" + shard);
}
Map<String, RoutingRule> routingRules = slice.getRoutingRules();
if (routingRules == null)
routingRules = new HashMap<>();
RoutingRule r = routingRules.get(routeKey);
if (r == null) {
Map<String, Object> map = new HashMap<>();
map.put("routeRanges", range);
map.put("targetCollection", targetCollection);
map.put("expireAt", expireAt);
RoutingRule rule = new RoutingRule(routeKey, map);
routingRules.put(routeKey, rule);
} else {
// add this range
Map<String, Object> map = r.shallowCopy();
map.put("routeRanges", map.get("routeRanges") + "," + range);
map.put("expireAt", expireAt);
routingRules.put(routeKey, new RoutingRule(routeKey, map));
}
Map<String, Object> props = slice.shallowCopy();
props.put("routingRules", routingRules);
Slice newSlice = new Slice(slice.getName(), slice.getReplicasCopy(), props);
return new ZkWriteCommand(collection,
CollectionMutator.updateSlice(collection, clusterState.getCollection(collection), newSlice));
}
public ZkWriteCommand removeRoutingRule(final ClusterState clusterState, ZkNodeProps message) {
String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
if (!checkCollectionKeyExistence(message)) return ZkStateWriter.NO_OP;
String shard = message.getStr(ZkStateReader.SHARD_ID_PROP);
String routeKeyStr = message.getStr("routeKey");
log.info("Overseer.removeRoutingRule invoked for collection: " + collection
+ " shard: " + shard + " routeKey: " + routeKeyStr);
Slice slice = clusterState.getSlice(collection, shard);
if (slice == null) {
log.warn("Unknown collection: " + collection + " shard: " + shard);
return ZkStateWriter.NO_OP;
}
Map<String, RoutingRule> routingRules = slice.getRoutingRules();
if (routingRules != null) {
routingRules.remove(routeKeyStr); // no rules left
Map<String, Object> props = slice.shallowCopy();
props.put("routingRules", routingRules);
Slice newSlice = new Slice(slice.getName(), slice.getReplicasCopy(), props);
return new ZkWriteCommand(collection,
CollectionMutator.updateSlice(collection, clusterState.getCollection(collection), newSlice));
}
return ZkStateWriter.NO_OP;
}
public static DocCollection updateReplica(DocCollection collection, final Slice slice, String coreNodeName, final Replica replica) {
Map<String, Replica> copy = slice.getReplicasCopy();
if (replica == null) {
copy.remove(coreNodeName);
} else {
copy.put(replica.getName(), replica);
}
Slice newSlice = new Slice(slice.getName(), copy, slice.getProperties());
return CollectionMutator.updateSlice(collection.getName(), collection, newSlice);
}
}

View File

@ -0,0 +1,151 @@
package org.apache.solr.cloud.overseer;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.util.stats.TimerContext;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.util.Collections.singletonMap;
public class ZkStateWriter {
private static Logger log = LoggerFactory.getLogger(ZkStateWriter.class);
public static ZkWriteCommand NO_OP = new ZkWriteCommand();
protected final ZkStateReader reader;
protected final Overseer.Stats stats;
protected Map<String, DocCollection> updates = new HashMap<>();
protected ClusterState clusterState = null;
protected boolean isClusterStateModified = false;
protected long lastUpdatedTime = -1;
public ZkStateWriter(ZkStateReader zkStateReader, Overseer.Stats stats) {
assert zkStateReader != null;
this.reader = zkStateReader;
this.stats = stats;
}
public ClusterState enqueueUpdate(ClusterState prevState, ZkWriteCommand cmd) {
if (cmd == NO_OP) return prevState;
if (cmd.collection == null) {
isClusterStateModified = true;
clusterState = prevState.copyWith(singletonMap(cmd.name, (DocCollection) null));
updates.put(cmd.name, null);
} else {
if (cmd.collection.getStateFormat() > 1) {
updates.put(cmd.name, cmd.collection);
} else {
isClusterStateModified = true;
}
clusterState = prevState.copyWith(singletonMap(cmd.name, cmd.collection));
}
return clusterState;
}
public boolean hasPendingUpdates() {
return !updates.isEmpty() || isClusterStateModified;
}
public ClusterState writePendingUpdates() throws KeeperException, InterruptedException {
if (!hasPendingUpdates()) throw new IllegalStateException("No queued updates to execute");
TimerContext timerContext = stats.time("update_state");
boolean success = false;
try {
if (!updates.isEmpty()) {
for (Map.Entry<String, DocCollection> entry : updates.entrySet()) {
String name = entry.getKey();
String path = ZkStateReader.getCollectionPath(name);
DocCollection c = entry.getValue();
if (c == null) {
// let's clean up the collections path for this collection
reader.getZkClient().clean("/collections/" + name);
} else if (c.getStateFormat() > 1) {
byte[] data = ZkStateReader.toJSON(new ClusterState(-1, Collections.<String>emptySet(), singletonMap(c.getName(), c)));
if (reader.getZkClient().exists(path, true)) {
assert c.getZNodeVersion() >= 0;
log.info("going to update_collection {}", path);
Stat stat = reader.getZkClient().setData(path, data, c.getZNodeVersion(), true);
DocCollection newCollection = new DocCollection(name, c.getSlicesMap(), c.getProperties(), c.getRouter(), stat.getVersion(), path);
clusterState = clusterState.copyWith(singletonMap(name, newCollection));
} else {
log.info("going to create_collection {}", path);
reader.getZkClient().create(path, data, CreateMode.PERSISTENT, true);
DocCollection newCollection = new DocCollection(name, c.getSlicesMap(), c.getProperties(), c.getRouter(), 0, path);
clusterState = clusterState.copyWith(singletonMap(name, newCollection));
isClusterStateModified = true;
}
} else if (c.getStateFormat() == 1) {
isClusterStateModified = true;
}
}
updates.clear();
}
if (isClusterStateModified) {
assert clusterState.getZkClusterStateVersion() >= 0;
lastUpdatedTime = System.nanoTime();
byte[] data = ZkStateReader.toJSON(clusterState);
Stat stat = reader.getZkClient().setData(ZkStateReader.CLUSTER_STATE, data, clusterState.getZkClusterStateVersion(), true);
Set<String> collectionNames = clusterState.getCollections();
Map<String, DocCollection> collectionStates = new HashMap<>(collectionNames.size());
for (String c : collectionNames) {
collectionStates.put(c, clusterState.getCollection(c));
}
// use the reader's live nodes because our cluster state's live nodes may be stale
clusterState = new ClusterState(stat.getVersion(), reader.getClusterState().getLiveNodes(), collectionStates);
isClusterStateModified = false;
}
success = true;
} finally {
timerContext.stop();
if (success) {
stats.success("update_state");
} else {
stats.error("update_state");
}
}
return clusterState;
}
public long getLastUpdatedTime() {
return lastUpdatedTime;
}
public ClusterState getClusterState() {
return clusterState;
}
}

View File

@ -0,0 +1,49 @@
package org.apache.solr.cloud.overseer;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
/**
* Created by shalin on 29/10/14.
*/
public class ZkWriteCommand {
public final String name;
public final DocCollection collection;
// public final ClusterState state;
public final boolean noop;
public ZkWriteCommand(String name, DocCollection collection) {
this.name = name;
this.collection = collection;
// this.state = state;
this.noop = false;
}
/**
* Returns a no-op
*/
public ZkWriteCommand() {
this.noop = true;
this.name = null;
this.collection = null;
// this.state = null;
}
}

View File

@ -0,0 +1,29 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!doctype html public "-//w3c//dtd html 4.0 transitional//en">
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=iso-8859-1">
</head>
<body>
<p>
Classes for updating cluster state in <a href="http://wiki.apache.org/solr/SolrCloud">SolrCloud</a> mode.
</p>
</body>
</html>

View File

@ -83,6 +83,7 @@ import org.apache.solr.cloud.DistributedQueue.QueueEvent;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.OverseerCollectionProcessor;
import org.apache.solr.cloud.OverseerSolrResponse;
import org.apache.solr.cloud.overseer.SliceMutator;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ClusterState;
@ -301,7 +302,7 @@ public class CollectionsHandler extends RequestHandlerBase {
for (Slice slice : dc.getSlices()) {
for (Replica replica : slice.getReplicas()) {
// Tell the replica to become the leader if we're the preferred leader AND active AND not the leader already
if (replica.getBool(Overseer.preferredLeaderProp, false) == false) {
if (replica.getBool(SliceMutator.PREFERRED_LEADER_PROP, false) == false) {
continue;
}
if (StringUtils.equalsIgnoreCase(replica.getStr(STATE_PROP), ACTIVE) == false) {
@ -442,7 +443,7 @@ public class CollectionsHandler extends RequestHandlerBase {
// Check if we're trying to set a property with parameters that allow us to set the property on multiple replicas
// in a slice on properties that are known to only be one-per-slice and error out if so.
if (StringUtils.isNotBlank((String)map.get(SHARD_UNIQUE)) &&
Overseer.sliceUniqueBooleanProperties.contains(property.toLowerCase(Locale.ROOT)) &&
SliceMutator.SLICE_UNIQUE_BOOLEAN_PROPERTIES.contains(property.toLowerCase(Locale.ROOT)) &&
uniquePerSlice == false) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"Overseer replica property command received for property " + property +
@ -472,7 +473,7 @@ public class CollectionsHandler extends RequestHandlerBase {
}
if (shardUnique == false &&
Overseer.sliceUniqueBooleanProperties.contains(prop) == false) {
SliceMutator.SLICE_UNIQUE_BOOLEAN_PROPERTIES.contains(prop) == false) {
throw new SolrException(ErrorCode.BAD_REQUEST, "Balancing properties amongst replicas in a slice requires that"
+ " the property be pre-defined as a unique property (e.g. 'preferredLeader') or that 'shardUnique' be set to 'true'. " +
" Property: " + prop + " shardUnique: " + Boolean.toString(shardUnique));

View File

@ -47,6 +47,7 @@ import org.apache.solr.cloud.DistributedQueue;
import org.apache.solr.cloud.LeaderInitiatedRecoveryThread;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.SolrInputDocument;
@ -77,7 +78,6 @@ import org.apache.solr.request.SolrRequestInfo;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.schema.IndexSchema;
import org.apache.solr.schema.SchemaField;
import org.apache.solr.schema.TrieDateField;
import org.apache.solr.update.AddUpdateCommand;
import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.DeleteUpdateCommand;
@ -542,7 +542,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
if (ruleExpiryLock.tryLock(10, TimeUnit.MILLISECONDS)) {
log.info("Going to expire routing rule");
try {
Map<String, Object> map = ZkNodeProps.makeMap(Overseer.QUEUE_OPERATION, Overseer.OverseerAction.REMOVEROUTINGRULE.toLower(),
Map<String, Object> map = ZkNodeProps.makeMap(Overseer.QUEUE_OPERATION, OverseerAction.REMOVEROUTINGRULE.toLower(),
ZkStateReader.COLLECTION_PROP, collection,
ZkStateReader.SHARD_ID_PROP, myShardId,
"routeKey", routeKey + "!");

View File

@ -21,6 +21,7 @@ import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.Slice;
@ -157,7 +158,7 @@ public class DeleteShardTest extends AbstractFullDistribZkTestBase {
KeeperException, InterruptedException {
DistributedQueue inQueue = Overseer.getInQueue(cloudClient.getZkStateReader().getZkClient());
Map<String, Object> propMap = new HashMap<>();
propMap.put(Overseer.QUEUE_OPERATION, Overseer.OverseerAction.UPDATESHARDSTATE.toLower());
propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
propMap.put(slice, state);
propMap.put(ZkStateReader.COLLECTION_PROP, "collection1");
ZkNodeProps m = new ZkNodeProps(propMap);

View File

@ -38,6 +38,7 @@ import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrServer;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
@ -106,7 +107,7 @@ public class OverseerRolesTest extends AbstractFullDistribZkTestBase{
Map m = (Map) ZkStateReader.fromJSON(data);
String s = (String) m.get("id");
String leader = LeaderElector.getNodeName(s);
Overseer.getInQueue(zk).offer(ZkStateReader.toJSON(new ZkNodeProps(Overseer.QUEUE_OPERATION, Overseer.OverseerAction.QUIT.toLower())));
Overseer.getInQueue(zk).offer(ZkStateReader.toJSON(new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.QUIT.toLower())));
long timeout = System.currentTimeMillis()+10000;
String newLeader=null;
for(;System.currentTimeMillis() < timeout;){

View File

@ -36,6 +36,7 @@ import javax.xml.parsers.ParserConfigurationException;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
@ -43,10 +44,14 @@ import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.handler.component.HttpShardHandlerFactory;
import org.apache.solr.update.UpdateShardHandler;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.apache.solr.util.MockConfigSolr;
import org.apache.solr.util.stats.Snapshot;
import org.apache.solr.util.stats.Timer;
import org.apache.solr.util.stats.TimerContext;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
@ -54,6 +59,7 @@ import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.xml.sax.SAXException;
@ -112,7 +118,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
if (ec != null) {
ec.cancelElection();
}
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, Overseer.OverseerAction.DELETECORE.toLower(),
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.DELETECORE.toLower(),
ZkStateReader.NODE_NAME_PROP, nodeName,
ZkStateReader.CORE_NAME_PROP, coreName,
ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName,
@ -121,7 +127,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
q.offer(ZkStateReader.toJSON(m));
return null;
} else {
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, Overseer.OverseerAction.STATE.toLower(),
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(),
ZkStateReader.STATE_PROP, stateName,
ZkStateReader.NODE_NAME_PROP, nodeName,
ZkStateReader.CORE_NAME_PROP, coreName,
@ -525,7 +531,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
DistributedQueue q = Overseer.getInQueue(zkClient);
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, Overseer.OverseerAction.STATE.toLower(),
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(),
ZkStateReader.BASE_URL_PROP, "http://127.0.0.1/solr",
ZkStateReader.NODE_NAME_PROP, "node1",
ZkStateReader.COLLECTION_PROP, "collection1",
@ -878,6 +884,133 @@ public class OverseerTest extends SolrTestCaseJ4 {
}
}
@Test
@Ignore
public void testPerformance() throws Exception {
String zkDir = createTempDir("OverseerTest.testPerformance").toFile().getAbsolutePath();
ZkTestServer server = new ZkTestServer(zkDir);
SolrZkClient controllerClient = null;
SolrZkClient overseerClient = null;
ZkStateReader reader = null;
MockZKController mockController = null;
try {
server.run();
controllerClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
controllerClient.makePath(ZkStateReader.LIVE_NODES_ZKNODE, true);
reader = new ZkStateReader(controllerClient);
reader.createClusterStateWatchersAndUpdate();
mockController = new MockZKController(server.getZkAddress(), "node1");
for (int i=0; i<100; i++) {
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(),
"name", "perf" + i,
ZkStateReader.NUM_SHARDS_PROP, "1",
"stateFormat", "2",
ZkStateReader.REPLICATION_FACTOR, "1",
ZkStateReader.MAX_SHARDS_PER_NODE, "1"
);
DistributedQueue q = Overseer.getInQueue(controllerClient);
q.offer(ZkStateReader.toJSON(m));
controllerClient.makePath("/collections/perf" + i, true);
}
for (int i = 0, j = 0, k = 0; i < 20000; i++, j++, k++) {
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(),
ZkStateReader.STATE_PROP, ZkStateReader.RECOVERING,
ZkStateReader.NODE_NAME_PROP, "node1",
ZkStateReader.CORE_NAME_PROP, "core" + k,
ZkStateReader.CORE_NODE_NAME_PROP, "node1",
ZkStateReader.COLLECTION_PROP, "perf" + j,
ZkStateReader.NUM_SHARDS_PROP, "1",
ZkStateReader.BASE_URL_PROP, "http://" + "node1"
+ "/solr/");
DistributedQueue q = Overseer.getInQueue(controllerClient);
q.offer(ZkStateReader.toJSON(m));
if (j == 99) j = 0;
if (k == 9) k = 0;
if (i > 0 && i % 100 == 0) log.info("Published {} items", i);
}
// let's publish a sentinel collection which we'll use to wait for overseer to complete operations
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(),
ZkStateReader.STATE_PROP, ZkStateReader.ACTIVE,
ZkStateReader.NODE_NAME_PROP, "node1",
ZkStateReader.CORE_NAME_PROP, "core1",
ZkStateReader.CORE_NODE_NAME_PROP, "node1",
ZkStateReader.COLLECTION_PROP, "perf_sentinel",
ZkStateReader.NUM_SHARDS_PROP, "1",
ZkStateReader.BASE_URL_PROP, "http://" + "node1"
+ "/solr/");
DistributedQueue q = Overseer.getInQueue(controllerClient);
q.offer(ZkStateReader.toJSON(m));
Timer t = new Timer();
TimerContext context = t.time();
try {
overseerClient = electNewOverseer(server.getZkAddress());
assertTrue(overseers.size() > 0);
while (true) {
reader.updateClusterState(true);
ClusterState state = reader.getClusterState();
if (state.hasCollection("perf_sentinel")) {
break;
}
Thread.sleep(1000);
}
} finally {
context.stop();
}
log.info("Overseer loop finished processing: ");
printTimingStats(t);
Overseer overseer = overseers.get(0);
Overseer.Stats stats = overseer.getStats();
String[] interestingOps = {"state", "update_state", "am_i_leader", ""};
Arrays.sort(interestingOps);
for (Map.Entry<String, Overseer.Stat> entry : stats.getStats().entrySet()) {
String op = entry.getKey();
if (Arrays.binarySearch(interestingOps, op) < 0)
continue;
Overseer.Stat stat = entry.getValue();
log.info("op: {}, success: {}, failure: {}", op, stat.success.get(), stat.errors.get());
Timer timer = stat.requestTime;
printTimingStats(timer);
}
} finally {
close(overseerClient);
close(mockController);
close(controllerClient);
close(reader);
server.shutdown();
}
}
private void printTimingStats(Timer timer) {
Snapshot snapshot = timer.getSnapshot();
log.info("\t totalTime: {}", timer.getSum());
log.info("\t avgRequestsPerMinute: {}", timer.getMeanRate());
log.info("\t 5minRateRequestsPerMinute: {}", timer.getFiveMinuteRate());
log.info("\t 15minRateRequestsPerMinute: {}", timer.getFifteenMinuteRate());
log.info("\t avgTimePerRequest: {}", timer.getMean());
log.info("\t medianRequestTime: {}", snapshot.getMedian());
log.info("\t 75thPctlRequestTime: {}", snapshot.get75thPercentile());
log.info("\t 95thPctlRequestTime: {}", snapshot.get95thPercentile());
log.info("\t 99thPctlRequestTime: {}", snapshot.get99thPercentile());
log.info("\t 999thPctlRequestTime: {}", snapshot.get999thPercentile());
}
private void close(MockZKController mockController) {
if (mockController != null) {
mockController.close();
@ -928,7 +1061,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
//submit to proper queue
queue = Overseer.getInQueue(zkClient);
m = new ZkNodeProps(Overseer.QUEUE_OPERATION, Overseer.OverseerAction.STATE.toLower(),
m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(),
ZkStateReader.BASE_URL_PROP, "http://127.0.0.1/solr",
ZkStateReader.NODE_NAME_PROP, "node1",
ZkStateReader.SHARD_ID_PROP, "s1",

View File

@ -0,0 +1,70 @@
package org.apache.solr.cloud.overseer;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.Collections;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.cloud.MockZkStateReader;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.ImplicitDocRouter;
import org.apache.solr.common.cloud.ZkNodeProps;
public class TestClusterStateMutator extends SolrTestCaseJ4 {
public void testCreateCollection() throws Exception {
ClusterState state = new ClusterState(-1, Collections.<String>emptySet(), Collections.<String, DocCollection>emptyMap());
MockZkStateReader zkStateReader = new MockZkStateReader(state, Collections.<String>emptySet());
ClusterState clusterState = zkStateReader.getClusterState();
ClusterStateMutator mutator = new ClusterStateMutator(zkStateReader);
ZkNodeProps message = new ZkNodeProps(ZkNodeProps.makeMap(
"name", "xyz",
"numShards", "1"
));
ZkWriteCommand cmd = mutator.createCollection(clusterState, message);
DocCollection collection = cmd.collection;
assertEquals("xyz", collection.getName());
assertEquals(1, collection.getSlicesMap().size());
assertEquals(1, collection.getMaxShardsPerNode());
state = new ClusterState(-1, Collections.<String>emptySet(), Collections.singletonMap("xyz", collection));
message = new ZkNodeProps(ZkNodeProps.makeMap(
"name", "abc",
"numShards", "2",
"router.name", "implicit",
"shards", "x,y",
"replicationFactor", "3",
"maxShardsPerNode", "4"
));
cmd = mutator.createCollection(state, message);
collection = cmd.collection;
assertEquals("abc", collection.getName());
assertEquals(2, collection.getSlicesMap().size());
assertNotNull(collection.getSlicesMap().get("x"));
assertNotNull(collection.getSlicesMap().get("y"));
assertNull(collection.getSlicesMap().get("x").getRange());
assertNull(collection.getSlicesMap().get("y").getRange());
assertEquals("active", collection.getSlicesMap().get("x").getState());
assertEquals("active", collection.getSlicesMap().get("y").getState());
assertEquals(4, collection.getMaxShardsPerNode());
assertEquals(ImplicitDocRouter.class, collection.getRouter().getClass());
assertNotNull(state.getCollectionOrNull("xyz")); // we still have the old collection
}
}

View File

@ -40,7 +40,7 @@ import org.slf4j.LoggerFactory;
public class ClusterState implements JSONWriter.Writable {
private static Logger log = LoggerFactory.getLogger(ClusterState.class);
private Integer znodeVersion;
private final Integer znodeVersion;
private final Map<String, CollectionRef> collectionStates;
private Set<String> liveNodes;

View File

@ -35,7 +35,7 @@ public class DocCollection extends ZkNodeProps {
public static final String DOC_ROUTER = "router";
public static final String SHARDS = "shards";
public static final String STATE_FORMAT = "stateFormat";
private int znodeVersion;
private int znodeVersion = -1; // sentinel
private final String name;
private final Map<String, Slice> slices;