From 2b0ecdbf47c1e80f6ef760e0ccf8818f94b70c5b Mon Sep 17 00:00:00 2001 From: Shalin Shekhar Mangar Date: Sat, 29 Nov 2014 15:53:11 +0000 Subject: [PATCH] SOLR-6554: Speed up overseer operations git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1642437 13f79535-47bb-0310-9956-ffa450edef68 --- solr/CHANGES.txt | 5 + .../apache/solr/cloud/ElectionContext.java | 5 +- .../java/org/apache/solr/cloud/Overseer.java | 983 +++--------------- .../cloud/OverseerCollectionProcessor.java | 18 +- .../org/apache/solr/cloud/ZkController.java | 6 +- .../cloud/overseer/ClusterStateMutator.java | 188 ++++ .../cloud/overseer/CollectionMutator.java | 119 +++ .../solr/cloud/overseer/OverseerAction.java | 57 + .../solr/cloud/overseer/ReplicaMutator.java | 422 ++++++++ .../solr/cloud/overseer/SliceMutator.java | 281 +++++ .../solr/cloud/overseer/ZkStateWriter.java | 151 +++ .../solr/cloud/overseer/ZkWriteCommand.java | 49 + .../apache/solr/cloud/overseer/package.html | 29 + .../handler/admin/CollectionsHandler.java | 7 +- .../processor/DistributedUpdateProcessor.java | 4 +- .../apache/solr/cloud/DeleteShardTest.java | 3 +- .../apache/solr/cloud/OverseerRolesTest.java | 3 +- .../org/apache/solr/cloud/OverseerTest.java | 141 ++- .../overseer/TestClusterStateMutator.java | 70 ++ .../solr/common/cloud/ClusterState.java | 2 +- .../solr/common/cloud/DocCollection.java | 2 +- 21 files changed, 1654 insertions(+), 891 deletions(-) create mode 100644 solr/core/src/java/org/apache/solr/cloud/overseer/ClusterStateMutator.java create mode 100644 solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java create mode 100644 solr/core/src/java/org/apache/solr/cloud/overseer/OverseerAction.java create mode 100644 solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java create mode 100644 solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java create mode 100644 solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java create mode 100644 solr/core/src/java/org/apache/solr/cloud/overseer/ZkWriteCommand.java create mode 100644 solr/core/src/java/org/apache/solr/cloud/overseer/package.html create mode 100644 solr/core/src/test/org/apache/solr/cloud/overseer/TestClusterStateMutator.java diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 7f1b685ba4d..8ac72cfa78c 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -315,6 +315,11 @@ Optimizations * SOLR-6603: LBHttpSolrServer - lazily allocate skipped-zombie-servers list. (Christine Poerschke via shalin) +* SOLR-6554: Speed up overseer operations avoiding cluster state reads from + zookeeper at the start of each loop and instead relying on local state and + compare-and-set writes. This change also adds batching for consecutive messages + belonging to the same collection with stateFormat=2. (shalin) + Other Changes ---------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java index 8a4278d360d..b21b23ff1a8 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java +++ b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java @@ -1,6 +1,7 @@ package org.apache.solr.cloud; import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.solr.cloud.overseer.OverseerAction; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.cloud.Replica; @@ -148,7 +149,7 @@ class ShardLeaderElectionContextBase extends ElectionContext { assert shardId != null; ZkNodeProps m = ZkNodeProps.fromKeyVals(Overseer.QUEUE_OPERATION, - Overseer.OverseerAction.LEADER.toLower(), ZkStateReader.SHARD_ID_PROP, shardId, + OverseerAction.LEADER.toLower(), ZkStateReader.SHARD_ID_PROP, shardId, ZkStateReader.COLLECTION_PROP, collection, ZkStateReader.BASE_URL_PROP, leaderProps.getProperties().get(ZkStateReader.BASE_URL_PROP), ZkStateReader.CORE_NAME_PROP, @@ -201,7 +202,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase { String coreName = leaderProps.getStr(ZkStateReader.CORE_NAME_PROP); // clear the leader in clusterstate - ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, Overseer.OverseerAction.LEADER.toLower(), + ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.LEADER.toLower(), ZkStateReader.SHARD_ID_PROP, shardId, ZkStateReader.COLLECTION_PROP, collection); Overseer.getInQueue(zkClient).offer(ZkStateReader.toJSON(m)); diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java index 7f13e81d4b7..fe0efe3556c 100644 --- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java +++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java @@ -47,6 +47,13 @@ import java.util.concurrent.atomic.AtomicInteger; import com.google.common.collect.ImmutableSet; import org.apache.commons.lang.StringUtils; import org.apache.solr.client.solrj.SolrResponse; +import org.apache.solr.cloud.overseer.ClusterStateMutator; +import org.apache.solr.cloud.overseer.CollectionMutator; +import org.apache.solr.cloud.overseer.OverseerAction; +import org.apache.solr.cloud.overseer.ReplicaMutator; +import org.apache.solr.cloud.overseer.SliceMutator; +import org.apache.solr.cloud.overseer.ZkStateWriter; +import org.apache.solr.cloud.overseer.ZkWriteCommand; import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.DocCollection; @@ -90,41 +97,6 @@ public class Overseer implements Closeable { @Deprecated public static final String REMOVESHARD = "removeshard"; - /** - * Enum of actions supported by the overseer only. - * - * There are other actions supported which are public and defined - * in {@link org.apache.solr.common.params.CollectionParams.CollectionAction} - */ - public static enum OverseerAction { - LEADER, - DELETECORE, - ADDROUTINGRULE, - REMOVEROUTINGRULE, - UPDATESHARDSTATE, - STATE, - QUIT; - - public static OverseerAction get(String p) { - if (p != null) { - try { - return OverseerAction.valueOf(p.toUpperCase(Locale.ROOT)); - } catch (Exception ex) { - } - } - return null; - } - - public boolean isEqual(String s) { - return s != null && toString().equals(s.toUpperCase(Locale.ROOT)); - } - - public String toLower() { - return toString().toLowerCase(Locale.ROOT); - } - } - - public static final int STATE_UPDATE_DELAY = 1500; // delay between cloud state updates private static Logger log = LoggerFactory.getLogger(Overseer.class); @@ -205,7 +177,9 @@ public class Overseer implements Closeable { reader.updateClusterState(true); ClusterState clusterState = reader.getClusterState(); log.info("Replaying operations from work queue."); - + + ZkStateWriter zkStateWriter = new ZkStateWriter(reader, stats); + while (head != null) { isLeader = amILeader(); if (LeaderStatus.NO == isLeader) { @@ -216,7 +190,8 @@ public class Overseer implements Closeable { final String operation = message.getStr(QUEUE_OPERATION); final TimerContext timerContext = stats.time(operation); try { - clusterState = processMessage(clusterState, message, operation, workQueue.getStats().getQueueLength()); + ZkWriteCommand zkWriteCommand = processMessage(clusterState, message, operation, workQueue.getStats().getQueueLength()); + clusterState = zkStateWriter.enqueueUpdate(clusterState, zkWriteCommand); stats.success(operation); } catch (Exception e) { // generally there is nothing we can do - in most cases, we have @@ -229,8 +204,9 @@ public class Overseer implements Closeable { } finally { timerContext.stop(); } - updateZkStates(clusterState); - + if (zkStateWriter.hasPendingUpdates()) { + clusterState = zkStateWriter.writePendingUpdates(); + } workQueue.poll(); // poll-ing removes the element we got by peek-ing } else { @@ -260,7 +236,11 @@ public class Overseer implements Closeable { log.info("Starting to work on the main queue"); int lastStateFormat = -1; // sentinel + String lastCollectionName = null; try { + ZkStateWriter zkStateWriter = new ZkStateWriter(reader, stats); + ClusterState clusterState = null; + boolean refreshClusterState = true; // let's refresh in the first iteration while (!this.isClosed) { isLeader = amILeader(); if (LeaderStatus.NO == isLeader) { @@ -289,8 +269,41 @@ public class Overseer implements Closeable { } synchronized (reader.getUpdateLock()) { try { - reader.updateClusterState(true); - ClusterState clusterState = reader.getClusterState(); + if (refreshClusterState) { + reader.updateClusterState(true); + clusterState = reader.getClusterState(); + refreshClusterState = false; + + // if there were any errors while processing + // the state queue, items would have been left in the + // work queue so let's process those first + byte[] data = workQueue.peek(); + while (data != null) { + final ZkNodeProps message = ZkNodeProps.load(data); + final String operation = message.getStr(QUEUE_OPERATION); + final TimerContext timerContext = stats.time(operation); + try { + ZkWriteCommand zkWriteCommand = processMessage(clusterState, message, operation, workQueue.getStats().getQueueLength()); + clusterState = zkStateWriter.enqueueUpdate(clusterState, zkWriteCommand); + stats.success(operation); + } catch (Exception e) { + // generally there is nothing we can do - in most cases, we have + // an issue that will fail again on retry or we cannot communicate with a + // ZooKeeper in which case another Overseer should take over + // TODO: if ordering for the message is not important, we could + // track retries and put it back on the end of the queue + log.error("Overseer could not process the current clusterstate state update message, skipping the message.", e); + stats.error(operation); + } finally { + timerContext.stop(); + } + if (zkStateWriter.hasPendingUpdates()) { + clusterState = zkStateWriter.writePendingUpdates(); + } + workQueue.poll(); // poll-ing removes the element we got by peek-ing + data = workQueue.peek(); + } + } while (head != null) { final ZkNodeProps message = ZkNodeProps.load(head.getBytes()); @@ -305,6 +318,8 @@ public class Overseer implements Closeable { DocCollection docCollection = clusterState.getCollectionOrNull(collection); if (lastStateFormat != -1 && docCollection != null && docCollection.getStateFormat() != lastStateFormat) { lastStateFormat = docCollection.getStateFormat(); + // we don't want to mix batching of different state formats together because that makes + // it harder to guarantee atomicity of ZK writes break; } if (docCollection != null) { @@ -314,7 +329,8 @@ public class Overseer implements Closeable { final TimerContext timerContext = stats.time(operation); try { - clusterState = processMessage(clusterState, message, operation, stateUpdateQueue.getStats().getQueueLength()); + ZkWriteCommand zkWriteCommand = processMessage(clusterState, message, operation, stateUpdateQueue.getStats().getQueueLength()); + clusterState = zkStateWriter.enqueueUpdate(clusterState, zkWriteCommand); stats.success(operation); } catch (Exception e) { // generally there is nothing we can do - in most cases, we have @@ -332,11 +348,18 @@ public class Overseer implements Closeable { stateUpdateQueue.poll(); if (isClosed || System.nanoTime() - lastUpdatedTime > TimeUnit.NANOSECONDS.convert(STATE_UPDATE_DELAY, TimeUnit.MILLISECONDS)) break; - if(!updateNodes.isEmpty()) break; + if (!updateNodes.isEmpty() && !collection.equals(lastCollectionName)) { + lastCollectionName = collection; + break; + } + lastCollectionName = collection; // if an event comes in the next 100ms batch it together head = stateUpdateQueue.peek(100); } - updateZkStates(clusterState); + if (zkStateWriter.hasPendingUpdates()) { + clusterState = zkStateWriter.writePendingUpdates(); + lastUpdatedTime = zkStateWriter.getLastUpdatedTime(); + } // clean work queue while (workQueue.poll() != null) ; @@ -346,12 +369,13 @@ public class Overseer implements Closeable { return; } log.error("Exception in Overseer main queue loop", e); + refreshClusterState = true; // it might have been a bad version error } catch (InterruptedException e) { Thread.currentThread().interrupt(); return; - } catch (Exception e) { log.error("Exception in Overseer main queue loop", e); + refreshClusterState = true; // it might have been a bad version error } } @@ -368,59 +392,6 @@ public class Overseer implements Closeable { } } - private void updateZkStates(ClusterState clusterState) throws KeeperException, InterruptedException { - TimerContext timerContext = stats.time("update_state"); - boolean success = false; - try { - if (!updateNodes.isEmpty()) { - for (Entry e : updateNodes.entrySet()) { - if (e.getValue() == null) { - if (zkClient.exists(e.getKey(), true)) zkClient.delete(e.getKey(), 0, true); - } else { - byte[] data = ZkStateReader.toJSON(e.getValue()); - if (zkClient.exists(e.getKey(), true)) { - log.info("going to update_collection {}", e.getKey()); - zkClient.setData(e.getKey(), data, true); - } else { - log.info("going to create_collection {}", e.getKey()); - String parentPath = e.getKey().substring(0, e.getKey().lastIndexOf('/')); - if (!zkClient.exists(parentPath, true)) { - // if the /collections/collection_name path doesn't exist then it means that - // 1) the user invoked a DELETE collection API and the OverseerCollectionProcessor has deleted - // this zk path. - // 2) these are most likely old "state" messages which are only being processed now because - // if they were new "state" messages then in legacy mode, a new collection would have been - // created with stateFormat = 1 (which is the default state format) - // 3) these can't be new "state" messages created for a new collection because - // otherwise the OverseerCollectionProcessor would have already created this path - // as part of the create collection API call -- which is the only way in which a collection - // with stateFormat > 1 can possibly be created - continue; - } - zkClient.create(e.getKey(), data, CreateMode.PERSISTENT, true); - } - } - } - updateNodes.clear(); - } - - if (isClusterStateModified) { - lastUpdatedTime = System.nanoTime(); - zkClient.setData(ZkStateReader.CLUSTER_STATE, - ZkStateReader.toJSON(clusterState), true); - isClusterStateModified = false; - } - success = true; - } finally { - timerContext.stop(); - if (success) { - stats.success("update_state"); - } else { - stats.error("update_state"); - } - } - } - private void checkIfIamStillLeader() { if (zkController != null && zkController.getCoreContainer().isShutDown()) return;//shutting down no need to go further org.apache.zookeeper.data.Stat stat = new org.apache.zookeeper.data.Stat(); @@ -460,41 +431,33 @@ public class Overseer implements Closeable { } } - private ClusterState processMessage(ClusterState clusterState, + private ZkWriteCommand processMessage(ClusterState clusterState, final ZkNodeProps message, final String operation, int queueSize) { log.info("processMessage: queueSize: {}, message = {}", queueSize, message); CollectionParams.CollectionAction collectionAction = CollectionParams.CollectionAction.get(operation); if (collectionAction != null) { switch (collectionAction) { case CREATE: - clusterState = buildCollection(clusterState, message); - break; + return buildCollection(clusterState, message); case DELETE: - clusterState = removeCollection(clusterState, message); - break; + return removeCollection(clusterState, message); case CREATESHARD: - clusterState = createShard(clusterState, message); - break; + return createShard(clusterState, message); case DELETESHARD: - clusterState = removeShard(clusterState, message); - break; + return removeShard(clusterState, message); case ADDREPLICA: - clusterState = createReplica(clusterState, message); - break; + return createReplica(clusterState, message); case CLUSTERPROP: handleProp(message); - break; case ADDREPLICAPROP: - clusterState = addReplicaProp(clusterState, message); - break; + return addReplicaProp(clusterState, message); case DELETEREPLICAPROP: - clusterState = deleteReplicaProp(clusterState, message); - break; + return deleteReplicaProp(clusterState, message); case BALANCESHARDUNIQUE: ExclusiveSliceProperty dProp = new ExclusiveSliceProperty(this, clusterState, message); if (dProp.balanceProperty()) { String collName = message.getStr(ZkStateReader.COLLECTION_PROP); - clusterState = newState(clusterState, singletonMap(collName, dProp.getDocCollection())); + return new ZkWriteCommand(collName, dProp.getDocCollection()); } break; default: @@ -506,27 +469,17 @@ public class Overseer implements Closeable { if (overseerAction != null) { switch (overseerAction) { case STATE: - if (isLegacy(clusterProps)) { - clusterState = updateState(clusterState, message); - } else { - clusterState = updateStateNew(clusterState, message); - } - break; + return new ReplicaMutator(getZkStateReader()).setState(clusterState, message); case LEADER: - clusterState = setShardLeader(clusterState, message); - break; + return setShardLeader(clusterState, message); case DELETECORE: - clusterState = removeCore(clusterState, message); - break; + return removeCore(clusterState, message); case ADDROUTINGRULE: - clusterState = addRoutingRule(clusterState, message); - break; + return addRoutingRule(clusterState, message); case REMOVEROUTINGRULE: - clusterState = removeRoutingRule(clusterState, message); - break; + return removeRoutingRule(clusterState, message); case UPDATESHARDSTATE: - clusterState = updateShardState(clusterState, message); - break; + return updateShardState(clusterState, message); case QUIT: if (myId.equals(message.get("id"))) { log.info("Quit command received {}", LeaderElector.getNodeName(myId)); @@ -545,14 +498,11 @@ public class Overseer implements Closeable { // specified in CollectionAction. See SOLR-6115. Remove this in 5.0 switch (operation) { case OverseerCollectionProcessor.CREATECOLLECTION: - clusterState = buildCollection(clusterState, message); - break; + return buildCollection(clusterState, message); case REMOVECOLLECTION: - clusterState = removeCollection(clusterState, message); - break; + return removeCollection(clusterState, message); case REMOVESHARD: - clusterState = removeShard(clusterState, message); - break; + return removeShard(clusterState, message); default: throw new RuntimeException("unknown operation:" + operation + " contents:" + message.getProperties()); @@ -560,125 +510,19 @@ public class Overseer implements Closeable { } } - return clusterState; + return ZkStateWriter.NO_OP; } - private ClusterState addReplicaProp(ClusterState clusterState, ZkNodeProps message) { - - if (checkKeyExistence(message, ZkStateReader.COLLECTION_PROP) == false || - checkKeyExistence(message, ZkStateReader.SHARD_ID_PROP) == false || - checkKeyExistence(message, ZkStateReader.REPLICA_PROP) == false || - checkKeyExistence(message, ZkStateReader.PROPERTY_PROP) == false || - checkKeyExistence(message, ZkStateReader.PROPERTY_VALUE_PROP) == false) { - throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, - "Overseer SETREPLICAPROPERTY requires " + - ZkStateReader.COLLECTION_PROP + " and " + ZkStateReader.SHARD_ID_PROP + " and " + - ZkStateReader.REPLICA_PROP + " and " + ZkStateReader.PROPERTY_PROP + " and " + - ZkStateReader.PROPERTY_VALUE_PROP + " no action taken."); - } - - String collectionName = message.getStr(ZkStateReader.COLLECTION_PROP); - String sliceName = message.getStr(ZkStateReader.SHARD_ID_PROP); - String replicaName = message.getStr(ZkStateReader.REPLICA_PROP); - String property = message.getStr(ZkStateReader.PROPERTY_PROP).toLowerCase(Locale.ROOT); - if (StringUtils.startsWith(property, COLL_PROP_PREFIX) == false) { - property = OverseerCollectionProcessor.COLL_PROP_PREFIX + property; - } - property = property.toLowerCase(Locale.ROOT); - String propVal = message.getStr(ZkStateReader.PROPERTY_VALUE_PROP); - String shardUnique = message.getStr(OverseerCollectionProcessor.SHARD_UNIQUE); - - boolean isUnique = false; - - if (sliceUniqueBooleanProperties.contains(property)) { - if (StringUtils.isNotBlank(shardUnique) && Boolean.parseBoolean(shardUnique) == false) { - throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Overseer SETREPLICAPROPERTY for " + - property + " cannot have " + OverseerCollectionProcessor.SHARD_UNIQUE + " set to anything other than" + - "'true'. No action taken"); - } - isUnique = true; - } else { - isUnique = Boolean.parseBoolean(shardUnique); - } - - Replica replica = clusterState.getReplica(collectionName, replicaName); - - if (replica == null) { - throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Could not find collection/slice/replica " + - collectionName + "/" + sliceName + "/" + replicaName + " no action taken."); - } - log.info("Setting property " + property + " with value: " + propVal + - " for collection: " + collectionName + ". Full message: " + message); - if (StringUtils.equalsIgnoreCase(replica.getStr(property), propVal)) return clusterState; // already the value we're going to set - - // OK, there's no way we won't change the cluster state now - Map replicas = clusterState.getSlice(collectionName, sliceName).getReplicasCopy(); - if (isUnique == false) { - replicas.get(replicaName).getProperties().put(property, propVal); - } else { // Set prop for this replica, but remove it for all others. - for (Replica rep : replicas.values()) { - if (rep.getName().equalsIgnoreCase(replicaName)) { - rep.getProperties().put(property, propVal); - } else { - rep.getProperties().remove(property); - } - } - } - Slice newSlice = new Slice(sliceName, replicas, clusterState.getSlice(collectionName, sliceName).shallowCopy()); - return updateSlice(clusterState, collectionName, newSlice); + private ZkWriteCommand addReplicaProp(ClusterState clusterState, ZkNodeProps message) { + return new ReplicaMutator(getZkStateReader()).addReplicaProperty(clusterState, message); } - private ClusterState deleteReplicaProp(ClusterState clusterState, ZkNodeProps message) { - - if (checkKeyExistence(message, ZkStateReader.COLLECTION_PROP) == false || - checkKeyExistence(message, ZkStateReader.SHARD_ID_PROP) == false || - checkKeyExistence(message, ZkStateReader.REPLICA_PROP) == false || - checkKeyExistence(message, ZkStateReader.PROPERTY_PROP) == false) { - throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, - "Overseer DELETEREPLICAPROPERTY requires " + - ZkStateReader.COLLECTION_PROP + " and " + ZkStateReader.SHARD_ID_PROP + " and " + - ZkStateReader.REPLICA_PROP + " and " + ZkStateReader.PROPERTY_PROP + " no action taken."); - } - String collectionName = message.getStr(ZkStateReader.COLLECTION_PROP); - String sliceName = message.getStr(ZkStateReader.SHARD_ID_PROP); - String replicaName = message.getStr(ZkStateReader.REPLICA_PROP); - String property = message.getStr(ZkStateReader.PROPERTY_PROP).toLowerCase(Locale.ROOT); - if (StringUtils.startsWith(property, COLL_PROP_PREFIX) == false) { - property = OverseerCollectionProcessor.COLL_PROP_PREFIX + property; - } - - Replica replica = clusterState.getReplica(collectionName, replicaName); - - if (replica == null) { - throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Could not find collection/slice/replica " + - collectionName + "/" + sliceName + "/" + replicaName + " no action taken."); - } - - log.info("Deleting property " + property + " for collection: " + collectionName + - " slice " + sliceName + " replica " + replicaName + ". Full message: " + message); - String curProp = replica.getStr(property); - if (curProp == null) return clusterState; // not there anyway, nothing to do. - - Map replicas = clusterState.getSlice(collectionName, sliceName).getReplicasCopy(); - replica = replicas.get(replicaName); - replica.getProperties().remove(property); - Slice newSlice = new Slice(sliceName, replicas, clusterState.getSlice(collectionName, sliceName).shallowCopy()); - return updateSlice(clusterState, collectionName, newSlice); + private ZkWriteCommand deleteReplicaProp(ClusterState clusterState, ZkNodeProps message) { + return new ReplicaMutator(getZkStateReader()).removeReplicaProperty(clusterState, message); } - private ClusterState setShardLeader(ClusterState clusterState, ZkNodeProps message) { - StringBuilder sb = new StringBuilder(); - String baseUrl = message.getStr(ZkStateReader.BASE_URL_PROP); - String coreName = message.getStr(ZkStateReader.CORE_NAME_PROP); - sb.append(baseUrl); - if (baseUrl != null && !baseUrl.endsWith("/")) sb.append("/"); - sb.append(coreName == null ? "" : coreName); - if (!(sb.substring(sb.length() - 1).equals("/"))) sb.append("/"); - clusterState = setShardLeader(clusterState, - message.getStr(ZkStateReader.COLLECTION_PROP), - message.getStr(ZkStateReader.SHARD_ID_PROP), - sb.length() > 0 ? sb.toString() : null); - return clusterState; + private ZkWriteCommand setShardLeader(ClusterState clusterState, ZkNodeProps message) { + return new SliceMutator(getZkStateReader()).setShardLeader(clusterState, message); } private void handleProp(ZkNodeProps message) { @@ -700,176 +544,28 @@ public class Overseer implements Closeable { } } - private ClusterState createReplica(ClusterState clusterState, ZkNodeProps message) { - String coll = message.getStr(ZkStateReader.COLLECTION_PROP); - if (!checkCollectionKeyExistence(message)) return clusterState; - String slice = message.getStr(ZkStateReader.SHARD_ID_PROP); - DocCollection collection = clusterState.getCollection(coll); - Slice sl = collection.getSlice(slice); - if(sl == null){ - log.error("Invalid Collection/Slice {}/{} ",coll,slice); - return clusterState; - } - - String coreNodeName = Assign.assignNode(coll, clusterState); - Replica replica = new Replica(coreNodeName, - makeMap( - ZkStateReader.CORE_NAME_PROP, message.getStr(ZkStateReader.CORE_NAME_PROP), - ZkStateReader.BASE_URL_PROP,message.getStr(ZkStateReader.BASE_URL_PROP), - ZkStateReader.STATE_PROP,message.getStr(ZkStateReader.STATE_PROP))); - sl.getReplicasMap().put(coreNodeName, replica); - return newState(clusterState, singletonMap(coll, collection)); + private ZkWriteCommand createReplica(ClusterState clusterState, ZkNodeProps message) { + return new SliceMutator(getZkStateReader()).addReplica(clusterState, message); } - private ClusterState buildCollection(ClusterState clusterState, ZkNodeProps message) { - String collection = message.getStr("name"); - log.info("Building a new collection: {}", collection); - if(clusterState.hasCollection(collection) ){ - log.warn("Collection {} already exists. exit" ,collection); - return clusterState; - } - - ArrayList shardNames = new ArrayList<>(); - - if(ImplicitDocRouter.NAME.equals( message.getStr("router.name",DocRouter.DEFAULT_NAME))){ - getShardNames(shardNames,message.getStr("shards",DocRouter.DEFAULT_NAME)); - } else { - int numShards = message.getInt(ZkStateReader.NUM_SHARDS_PROP, -1); - if(numShards<1) throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,"numShards is a required parameter for 'compositeId' router"); - getShardNames(numShards, shardNames); - } - - return createCollection(clusterState, collection, shardNames, message); + private ZkWriteCommand buildCollection(ClusterState clusterState, ZkNodeProps message) { + return new ClusterStateMutator(getZkStateReader()).createCollection(clusterState, message); } - private ClusterState updateShardState(ClusterState clusterState, ZkNodeProps message) { - String collection = message.getStr(ZkStateReader.COLLECTION_PROP); - if (!checkCollectionKeyExistence(message)) return clusterState; - log.info("Updating shard state for collection: {}", collection); - for (String key : message.keySet()) { - if (ZkStateReader.COLLECTION_PROP.equals(key)) continue; - if (QUEUE_OPERATION.equals(key)) continue; - - Slice slice = clusterState.getSlice(collection, key); - if (slice == null) { - throw new RuntimeException("Overseer.updateShardState unknown collection: " + collection + " slice: " + key); - } - log.info("Update shard state " + key + " to " + message.getStr(key)); - Map props = slice.shallowCopy(); - if (Slice.RECOVERY.equals(props.get(Slice.STATE)) && Slice.ACTIVE.equals(message.getStr(key))) { - props.remove(Slice.PARENT); - } - props.put(Slice.STATE, message.getStr(key)); - Slice newSlice = new Slice(slice.getName(), slice.getReplicasCopy(), props); - clusterState = updateSlice(clusterState, collection, newSlice); - } - - return clusterState; + private ZkWriteCommand updateShardState(ClusterState clusterState, ZkNodeProps message) { + return new SliceMutator(getZkStateReader()).updateShardState(clusterState, message); } - private ClusterState addRoutingRule(ClusterState clusterState, ZkNodeProps message) { - String collection = message.getStr(ZkStateReader.COLLECTION_PROP); - if (!checkCollectionKeyExistence(message)) return clusterState; - String shard = message.getStr(ZkStateReader.SHARD_ID_PROP); - String routeKey = message.getStr("routeKey"); - String range = message.getStr("range"); - String targetCollection = message.getStr("targetCollection"); - String targetShard = message.getStr("targetShard"); - String expireAt = message.getStr("expireAt"); - - Slice slice = clusterState.getSlice(collection, shard); - if (slice == null) { - throw new RuntimeException("Overseer.addRoutingRule unknown collection: " + collection + " slice:" + shard); - } - - Map routingRules = slice.getRoutingRules(); - if (routingRules == null) - routingRules = new HashMap<>(); - RoutingRule r = routingRules.get(routeKey); - if (r == null) { - Map map = new HashMap<>(); - map.put("routeRanges", range); - map.put("targetCollection", targetCollection); - map.put("expireAt", expireAt); - RoutingRule rule = new RoutingRule(routeKey, map); - routingRules.put(routeKey, rule); - } else { - // add this range - Map map = r.shallowCopy(); - map.put("routeRanges", map.get("routeRanges") + "," + range); - map.put("expireAt", expireAt); - routingRules.put(routeKey, new RoutingRule(routeKey, map)); - } - - Map props = slice.shallowCopy(); - props.put("routingRules", routingRules); - - Slice newSlice = new Slice(slice.getName(), slice.getReplicasCopy(), props); - clusterState = updateSlice(clusterState, collection, newSlice); - return clusterState; + private ZkWriteCommand addRoutingRule(ClusterState clusterState, ZkNodeProps message) { + return new SliceMutator(getZkStateReader()).addRoutingRule(clusterState, message); } - private boolean checkCollectionKeyExistence(ZkNodeProps message) { - return checkKeyExistence(message, ZkStateReader.COLLECTION_PROP); - } - - private boolean checkKeyExistence(ZkNodeProps message, String key) { - String value = message.getStr(key); - if (value == null || value.trim().length() == 0) { - log.error("Skipping invalid Overseer message because it has no " + key + " specified: " + message); - return false; - } - return true; + private ZkWriteCommand removeRoutingRule(ClusterState clusterState, ZkNodeProps message) { + return new SliceMutator(getZkStateReader()).removeRoutingRule(clusterState, message); } - private ClusterState removeRoutingRule(ClusterState clusterState, ZkNodeProps message) { - String collection = message.getStr(ZkStateReader.COLLECTION_PROP); - if (!checkCollectionKeyExistence(message)) return clusterState; - String shard = message.getStr(ZkStateReader.SHARD_ID_PROP); - String routeKeyStr = message.getStr("routeKey"); - - log.info("Overseer.removeRoutingRule invoked for collection: " + collection - + " shard: " + shard + " routeKey: " + routeKeyStr); - - Slice slice = clusterState.getSlice(collection, shard); - if (slice == null) { - log.warn("Unknown collection: " + collection + " shard: " + shard); - return clusterState; - } - Map routingRules = slice.getRoutingRules(); - if (routingRules != null) { - routingRules.remove(routeKeyStr); // no rules left - Map props = slice.shallowCopy(); - props.put("routingRules", routingRules); - Slice newSlice = new Slice(slice.getName(), slice.getReplicasCopy(), props); - clusterState = updateSlice(clusterState, collection, newSlice); - } - - return clusterState; - } - - private ClusterState createShard(ClusterState clusterState, ZkNodeProps message) { - String collection = message.getStr(ZkStateReader.COLLECTION_PROP); - if (!checkCollectionKeyExistence(message)) return clusterState; - String shardId = message.getStr(ZkStateReader.SHARD_ID_PROP); - Slice slice = clusterState.getSlice(collection, shardId); - if (slice == null) { - Map replicas = Collections.EMPTY_MAP; - Map sliceProps = new HashMap<>(); - String shardRange = message.getStr(ZkStateReader.SHARD_RANGE_PROP); - String shardState = message.getStr(ZkStateReader.SHARD_STATE_PROP); - String shardParent = message.getStr(ZkStateReader.SHARD_PARENT_PROP); - sliceProps.put(Slice.RANGE, shardRange); - sliceProps.put(Slice.STATE, shardState); - if (shardParent != null) { - sliceProps.put(Slice.PARENT, shardParent); - } - slice = new Slice(shardId, replicas, sliceProps); - clusterState = updateSlice(clusterState, collection, slice); - } else { - log.error("Unable to create Shard: " + shardId + " because it already exists in collection: " + collection); - } - return clusterState; + private ZkWriteCommand createShard(ClusterState clusterState, ZkNodeProps message) { + return new CollectionMutator(getZkStateReader()).createShard(clusterState, message); } private LeaderStatus amILeader() { @@ -906,305 +602,10 @@ public class Overseer implements Closeable { return LeaderStatus.NO; } - private ClusterState updateStateNew(ClusterState clusterState, ZkNodeProps message) { - String collection = message.getStr(ZkStateReader.COLLECTION_PROP); - if (!checkCollectionKeyExistence(message)) return clusterState; - String sliceName = message.getStr(ZkStateReader.SHARD_ID_PROP); - - if(collection==null || sliceName == null){ - log.error("Invalid collection and slice {}", message); - return clusterState; - } - Slice slice = clusterState.getSlice(collection, sliceName); - if(slice == null){ - log.error("No such slice exists {}", message); - return clusterState; - } - - return updateState(clusterState, message); - } - - /** - * Try to assign core to the cluster. - */ - private ClusterState updateState(ClusterState clusterState, final ZkNodeProps message) { - final String collection = message.getStr(ZkStateReader.COLLECTION_PROP); - if (!checkCollectionKeyExistence(message)) return clusterState; - Integer numShards = message.getInt(ZkStateReader.NUM_SHARDS_PROP, null); - - List shardNames = new ArrayList<>(); - - //collection does not yet exist, create placeholders if num shards is specified - boolean collectionExists = clusterState.hasCollection(collection); - if (!collectionExists && numShards!=null) { - getShardNames(numShards, shardNames); - clusterState = createCollection(clusterState, collection, shardNames, message); - } - String sliceName = message.getStr(ZkStateReader.SHARD_ID_PROP); - - String coreNodeName = message.getStr(ZkStateReader.CORE_NODE_NAME_PROP); - if (coreNodeName == null) { - coreNodeName = getAssignedCoreNodeName(clusterState, message); - if (coreNodeName != null) { - log.info("node=" + coreNodeName + " is already registered"); - } else { - // if coreNodeName is null, auto assign one - coreNodeName = Assign.assignNode(collection, clusterState); - } - message.getProperties().put(ZkStateReader.CORE_NODE_NAME_PROP, - coreNodeName); - } - - // use the provided non null shardId - if (sliceName == null) { - //get shardId from ClusterState - sliceName = getAssignedId(clusterState, coreNodeName, message); - if (sliceName != null) { - log.info("shard=" + sliceName + " is already registered"); - } - } - if(sliceName == null) { - //request new shardId - if (collectionExists) { - // use existing numShards - numShards = clusterState.getCollection(collection).getSlices().size(); - log.info("Collection already exists with " + ZkStateReader.NUM_SHARDS_PROP + "=" + numShards); - } - sliceName = Assign.assignShard(collection, clusterState, numShards); - log.info("Assigning new node to shard shard=" + sliceName); - } - - Slice slice = clusterState.getSlice(collection, sliceName); - - Map replicaProps = new LinkedHashMap<>(); - - replicaProps.putAll(message.getProperties()); - // System.out.println("########## UPDATE MESSAGE: " + JSONUtil.toJSON(message)); - if (slice != null) { - Replica oldReplica = slice.getReplicasMap().get(coreNodeName); - if (oldReplica != null) { - if (oldReplica.containsKey(ZkStateReader.LEADER_PROP)) { - replicaProps.put(ZkStateReader.LEADER_PROP, oldReplica.get(ZkStateReader.LEADER_PROP)); - } - // Move custom props over. - for (Map.Entry ent : oldReplica.getProperties().entrySet()) { - if (ent.getKey().startsWith(COLL_PROP_PREFIX)) { - replicaProps.put(ent.getKey(), ent.getValue()); - } - } - } - } - - // we don't put these in the clusterstate - replicaProps.remove(ZkStateReader.NUM_SHARDS_PROP); - replicaProps.remove(ZkStateReader.CORE_NODE_NAME_PROP); - replicaProps.remove(ZkStateReader.SHARD_ID_PROP); - replicaProps.remove(ZkStateReader.COLLECTION_PROP); - replicaProps.remove(QUEUE_OPERATION); - - // remove any props with null values - Set> entrySet = replicaProps.entrySet(); - List removeKeys = new ArrayList<>(); - for (Entry entry : entrySet) { - if (entry.getValue() == null) { - removeKeys.add(entry.getKey()); - } - } - for (String removeKey : removeKeys) { - replicaProps.remove(removeKey); - } - replicaProps.remove(ZkStateReader.CORE_NODE_NAME_PROP); - // remove shard specific properties - String shardRange = (String) replicaProps.remove(ZkStateReader.SHARD_RANGE_PROP); - String shardState = (String) replicaProps.remove(ZkStateReader.SHARD_STATE_PROP); - String shardParent = (String) replicaProps.remove(ZkStateReader.SHARD_PARENT_PROP); - Replica replica = new Replica(coreNodeName, replicaProps); - - // TODO: where do we get slice properties in this message? or should there be a separate create-slice message if we want that? - - Map sliceProps = null; - Map replicas; - - if (slice != null) { - clusterState = checkAndCompleteShardSplit(clusterState, collection, coreNodeName, sliceName, replicaProps); - // get the current slice again because it may have been updated due to checkAndCompleteShardSplit method - slice = clusterState.getSlice(collection, sliceName); - sliceProps = slice.getProperties(); - replicas = slice.getReplicasCopy(); - } else { - replicas = new HashMap<>(1); - sliceProps = new HashMap<>(); - sliceProps.put(Slice.RANGE, shardRange); - sliceProps.put(Slice.STATE, shardState); - sliceProps.put(Slice.PARENT, shardParent); - } - - replicas.put(replica.getName(), replica); - slice = new Slice(sliceName, replicas, sliceProps); - - ClusterState newClusterState = updateSlice(clusterState, collection, slice); - return newClusterState; - } - - - - private ClusterState checkAndCompleteShardSplit(ClusterState state, String collection, String coreNodeName, String sliceName, Map replicaProps) { - Slice slice = state.getSlice(collection, sliceName); - Map sliceProps = slice.getProperties(); - String sliceState = slice.getState(); - if (Slice.RECOVERY.equals(sliceState)) { - log.info("Shard: {} is in recovery state", sliceName); - // is this replica active? - if (ZkStateReader.ACTIVE.equals(replicaProps.get(ZkStateReader.STATE_PROP))) { - log.info("Shard: {} is in recovery state and coreNodeName: {} is active", sliceName, coreNodeName); - // are all other replicas also active? - boolean allActive = true; - for (Entry entry : slice.getReplicasMap().entrySet()) { - if (coreNodeName.equals(entry.getKey())) continue; - if (!Slice.ACTIVE.equals(entry.getValue().getStr(Slice.STATE))) { - allActive = false; - break; - } - } - if (allActive) { - log.info("Shard: {} - all replicas are active. Finding status of fellow sub-shards", sliceName); - // find out about other sub shards - Map allSlicesCopy = new HashMap<>(state.getSlicesMap(collection)); - List subShardSlices = new ArrayList<>(); - outer: - for (Entry entry : allSlicesCopy.entrySet()) { - if (sliceName.equals(entry.getKey())) - continue; - Slice otherSlice = entry.getValue(); - if (Slice.RECOVERY.equals(otherSlice.getState())) { - if (slice.getParent() != null && slice.getParent().equals(otherSlice.getParent())) { - log.info("Shard: {} - Fellow sub-shard: {} found", sliceName, otherSlice.getName()); - // this is a fellow sub shard so check if all replicas are active - for (Entry sliceEntry : otherSlice.getReplicasMap().entrySet()) { - if (!ZkStateReader.ACTIVE.equals(sliceEntry.getValue().getStr(ZkStateReader.STATE_PROP))) { - allActive = false; - break outer; - } - } - log.info("Shard: {} - Fellow sub-shard: {} has all replicas active", sliceName, otherSlice.getName()); - subShardSlices.add(otherSlice); - } - } - } - if (allActive) { - // hurray, all sub shard replicas are active - log.info("Shard: {} - All replicas across all fellow sub-shards are now ACTIVE. Preparing to switch shard states.", sliceName); - String parentSliceName = (String) sliceProps.remove(Slice.PARENT); - - Map propMap = new HashMap<>(); - propMap.put(Overseer.QUEUE_OPERATION, "updateshardstate"); - propMap.put(parentSliceName, Slice.INACTIVE); - propMap.put(sliceName, Slice.ACTIVE); - for (Slice subShardSlice : subShardSlices) { - propMap.put(subShardSlice.getName(), Slice.ACTIVE); - } - propMap.put(ZkStateReader.COLLECTION_PROP, collection); - ZkNodeProps m = new ZkNodeProps(propMap); - state = updateShardState(state, m); - } - } - } - } - return state; - } - - private ClusterState createCollection(ClusterState state, String collectionName, List shards , ZkNodeProps message) { - log.info("Create collection {} with shards {}", collectionName, shards); - - Map routerSpec = DocRouter.getRouterSpec(message); - String routerName = routerSpec.get("name") == null ? DocRouter.DEFAULT_NAME : (String) routerSpec.get("name"); - DocRouter router = DocRouter.getDocRouter(routerName); - - List ranges = router.partitionRange(shards.size(), router.fullRange()); - - - - Map newSlices = new LinkedHashMap<>(); - - for (int i = 0; i < shards.size(); i++) { - String sliceName = shards.get(i); - - Map sliceProps = new LinkedHashMap<>(1); - sliceProps.put(Slice.RANGE, ranges == null? null: ranges.get(i)); - - newSlices.put(sliceName, new Slice(sliceName, null, sliceProps)); - } - - // TODO: fill in with collection properties read from the /collections/ node - Map collectionProps = new HashMap<>(); - - for (Entry e : OverseerCollectionProcessor.COLL_PROPS.entrySet()) { - Object val = message.get(e.getKey()); - if(val == null){ - val = OverseerCollectionProcessor.COLL_PROPS.get(e.getKey()); - } - if(val != null) collectionProps.put(e.getKey(),val); - } - collectionProps.put(DocCollection.DOC_ROUTER, routerSpec); - - if (message.getStr("fromApi") == null) { - collectionProps.put("autoCreated", "true"); - } - - String znode = message.getInt(DocCollection.STATE_FORMAT, 1) == 1 ? null - : ZkStateReader.getCollectionPath(collectionName); - - DocCollection newCollection = new DocCollection(collectionName, - newSlices, collectionProps, router, -1, znode); - - isClusterStateModified = true; - - log.info("state version {} {}", collectionName, newCollection.getStateFormat()); - - return newState(state, singletonMap(newCollection.getName(), newCollection)); - } - - /* - * Return an already assigned id or null if not assigned - */ - private String getAssignedId(final ClusterState state, final String nodeName, - final ZkNodeProps coreState) { - Collection slices = state.getSlices(coreState.getStr(ZkStateReader.COLLECTION_PROP)); - if (slices != null) { - for (Slice slice : slices) { - if (slice.getReplicasMap().get(nodeName) != null) { - return slice.getName(); - } - } - } - return null; - } - - private String getAssignedCoreNodeName(ClusterState state, ZkNodeProps message) { - Collection slices = state.getSlices(message.getStr(ZkStateReader.COLLECTION_PROP)); - if (slices != null) { - for (Slice slice : slices) { - for (Replica replica : slice.getReplicas()) { - String nodeName = replica.getStr(ZkStateReader.NODE_NAME_PROP); - String core = replica.getStr(ZkStateReader.CORE_NAME_PROP); - - String msgNodeName = message.getStr(ZkStateReader.NODE_NAME_PROP); - String msgCore = message.getStr(ZkStateReader.CORE_NAME_PROP); - - if (nodeName.equals(msgNodeName) && core.equals(msgCore)) { - return replica.getName(); - } - } - } - } - return null; - } private ClusterState updateSlice(ClusterState state, String collectionName, Slice slice) { - // System.out.println("###!!!### OLD CLUSTERSTATE: " + JSONUtil.toJSON(state.getCollectionStates())); - // System.out.println("Updating slice:" + slice); DocCollection newCollection = null; DocCollection coll = state.getCollectionOrNull(collectionName) ; Map slices; @@ -1228,62 +629,6 @@ public class Overseer implements Closeable { return newState(state, singletonMap(collectionName, newCollection)); } - private ClusterState setShardLeader(ClusterState state, String collectionName, String sliceName, String leaderUrl) { - DocCollection coll = state.getCollectionOrNull(collectionName); - - if(coll == null) { - log.error("Could not mark shard leader for non existing collection:" + collectionName); - return state; - } - - Map slices = coll.getSlicesMap(); - // make a shallow copy and add it to the new collection - slices = new LinkedHashMap<>(slices); - - Slice slice = slices.get(sliceName); - if (slice == null) { - slice = coll.getSlice(sliceName); - } - - if (slice == null) { - log.error("Could not mark leader for non existing/active slice:" + sliceName); - return state; - } else { - // TODO: consider just putting the leader property on the shard, not on individual replicas - - Replica oldLeader = slice.getLeader(); - - final Map newReplicas = new LinkedHashMap<>(); - - for (Replica replica : slice.getReplicas()) { - - // TODO: this should only be calculated once and cached somewhere? - String coreURL = ZkCoreNodeProps.getCoreUrl(replica.getStr(ZkStateReader.BASE_URL_PROP), replica.getStr(ZkStateReader.CORE_NAME_PROP)); - - if (replica == oldLeader && !coreURL.equals(leaderUrl)) { - Map replicaProps = new LinkedHashMap<>(replica.getProperties()); - replicaProps.remove(Slice.LEADER); - replica = new Replica(replica.getName(), replicaProps); - } else if (coreURL.equals(leaderUrl)) { - Map replicaProps = new LinkedHashMap<>(replica.getProperties()); - replicaProps.put(Slice.LEADER, "true"); // TODO: allow booleans instead of strings - replica = new Replica(replica.getName(), replicaProps); - } - - newReplicas.put(replica.getName(), replica); - } - - Map newSliceProps = slice.shallowCopy(); - newSliceProps.put(Slice.REPLICAS, newReplicas); - Slice newSlice = new Slice(slice.getName(), newReplicas, slice.getProperties()); - slices.put(newSlice.getName(), newSlice); - } - - - DocCollection newCollection = coll.copyWithSlices(slices); - return newState(state, singletonMap(collectionName, newCollection)); - } - private ClusterState newState(ClusterState state, Map colls) { for (Entry e : colls.entrySet()) { DocCollection c = e.getValue(); @@ -1309,118 +654,21 @@ public class Overseer implements Closeable { /* * Remove collection from cloudstate */ - private ClusterState removeCollection(final ClusterState clusterState, ZkNodeProps message) { - final String collection = message.getStr("name"); - if (!checkKeyExistence(message, "name")) return clusterState; - DocCollection coll = clusterState.getCollectionOrNull(collection); - if(coll == null) return clusterState; - - isClusterStateModified = true; - if (coll.getStateFormat() > 1) { - try { - log.info("Deleting state for collection : {}", collection); - zkClient.delete(ZkStateReader.getCollectionPath(collection), -1, true); - } catch (Exception e) { - throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unable to remove collection state :" + collection); - } - } - return newState(clusterState, singletonMap(coll.getName(),(DocCollection) null)); + private ZkWriteCommand removeCollection(final ClusterState clusterState, ZkNodeProps message) { + return new ClusterStateMutator(getZkStateReader()).deleteCollection(clusterState, message); } /* * Remove collection slice from cloudstate */ - private ClusterState removeShard(final ClusterState clusterState, ZkNodeProps message) { - final String sliceId = message.getStr(ZkStateReader.SHARD_ID_PROP); - final String collection = message.getStr(ZkStateReader.COLLECTION_PROP); - if (!checkCollectionKeyExistence(message)) return clusterState; - - log.info("Removing collection: {}, shard: {} from cluster state", collection, sliceId); - - DocCollection coll = clusterState.getCollection(collection); - - Map newSlices = new LinkedHashMap<>(coll.getSlicesMap()); - newSlices.remove(sliceId); - - DocCollection newCollection = coll.copyWithSlices(newSlices); - return newState(clusterState, singletonMap(collection,newCollection)); + private ZkWriteCommand removeShard(final ClusterState clusterState, ZkNodeProps message) { + return new CollectionMutator(getZkStateReader()).deleteShard(clusterState, message); } /* * Remove core from cloudstate */ - private ClusterState removeCore(final ClusterState clusterState, ZkNodeProps message) { - final String cnn = message.getStr(ZkStateReader.CORE_NODE_NAME_PROP); - final String collection = message.getStr(ZkStateReader.COLLECTION_PROP); - if (!checkCollectionKeyExistence(message)) return clusterState; - - DocCollection coll = clusterState.getCollectionOrNull(collection) ; - if (coll == null) { - // TODO: log/error that we didn't find it? - // just in case, remove the zk collection node - try { - zkClient.clean("/collections/" + collection); - } catch (InterruptedException e) { - SolrException.log(log, "Cleaning up collection in zk was interrupted:" + collection, e); - Thread.currentThread().interrupt(); - } catch (KeeperException e) { - SolrException.log(log, "Problem cleaning up collection in zk:" + collection, e); - } - return clusterState; - } - - Map newSlices = new LinkedHashMap<>(); - boolean lastSlice = false; - for (Slice slice : coll.getSlices()) { - Replica replica = slice.getReplica(cnn); - if (replica != null) { - Map newReplicas = slice.getReplicasCopy(); - newReplicas.remove(cnn); - // TODO TODO TODO!!! if there are no replicas left for the slice, and the slice has no hash range, remove it - // if (newReplicas.size() == 0 && slice.getRange() == null) { - // if there are no replicas left for the slice remove it - if (newReplicas.size() == 0) { - slice = null; - lastSlice = true; - } else { - slice = new Slice(slice.getName(), newReplicas, slice.getProperties()); - } - } - - if (slice != null) { - newSlices.put(slice.getName(), slice); - } - } - - if (lastSlice) { - // remove all empty pre allocated slices - for (Slice slice : coll.getSlices()) { - if (slice.getReplicas().size() == 0) { - newSlices.remove(slice.getName()); - } - } - } - - // if there are no slices left in the collection, remove it? - if (newSlices.size() == 0) { - - // TODO: it might be better logically to have this in ZkController - // but for tests (it's easier) it seems better for the moment to leave CoreContainer and/or - // ZkController out of the Overseer. - try { - zkClient.clean("/collections/" + collection); - } catch (InterruptedException e) { - SolrException.log(log, "Cleaning up collection in zk was interrupted:" + collection, e); - Thread.currentThread().interrupt(); - } catch (KeeperException e) { - SolrException.log(log, "Problem cleaning up collection in zk:" + collection, e); - } - return newState(clusterState,singletonMap(collection, (DocCollection) null)); - - } else { - DocCollection newCollection = coll.copyWithSlices(newSlices); - return newState(clusterState,singletonMap(collection,newCollection)); - } - + private ZkWriteCommand removeCore(final ClusterState clusterState, ZkNodeProps message) { + return new SliceMutator(getZkStateReader()).removeReplica(clusterState, message); } @Override @@ -1471,7 +719,7 @@ public class Overseer implements Closeable { Boolean shardUnique = Boolean.parseBoolean(message.getStr(SHARD_UNIQUE)); if (shardUnique == false && - Overseer.sliceUniqueBooleanProperties.contains(this.property) == false) { + SliceMutator.SLICE_UNIQUE_BOOLEAN_PROPERTIES.contains(this.property) == false) { throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Balancing properties amongst replicas in a slice requires that" + " the property be a pre-defined property (e.g. 'preferredLeader') or that 'shardUnique' be set to 'true' " + " Property: " + this.property + " shardUnique: " + Boolean.toString(shardUnique)); @@ -1844,7 +1092,10 @@ public class Overseer implements Closeable { ccThread.start(); arfoThread.start(); } - + + public Stats getStats() { + return stats; + } /** * For tests. diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java index 1a15830def7..88ee2b7d7b5 100644 --- a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java +++ b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java @@ -68,6 +68,8 @@ import org.apache.solr.client.solrj.response.UpdateResponse; import org.apache.solr.cloud.Assign.Node; import org.apache.solr.cloud.DistributedQueue.QueueEvent; import org.apache.solr.cloud.Overseer.LeaderStatus; +import org.apache.solr.cloud.overseer.ClusterStateMutator; +import org.apache.solr.cloud.overseer.OverseerAction; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.cloud.Aliases; @@ -467,7 +469,7 @@ public class OverseerCollectionProcessor implements Runnable, Closeable { } //now ask the current leader to QUIT , so that the designate can takeover Overseer.getInQueue(zkStateReader.getZkClient()).offer( - ZkStateReader.toJSON(new ZkNodeProps(Overseer.QUEUE_OPERATION, Overseer.OverseerAction.QUIT.toLower(), + ZkStateReader.toJSON(new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.QUIT.toLower(), "id",getLeaderId(zkStateReader.getZkClient())))); } @@ -698,7 +700,7 @@ public class OverseerCollectionProcessor implements Runnable, Closeable { SolrZkClient zkClient = zkStateReader.getZkClient(); DistributedQueue inQueue = Overseer.getInQueue(zkClient); Map propMap = new HashMap<>(); - propMap.put(Overseer.QUEUE_OPERATION, Overseer.OverseerAction.LEADER.toLower()); + propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.LEADER.toLower()); propMap.put(COLLECTION_PROP, collectionName); propMap.put(SHARD_ID_PROP, shardId); propMap.put(BASE_URL_PROP, baseURL); @@ -1148,7 +1150,7 @@ public class OverseerCollectionProcessor implements Runnable, Closeable { private void deleteCoreNode(String collectionName, String replicaName, Replica replica, String core) throws KeeperException, InterruptedException { ZkNodeProps m = new ZkNodeProps( - Overseer.QUEUE_OPERATION, Overseer.OverseerAction.DELETECORE.toLower(), + Overseer.QUEUE_OPERATION, OverseerAction.DELETECORE.toLower(), ZkStateReader.CORE_NAME_PROP, core, ZkStateReader.NODE_NAME_PROP, replica.getStr(ZkStateReader.NODE_NAME_PROP), ZkStateReader.COLLECTION_PROP, collectionName, @@ -1760,7 +1762,7 @@ public class OverseerCollectionProcessor implements Runnable, Closeable { log.info("Replication factor is 1 so switching shard states"); DistributedQueue inQueue = Overseer.getInQueue(zkStateReader.getZkClient()); Map propMap = new HashMap<>(); - propMap.put(Overseer.QUEUE_OPERATION, Overseer.OverseerAction.UPDATESHARDSTATE.toLower()); + propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower()); propMap.put(slice, Slice.INACTIVE); for (String subSlice : subSlices) { propMap.put(subSlice, Slice.ACTIVE); @@ -1772,7 +1774,7 @@ public class OverseerCollectionProcessor implements Runnable, Closeable { log.info("Requesting shard state be set to 'recovery'"); DistributedQueue inQueue = Overseer.getInQueue(zkStateReader.getZkClient()); Map propMap = new HashMap<>(); - propMap.put(Overseer.QUEUE_OPERATION, Overseer.OverseerAction.UPDATESHARDSTATE.toLower()); + propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower()); for (String subSlice : subSlices) { propMap.put(subSlice, Slice.RECOVERY); } @@ -2062,7 +2064,7 @@ public class OverseerCollectionProcessor implements Runnable, Closeable { completeAsyncRequest(asyncId, requestMap, results); ZkNodeProps m = new ZkNodeProps( - Overseer.QUEUE_OPERATION, Overseer.OverseerAction.ADDROUTINGRULE.toLower(), + Overseer.QUEUE_OPERATION, OverseerAction.ADDROUTINGRULE.toLower(), COLLECTION_PROP, sourceCollection.getName(), SHARD_ID_PROP, sourceSlice.getName(), "routeKey", SolrIndexSplitter.getRouteKey(splitKey) + "!", @@ -2315,10 +2317,10 @@ public class OverseerCollectionProcessor implements Runnable, Closeable { String router = message.getStr("router.name", DocRouter.DEFAULT_NAME); List shardNames = new ArrayList<>(); if(ImplicitDocRouter.NAME.equals(router)){ - Overseer.getShardNames(shardNames, message.getStr("shards",null)); + ClusterStateMutator.getShardNames(shardNames, message.getStr("shards", null)); numSlices = shardNames.size(); } else { - Overseer.getShardNames(numSlices,shardNames); + ClusterStateMutator.getShardNames(numSlices, shardNames); } if (numSlices == null ) { diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java index b6655687493..9202e766da9 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java @@ -46,6 +46,8 @@ import org.apache.commons.io.FileUtils; import org.apache.commons.lang.StringUtils; import org.apache.solr.client.solrj.impl.HttpSolrServer; import org.apache.solr.client.solrj.request.CoreAdminRequest.WaitForState; +import org.apache.solr.cloud.overseer.OverseerAction; +import org.apache.solr.cloud.overseer.SliceMutator; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.cloud.BeforeReconnect; @@ -838,7 +840,7 @@ public final class ZkController { boolean joinAtHead = false; Replica replica = zkStateReader.getClusterState().getReplica(desc.getCloudDescriptor().getCollectionName(), coreZkNodeName); if (replica != null) { - joinAtHead = replica.getBool(Overseer.preferredLeaderProp, false); + joinAtHead = replica.getBool(SliceMutator.PREFERRED_LEADER_PROP, false); } joinElection(desc, afterExpiration, joinAtHead); } catch (InterruptedException e) { @@ -1214,7 +1216,7 @@ public final class ZkController { } if (removeWatch) zkStateReader.removeZKWatch(collection); ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, - Overseer.OverseerAction.DELETECORE.toLower(), ZkStateReader.CORE_NAME_PROP, coreName, + OverseerAction.DELETECORE.toLower(), ZkStateReader.CORE_NAME_PROP, coreName, ZkStateReader.NODE_NAME_PROP, getNodeName(), ZkStateReader.COLLECTION_PROP, cloudDescriptor.getCollectionName(), ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName); diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ClusterStateMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ClusterStateMutator.java new file mode 100644 index 00000000000..9885817da2c --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ClusterStateMutator.java @@ -0,0 +1,188 @@ +package org.apache.solr.cloud.overseer; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import org.apache.solr.cloud.OverseerCollectionProcessor; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.DocRouter; +import org.apache.solr.common.cloud.ImplicitDocRouter; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.Slice; +import org.apache.solr.common.cloud.ZkNodeProps; +import org.apache.solr.common.cloud.ZkStateReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static java.util.Collections.singletonMap; + +public class ClusterStateMutator { + private static Logger log = LoggerFactory.getLogger(ClusterStateMutator.class); + + protected final ZkStateReader zkStateReader; + + public ClusterStateMutator(ZkStateReader zkStateReader) { + this.zkStateReader = zkStateReader; + } + + public ZkWriteCommand createCollection(ClusterState clusterState, ZkNodeProps message) { + String cName = message.getStr("name"); + log.info("building a new cName: " + cName); + if (clusterState.hasCollection(cName)) { + log.warn("Collection {} already exists. exit", cName); + return ZkStateWriter.NO_OP; + } + + ArrayList shards = new ArrayList<>(); + + if (ImplicitDocRouter.NAME.equals(message.getStr("router.name", DocRouter.DEFAULT_NAME))) { + getShardNames(shards, message.getStr("shards", DocRouter.DEFAULT_NAME)); + } else { + int numShards = message.getInt(ZkStateReader.NUM_SHARDS_PROP, -1); + if (numShards < 1) + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "numShards is a required parameter for 'compositeId' router"); + getShardNames(numShards, shards); + } + + Map routerSpec = DocRouter.getRouterSpec(message); + String routerName = routerSpec.get("name") == null ? DocRouter.DEFAULT_NAME : (String) routerSpec.get("name"); + DocRouter router = DocRouter.getDocRouter(routerName); + + List ranges = router.partitionRange(shards.size(), router.fullRange()); + + + Map newSlices = new LinkedHashMap<>(); + + for (int i = 0; i < shards.size(); i++) { + String sliceName = shards.get(i); + + Map sliceProps = new LinkedHashMap<>(1); + sliceProps.put(Slice.RANGE, ranges == null ? null : ranges.get(i)); + + newSlices.put(sliceName, new Slice(sliceName, null, sliceProps)); + } + + Map collectionProps = new HashMap<>(); + + for (Map.Entry e : OverseerCollectionProcessor.COLL_PROPS.entrySet()) { + Object val = message.get(e.getKey()); + if (val == null) { + val = OverseerCollectionProcessor.COLL_PROPS.get(e.getKey()); + } + if (val != null) collectionProps.put(e.getKey(), val); + } + collectionProps.put(DocCollection.DOC_ROUTER, routerSpec); + + if (message.getStr("fromApi") == null) { + collectionProps.put("autoCreated", "true"); + } + + String znode = message.getInt(DocCollection.STATE_FORMAT, 1) == 1 ? null + : ZkStateReader.getCollectionPath(cName); + + DocCollection newCollection = new DocCollection(cName, + newSlices, collectionProps, router, -1, znode); + + return new ZkWriteCommand(cName, newCollection); + } + + public ZkWriteCommand deleteCollection(ClusterState clusterState, ZkNodeProps message) { + final String collection = message.getStr("name"); + if (!CollectionMutator.checkKeyExistence(message, "name")) return ZkStateWriter.NO_OP; + DocCollection coll = clusterState.getCollectionOrNull(collection); + if (coll == null) return ZkStateWriter.NO_OP; + + return new ZkWriteCommand(coll.getName(), null); + } + + public static ClusterState newState(ClusterState state, String name, DocCollection collection) { + ClusterState newClusterState = null; + if (collection == null) { + newClusterState = state.copyWith(singletonMap(name, (DocCollection) null)); + } else { + newClusterState = state.copyWith(singletonMap(name, collection)); + } + return newClusterState; + } + + public static void getShardNames(Integer numShards, List shardNames) { + if (numShards == null) + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "numShards" + " is a required param"); + for (int i = 0; i < numShards; i++) { + final String sliceName = "shard" + (i + 1); + shardNames.add(sliceName); + } + + } + + public static void getShardNames(List shardNames, String shards) { + if (shards == null) + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "shards" + " is a required param"); + for (String s : shards.split(",")) { + if (s == null || s.trim().isEmpty()) continue; + shardNames.add(s.trim()); + } + if (shardNames.isEmpty()) + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "shards" + " is a required param"); + } + + /* + * Return an already assigned id or null if not assigned + */ + public static String getAssignedId(final ClusterState state, final String nodeName, + final ZkNodeProps coreState) { + Collection slices = state.getSlices(coreState.getStr(ZkStateReader.COLLECTION_PROP)); + if (slices != null) { + for (Slice slice : slices) { + if (slice.getReplicasMap().get(nodeName) != null) { + return slice.getName(); + } + } + } + return null; + } + + public static String getAssignedCoreNodeName(ClusterState state, ZkNodeProps message) { + Collection slices = state.getSlices(message.getStr(ZkStateReader.COLLECTION_PROP)); + if (slices != null) { + for (Slice slice : slices) { + for (Replica replica : slice.getReplicas()) { + String nodeName = replica.getStr(ZkStateReader.NODE_NAME_PROP); + String core = replica.getStr(ZkStateReader.CORE_NAME_PROP); + + String msgNodeName = message.getStr(ZkStateReader.NODE_NAME_PROP); + String msgCore = message.getStr(ZkStateReader.CORE_NAME_PROP); + + if (nodeName.equals(msgNodeName) && core.equals(msgCore)) { + return replica.getName(); + } + } + } + } + return null; + } +} + diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java new file mode 100644 index 00000000000..e7578c443e3 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java @@ -0,0 +1,119 @@ +package org.apache.solr.cloud.overseer; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; + +import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.ImplicitDocRouter; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.Slice; +import org.apache.solr.common.cloud.ZkNodeProps; +import org.apache.solr.common.cloud.ZkStateReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CollectionMutator { + private static Logger log = LoggerFactory.getLogger(CollectionMutator.class); + + protected final ZkStateReader zkStateReader; + + public CollectionMutator(ZkStateReader zkStateReader) { + this.zkStateReader = zkStateReader; + } + + public ZkWriteCommand createShard(final ClusterState clusterState, ZkNodeProps message) { + String collectionName = message.getStr(ZkStateReader.COLLECTION_PROP); + if (!checkCollectionKeyExistence(message)) return ZkStateWriter.NO_OP; + String shardId = message.getStr(ZkStateReader.SHARD_ID_PROP); + Slice slice = clusterState.getSlice(collectionName, shardId); + if (slice == null) { + Map replicas = Collections.EMPTY_MAP; + Map sliceProps = new HashMap<>(); + String shardRange = message.getStr(ZkStateReader.SHARD_RANGE_PROP); + String shardState = message.getStr(ZkStateReader.SHARD_STATE_PROP); + String shardParent = message.getStr(ZkStateReader.SHARD_PARENT_PROP); + sliceProps.put(Slice.RANGE, shardRange); + sliceProps.put(Slice.STATE, shardState); + if (shardParent != null) { + sliceProps.put(Slice.PARENT, shardParent); + } + DocCollection collection = updateSlice(collectionName, + clusterState.getCollection(collectionName), new Slice(shardId, replicas, sliceProps)); + return new ZkWriteCommand(collectionName, collection); + } else { + log.error("Unable to create Shard: " + shardId + " because it already exists in collection: " + collectionName); + return ZkStateWriter.NO_OP; + } + } + + public ZkWriteCommand deleteShard(final ClusterState clusterState, ZkNodeProps message) { + final String sliceId = message.getStr(ZkStateReader.SHARD_ID_PROP); + final String collection = message.getStr(ZkStateReader.COLLECTION_PROP); + if (!checkCollectionKeyExistence(message)) return ZkStateWriter.NO_OP; + + log.info("Removing collection: " + collection + " shard: " + sliceId + " from clusterstate"); + + DocCollection coll = clusterState.getCollection(collection); + + Map newSlices = new LinkedHashMap<>(coll.getSlicesMap()); + newSlices.remove(sliceId); + + DocCollection newCollection = coll.copyWithSlices(newSlices); + return new ZkWriteCommand(collection, newCollection); + } + + public static DocCollection updateSlice(String collectionName, DocCollection collection, Slice slice) { + DocCollection newCollection = null; + Map slices; + + if (collection == null) { + // when updateSlice is called on a collection that doesn't exist, it's currently when a core is publishing itself + // without explicitly creating a collection. In this current case, we assume custom sharding with an "implicit" router. + slices = new LinkedHashMap<>(1); + slices.put(slice.getName(), slice); + Map props = new HashMap<>(1); + props.put(DocCollection.DOC_ROUTER, ZkNodeProps.makeMap("name", ImplicitDocRouter.NAME)); + newCollection = new DocCollection(collectionName, slices, props, new ImplicitDocRouter()); + } else { + slices = new LinkedHashMap<>(collection.getSlicesMap()); // make a shallow copy + slices.put(slice.getName(), slice); + newCollection = collection.copyWithSlices(slices); + } + + return newCollection; + } + + static boolean checkCollectionKeyExistence(ZkNodeProps message) { + return checkKeyExistence(message, ZkStateReader.COLLECTION_PROP); + } + + static boolean checkKeyExistence(ZkNodeProps message, String key) { + String value = message.getStr(key); + if (value == null || value.trim().length() == 0) { + log.error("Skipping invalid Overseer message because it has no " + key + " specified: " + message); + return false; + } + return true; + } +} + diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/OverseerAction.java b/solr/core/src/java/org/apache/solr/cloud/overseer/OverseerAction.java new file mode 100644 index 00000000000..9a2c70bd356 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cloud/overseer/OverseerAction.java @@ -0,0 +1,57 @@ +package org.apache.solr.cloud.overseer; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.util.Locale; + +import org.apache.solr.cloud.Overseer; + +/** + * Enum of actions supported by the overseer only. + * + * There are other actions supported which are public and defined + * in {@link org.apache.solr.common.params.CollectionParams.CollectionAction} + */ +public enum OverseerAction { + LEADER, + DELETECORE, + ADDROUTINGRULE, + REMOVEROUTINGRULE, + UPDATESHARDSTATE, + STATE, + QUIT; + + public static OverseerAction get(String p) { + if (p != null) { + try { + return OverseerAction.valueOf(p.toUpperCase(Locale.ROOT)); + } catch (Exception ex) { + } + } + return null; + } + + public boolean isEqual(String s) { + return s != null && toString().equals(s.toUpperCase(Locale.ROOT)); + } + + public String toLower() { + return toString().toLowerCase(Locale.ROOT); + } +} + diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java new file mode 100644 index 00000000000..bfa25e6a800 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java @@ -0,0 +1,422 @@ +package org.apache.solr.cloud.overseer; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.lang.StringUtils; +import org.apache.solr.cloud.Assign; +import org.apache.solr.cloud.Overseer; +import org.apache.solr.cloud.OverseerCollectionProcessor; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.Slice; +import org.apache.solr.common.cloud.ZkNodeProps; +import org.apache.solr.common.cloud.ZkStateReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.solr.cloud.OverseerCollectionProcessor.COLL_PROP_PREFIX; +import static org.apache.solr.cloud.overseer.CollectionMutator.checkCollectionKeyExistence; +import static org.apache.solr.cloud.overseer.CollectionMutator.checkKeyExistence; + +public class ReplicaMutator { + private static Logger log = LoggerFactory.getLogger(ReplicaMutator.class); + + protected final ZkStateReader zkStateReader; + + public ReplicaMutator(ZkStateReader reader) { + this.zkStateReader = reader; + } + + protected Replica setProperty(Replica replica, String key, String value) { + assert key != null; + assert value != null; + + if (StringUtils.equalsIgnoreCase(replica.getStr(key), value)) + return replica; // already the value we're going to set + + Map replicaProps = new LinkedHashMap<>(replica.getProperties()); + replicaProps.put(key, value); + return new Replica(replica.getName(), replicaProps); + } + + protected Replica unsetProperty(Replica replica, String key) { + assert key != null; + + if (!replica.containsKey(key)) return replica; + Map replicaProps = new LinkedHashMap<>(replica.getProperties()); + replicaProps.remove(key); + return new Replica(replica.getName(), replicaProps); + } + + protected Replica setLeader(Replica replica) { + return setProperty(replica, ZkStateReader.LEADER_PROP, "true"); + } + + protected Replica unsetLeader(Replica replica) { + return unsetProperty(replica, ZkStateReader.LEADER_PROP); + } + + protected Replica setState(Replica replica, String state) { + assert state != null; + + return setProperty(replica, ZkStateReader.STATE_PROP, state); + } + + public ZkWriteCommand addReplicaProperty(ClusterState clusterState, ZkNodeProps message) { + if (checkKeyExistence(message, ZkStateReader.COLLECTION_PROP) == false || + checkKeyExistence(message, ZkStateReader.SHARD_ID_PROP) == false || + checkKeyExistence(message, ZkStateReader.REPLICA_PROP) == false || + checkKeyExistence(message, ZkStateReader.PROPERTY_PROP) == false || + checkKeyExistence(message, ZkStateReader.PROPERTY_VALUE_PROP) == false) { + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, + "Overseer SETREPLICAPROPERTY requires " + + ZkStateReader.COLLECTION_PROP + " and " + ZkStateReader.SHARD_ID_PROP + " and " + + ZkStateReader.REPLICA_PROP + " and " + ZkStateReader.PROPERTY_PROP + " and " + + ZkStateReader.PROPERTY_VALUE_PROP + " no action taken."); + } + + String collectionName = message.getStr(ZkStateReader.COLLECTION_PROP); + String sliceName = message.getStr(ZkStateReader.SHARD_ID_PROP); + String replicaName = message.getStr(ZkStateReader.REPLICA_PROP); + String property = message.getStr(ZkStateReader.PROPERTY_PROP).toLowerCase(Locale.ROOT); + if (StringUtils.startsWith(property, COLL_PROP_PREFIX) == false) { + property = OverseerCollectionProcessor.COLL_PROP_PREFIX + property; + } + property = property.toLowerCase(Locale.ROOT); + String propVal = message.getStr(ZkStateReader.PROPERTY_VALUE_PROP); + String shardUnique = message.getStr(OverseerCollectionProcessor.SHARD_UNIQUE); + + boolean isUnique = false; + + if (SliceMutator.SLICE_UNIQUE_BOOLEAN_PROPERTIES.contains(property)) { + if (StringUtils.isNotBlank(shardUnique) && Boolean.parseBoolean(shardUnique) == false) { + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Overseer SETREPLICAPROPERTY for " + + property + " cannot have " + OverseerCollectionProcessor.SHARD_UNIQUE + " set to anything other than" + + "'true'. No action taken"); + } + isUnique = true; + } else { + isUnique = Boolean.parseBoolean(shardUnique); + } + + Replica replica = clusterState.getReplica(collectionName, replicaName); + + if (replica == null) { + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Could not find collection/slice/replica " + + collectionName + "/" + sliceName + "/" + replicaName + " no action taken."); + } + log.info("Setting property " + property + " with value: " + propVal + + " for collection: " + collectionName + ". Full message: " + message); + if (StringUtils.equalsIgnoreCase(replica.getStr(property), propVal)) return ZkStateWriter.NO_OP; // already the value we're going to set + + // OK, there's no way we won't change the cluster state now + Map replicas = clusterState.getSlice(collectionName, sliceName).getReplicasCopy(); + if (isUnique == false) { + replicas.get(replicaName).getProperties().put(property, propVal); + } else { // Set prop for this replica, but remove it for all others. + for (Replica rep : replicas.values()) { + if (rep.getName().equalsIgnoreCase(replicaName)) { + rep.getProperties().put(property, propVal); + } else { + rep.getProperties().remove(property); + } + } + } + Slice newSlice = new Slice(sliceName, replicas, clusterState.getSlice(collectionName, sliceName).shallowCopy()); + DocCollection newCollection = CollectionMutator.updateSlice(collectionName, clusterState.getCollection(collectionName), + newSlice); + return new ZkWriteCommand(collectionName, newCollection); + } + + public ZkWriteCommand removeReplicaProperty(ClusterState clusterState, ZkNodeProps message) { + if (checkKeyExistence(message, ZkStateReader.COLLECTION_PROP) == false || + checkKeyExistence(message, ZkStateReader.SHARD_ID_PROP) == false || + checkKeyExistence(message, ZkStateReader.REPLICA_PROP) == false || + checkKeyExistence(message, ZkStateReader.PROPERTY_PROP) == false) { + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, + "Overseer DELETEREPLICAPROPERTY requires " + + ZkStateReader.COLLECTION_PROP + " and " + ZkStateReader.SHARD_ID_PROP + " and " + + ZkStateReader.REPLICA_PROP + " and " + ZkStateReader.PROPERTY_PROP + " no action taken."); + } + String collectionName = message.getStr(ZkStateReader.COLLECTION_PROP); + String sliceName = message.getStr(ZkStateReader.SHARD_ID_PROP); + String replicaName = message.getStr(ZkStateReader.REPLICA_PROP); + String property = message.getStr(ZkStateReader.PROPERTY_PROP).toLowerCase(Locale.ROOT); + if (StringUtils.startsWith(property, COLL_PROP_PREFIX) == false) { + property = OverseerCollectionProcessor.COLL_PROP_PREFIX + property; + } + + Replica replica = clusterState.getReplica(collectionName, replicaName); + + if (replica == null) { + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Could not find collection/slice/replica " + + collectionName + "/" + sliceName + "/" + replicaName + " no action taken."); + } + + log.info("Deleting property " + property + " for collection: " + collectionName + + " slice " + sliceName + " replica " + replicaName + ". Full message: " + message); + String curProp = replica.getStr(property); + if (curProp == null) return ZkStateWriter.NO_OP; // not there anyway, nothing to do. + + log.info("Deleting property " + property + " for collection: " + collectionName + + " slice " + sliceName + " replica " + replicaName + ". Full message: " + message); + DocCollection collection = clusterState.getCollection(collectionName); + Slice slice = collection.getSlice(sliceName); + DocCollection newCollection = SliceMutator.updateReplica(collection, + slice, replicaName, unsetProperty(replica, property)); + return new ZkWriteCommand(collectionName, newCollection); + } + + public ZkWriteCommand setState(ClusterState clusterState, ZkNodeProps message) { + if (Overseer.isLegacy(zkStateReader.getClusterProps())) { + return updateState(clusterState, message); + } else { + return updateStateNew(clusterState, message); + } + } + + protected ZkWriteCommand updateState(final ClusterState prevState, ZkNodeProps message) { + final String cName = message.getStr(ZkStateReader.COLLECTION_PROP); + if (!checkCollectionKeyExistence(message)) return ZkStateWriter.NO_OP; + Integer numShards = message.getInt(ZkStateReader.NUM_SHARDS_PROP, null); + log.info("Update state numShards={} message={}", numShards, message); + + List shardNames = new ArrayList<>(); + + ZkWriteCommand writeCommand = null; + ClusterState newState = null; + + //collection does not yet exist, create placeholders if num shards is specified + boolean collectionExists = prevState.hasCollection(cName); + if (!collectionExists && numShards != null) { + ClusterStateMutator.getShardNames(numShards, shardNames); + Map createMsg = ZkNodeProps.makeMap("name", cName); + createMsg.putAll(message.getProperties()); + writeCommand = new ClusterStateMutator(zkStateReader).createCollection(prevState, new ZkNodeProps(createMsg)); + DocCollection collection = writeCommand.collection; + newState = ClusterStateMutator.newState(prevState, cName, collection); + } + return updateState(newState != null ? newState : prevState, + message, cName, numShards, collectionExists); + } + + private ZkWriteCommand updateState(final ClusterState prevState, ZkNodeProps message, String collectionName, Integer numShards, boolean collectionExists) { + String sliceName = message.getStr(ZkStateReader.SHARD_ID_PROP); + + String coreNodeName = message.getStr(ZkStateReader.CORE_NODE_NAME_PROP); + if (coreNodeName == null) { + coreNodeName = ClusterStateMutator.getAssignedCoreNodeName(prevState, message); + if (coreNodeName != null) { + log.info("node=" + coreNodeName + " is already registered"); + } else { + // if coreNodeName is null, auto assign one + coreNodeName = Assign.assignNode(collectionName, prevState); + } + message.getProperties().put(ZkStateReader.CORE_NODE_NAME_PROP, + coreNodeName); + } + + // use the provided non null shardId + if (sliceName == null) { + //get shardId from ClusterState + sliceName = ClusterStateMutator.getAssignedId(prevState, coreNodeName, message); + if (sliceName != null) { + log.info("shard=" + sliceName + " is already registered"); + } + } + if (sliceName == null) { + //request new shardId + if (collectionExists) { + // use existing numShards + numShards = prevState.getCollection(collectionName).getSlices().size(); + log.info("Collection already exists with " + ZkStateReader.NUM_SHARDS_PROP + "=" + numShards); + } + sliceName = Assign.assignShard(collectionName, prevState, numShards); + log.info("Assigning new node to shard shard=" + sliceName); + } + + Slice slice = prevState.getSlice(collectionName, sliceName); + + Map replicaProps = new LinkedHashMap<>(); + + replicaProps.putAll(message.getProperties()); + if (slice != null) { + Replica oldReplica = slice.getReplicasMap().get(coreNodeName); + if (oldReplica != null) { + if (oldReplica.containsKey(ZkStateReader.LEADER_PROP)) { + replicaProps.put(ZkStateReader.LEADER_PROP, oldReplica.get(ZkStateReader.LEADER_PROP)); + } + // Move custom props over. + for (Map.Entry ent : oldReplica.getProperties().entrySet()) { + if (ent.getKey().startsWith(COLL_PROP_PREFIX)) { + replicaProps.put(ent.getKey(), ent.getValue()); + } + } + } + } + + // we don't put these in the clusterstate + replicaProps.remove(ZkStateReader.NUM_SHARDS_PROP); + replicaProps.remove(ZkStateReader.CORE_NODE_NAME_PROP); + replicaProps.remove(ZkStateReader.SHARD_ID_PROP); + replicaProps.remove(ZkStateReader.COLLECTION_PROP); + replicaProps.remove(Overseer.QUEUE_OPERATION); + + // remove any props with null values + Set> entrySet = replicaProps.entrySet(); + List removeKeys = new ArrayList<>(); + for (Map.Entry entry : entrySet) { + if (entry.getValue() == null) { + removeKeys.add(entry.getKey()); + } + } + for (String removeKey : removeKeys) { + replicaProps.remove(removeKey); + } + replicaProps.remove(ZkStateReader.CORE_NODE_NAME_PROP); + // remove shard specific properties + String shardRange = (String) replicaProps.remove(ZkStateReader.SHARD_RANGE_PROP); + String shardState = (String) replicaProps.remove(ZkStateReader.SHARD_STATE_PROP); + String shardParent = (String) replicaProps.remove(ZkStateReader.SHARD_PARENT_PROP); + + + Replica replica = new Replica(coreNodeName, replicaProps); + + Map sliceProps = null; + Map replicas; + + DocCollection collection = prevState.getCollectionOrNull(collectionName); + if (slice != null) { + collection = prevState.getCollection(collectionName); + collection = checkAndCompleteShardSplit(prevState, collection, coreNodeName, sliceName, replicaProps); + // get the current slice again because it may have been updated due to checkAndCompleteShardSplit method + slice = collection.getSlice(sliceName); + sliceProps = slice.getProperties(); + replicas = slice.getReplicasCopy(); + } else { + replicas = new HashMap<>(1); + sliceProps = new HashMap<>(); + sliceProps.put(Slice.RANGE, shardRange); + sliceProps.put(Slice.STATE, shardState); + sliceProps.put(Slice.PARENT, shardParent); + } + + replicas.put(replica.getName(), replica); + slice = new Slice(sliceName, replicas, sliceProps); + + DocCollection newCollection = CollectionMutator.updateSlice(collectionName, collection, slice); + return new ZkWriteCommand(collectionName, newCollection); + } + + /** + * Handles non-legacy state updates + */ + protected ZkWriteCommand updateStateNew(ClusterState clusterState, final ZkNodeProps message) { + String collection = message.getStr(ZkStateReader.COLLECTION_PROP); + if (!checkCollectionKeyExistence(message)) return ZkStateWriter.NO_OP; + String sliceName = message.getStr(ZkStateReader.SHARD_ID_PROP); + + if (collection == null || sliceName == null) { + log.error("Invalid collection and slice {}", message); + return ZkStateWriter.NO_OP; + } + Slice slice = clusterState.getSlice(collection, sliceName); + if (slice == null) { + log.error("No such slice exists {}", message); + return ZkStateWriter.NO_OP; + } + + return updateState(clusterState, message); + } + + private DocCollection checkAndCompleteShardSplit(ClusterState prevState, DocCollection collection, String coreNodeName, String sliceName, Map replicaProps) { + Slice slice = collection.getSlice(sliceName); + Map sliceProps = slice.getProperties(); + String sliceState = slice.getState(); + if (Slice.RECOVERY.equals(sliceState)) { + log.info("Shard: {} is in recovery state", sliceName); + // is this replica active? + if (ZkStateReader.ACTIVE.equals(replicaProps.get(ZkStateReader.STATE_PROP))) { + log.info("Shard: {} is in recovery state and coreNodeName: {} is active", sliceName, coreNodeName); + // are all other replicas also active? + boolean allActive = true; + for (Map.Entry entry : slice.getReplicasMap().entrySet()) { + if (coreNodeName.equals(entry.getKey())) continue; + if (!Slice.ACTIVE.equals(entry.getValue().getStr(Slice.STATE))) { + allActive = false; + break; + } + } + if (allActive) { + log.info("Shard: {} - all replicas are active. Finding status of fellow sub-shards", sliceName); + // find out about other sub shards + Map allSlicesCopy = new HashMap<>(collection.getSlicesMap()); + List subShardSlices = new ArrayList<>(); + outer: + for (Map.Entry entry : allSlicesCopy.entrySet()) { + if (sliceName.equals(entry.getKey())) + continue; + Slice otherSlice = entry.getValue(); + if (Slice.RECOVERY.equals(otherSlice.getState())) { + if (slice.getParent() != null && slice.getParent().equals(otherSlice.getParent())) { + log.info("Shard: {} - Fellow sub-shard: {} found", sliceName, otherSlice.getName()); + // this is a fellow sub shard so check if all replicas are active + for (Map.Entry sliceEntry : otherSlice.getReplicasMap().entrySet()) { + if (!ZkStateReader.ACTIVE.equals(sliceEntry.getValue().getStr(ZkStateReader.STATE_PROP))) { + allActive = false; + break outer; + } + } + log.info("Shard: {} - Fellow sub-shard: {} has all replicas active", sliceName, otherSlice.getName()); + subShardSlices.add(otherSlice); + } + } + } + if (allActive) { + // hurray, all sub shard replicas are active + log.info("Shard: {} - All replicas across all fellow sub-shards are now ACTIVE. Preparing to switch shard states.", sliceName); + String parentSliceName = (String) sliceProps.remove(Slice.PARENT); + + Map propMap = new HashMap<>(); + propMap.put(Overseer.QUEUE_OPERATION, "updateshardstate"); + propMap.put(parentSliceName, Slice.INACTIVE); + propMap.put(sliceName, Slice.ACTIVE); + for (Slice subShardSlice : subShardSlices) { + propMap.put(subShardSlice.getName(), Slice.ACTIVE); + } + propMap.put(ZkStateReader.COLLECTION_PROP, collection.getName()); + ZkNodeProps m = new ZkNodeProps(propMap); + return new SliceMutator(zkStateReader).updateShardState(prevState, m).collection; + } + } + } + } + return collection; + } +} + diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java new file mode 100644 index 00000000000..0c7ce9e2c44 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java @@ -0,0 +1,281 @@ +package org.apache.solr.cloud.overseer; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Set; + +import com.google.common.collect.ImmutableSet; +import org.apache.solr.cloud.Assign; +import org.apache.solr.cloud.Overseer; +import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.RoutingRule; +import org.apache.solr.common.cloud.Slice; +import org.apache.solr.common.cloud.ZkCoreNodeProps; +import org.apache.solr.common.cloud.ZkNodeProps; +import org.apache.solr.common.cloud.ZkStateReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.solr.cloud.OverseerCollectionProcessor.COLL_PROP_PREFIX; +import static org.apache.solr.cloud.overseer.CollectionMutator.checkCollectionKeyExistence; +import static org.apache.solr.common.cloud.ZkNodeProps.makeMap; + +public class SliceMutator { + private static Logger log = LoggerFactory.getLogger(SliceMutator.class); + + public static final String PREFERRED_LEADER_PROP = COLL_PROP_PREFIX + "preferredleader"; + + public static final Set SLICE_UNIQUE_BOOLEAN_PROPERTIES = ImmutableSet.of(PREFERRED_LEADER_PROP); + + + protected final ZkStateReader zkStateReader; + + public SliceMutator(ZkStateReader zkStateReader) { + this.zkStateReader = zkStateReader; + } + + public ZkWriteCommand addReplica(ClusterState clusterState, ZkNodeProps message) { + log.info("createReplica() {} ", message); + String coll = message.getStr(ZkStateReader.COLLECTION_PROP); + if (!checkCollectionKeyExistence(message)) return ZkStateWriter.NO_OP; + String slice = message.getStr(ZkStateReader.SHARD_ID_PROP); + DocCollection collection = clusterState.getCollection(coll); + Slice sl = collection.getSlice(slice); + if (sl == null) { + log.error("Invalid Collection/Slice {}/{} ", coll, slice); + return ZkStateWriter.NO_OP; + } + + String coreNodeName = Assign.assignNode(coll, clusterState); + Replica replica = new Replica(coreNodeName, + makeMap( + ZkStateReader.CORE_NAME_PROP, message.getStr(ZkStateReader.CORE_NAME_PROP), + ZkStateReader.BASE_URL_PROP, message.getStr(ZkStateReader.BASE_URL_PROP), + ZkStateReader.STATE_PROP, message.getStr(ZkStateReader.STATE_PROP))); + return new ZkWriteCommand(coll, updateReplica(collection, sl, replica.getName(), replica)); + } + + public ZkWriteCommand removeReplica(ClusterState clusterState, ZkNodeProps message) { + final String cnn = message.getStr(ZkStateReader.CORE_NODE_NAME_PROP); + final String collection = message.getStr(ZkStateReader.COLLECTION_PROP); + if (!checkCollectionKeyExistence(message)) return ZkStateWriter.NO_OP; + + DocCollection coll = clusterState.getCollectionOrNull(collection); + if (coll == null) { + // make sure we delete the zk nodes for this collection just to be safe + return new ZkWriteCommand(collection, null); + } + + Map newSlices = new LinkedHashMap<>(); + boolean lastSlice = false; + + for (Slice slice : coll.getSlices()) { + Replica replica = slice.getReplica(cnn); + if (replica != null) { + Map newReplicas = slice.getReplicasCopy(); + newReplicas.remove(cnn); + // TODO TODO TODO!!! if there are no replicas left for the slice, and the slice has no hash range, remove it + // if (newReplicas.size() == 0 && slice.getRange() == null) { + // if there are no replicas left for the slice remove it + if (newReplicas.size() == 0) { + slice = null; + lastSlice = true; + } else { + slice = new Slice(slice.getName(), newReplicas, slice.getProperties()); + } + } + + if (slice != null) { + newSlices.put(slice.getName(), slice); + } + } + + if (lastSlice) { + // remove all empty pre allocated slices + for (Slice slice : coll.getSlices()) { + if (slice.getReplicas().size() == 0) { + newSlices.remove(slice.getName()); + } + } + } + + // if there are no slices left in the collection, remove it? + if (newSlices.size() == 0) { + return new ClusterStateMutator(zkStateReader).deleteCollection(clusterState, + new ZkNodeProps(ZkNodeProps.makeMap("name", collection))); + } else { + return new ZkWriteCommand(collection, coll.copyWithSlices(newSlices)); + } + } + + public ZkWriteCommand setShardLeader(ClusterState clusterState, ZkNodeProps message) { + StringBuilder sb = new StringBuilder(); + String baseUrl = message.getStr(ZkStateReader.BASE_URL_PROP); + String coreName = message.getStr(ZkStateReader.CORE_NAME_PROP); + sb.append(baseUrl); + if (baseUrl != null && !baseUrl.endsWith("/")) sb.append("/"); + sb.append(coreName == null ? "" : coreName); + if (!(sb.substring(sb.length() - 1).equals("/"))) sb.append("/"); + String leaderUrl = sb.length() > 0 ? sb.toString() : null; + + String collectionName = message.getStr(ZkStateReader.COLLECTION_PROP); + String sliceName = message.getStr(ZkStateReader.SHARD_ID_PROP); + DocCollection coll = clusterState.getCollectionOrNull(collectionName); + + if (coll == null) { + log.error("Could not mark shard leader for non existing collection:" + collectionName); + return ZkStateWriter.NO_OP; + } + + Map slices = coll.getSlicesMap(); + Slice slice = slices.get(sliceName); + + Replica oldLeader = slice.getLeader(); + final Map newReplicas = new LinkedHashMap<>(); + for (Replica replica : slice.getReplicas()) { + // TODO: this should only be calculated once and cached somewhere? + String coreURL = ZkCoreNodeProps.getCoreUrl(replica.getStr(ZkStateReader.BASE_URL_PROP), replica.getStr(ZkStateReader.CORE_NAME_PROP)); + + if (replica == oldLeader && !coreURL.equals(leaderUrl)) { + replica = new ReplicaMutator(zkStateReader).unsetLeader(replica); + } else if (coreURL.equals(leaderUrl)) { + replica = new ReplicaMutator(zkStateReader).setLeader(replica); + } + + newReplicas.put(replica.getName(), replica); + } + + Map newSliceProps = slice.shallowCopy(); + newSliceProps.put(Slice.REPLICAS, newReplicas); + slice = new Slice(slice.getName(), newReplicas, slice.getProperties()); + return new ZkWriteCommand(collectionName, CollectionMutator.updateSlice(collectionName, coll, slice)); + } + + public ZkWriteCommand updateShardState(ClusterState clusterState, ZkNodeProps message) { + String collection = message.getStr(ZkStateReader.COLLECTION_PROP); + if (!checkCollectionKeyExistence(message)) return ZkStateWriter.NO_OP; + log.info("Update shard state invoked for collection: " + collection + " with message: " + message); + + Map slicesCopy = new LinkedHashMap<>(clusterState.getSlicesMap(collection)); + for (String key : message.keySet()) { + if (ZkStateReader.COLLECTION_PROP.equals(key)) continue; + if (Overseer.QUEUE_OPERATION.equals(key)) continue; + + Slice slice = clusterState.getSlice(collection, key); + if (slice == null) { + throw new RuntimeException("Overseer.updateShardState unknown collection: " + collection + " slice: " + key); + } + log.info("Update shard state " + key + " to " + message.getStr(key)); + Map props = slice.shallowCopy(); + if (Slice.RECOVERY.equals(props.get(Slice.STATE)) && Slice.ACTIVE.equals(message.getStr(key))) { + props.remove(Slice.PARENT); + } + props.put(Slice.STATE, message.getStr(key)); + Slice newSlice = new Slice(slice.getName(), slice.getReplicasCopy(), props); + slicesCopy.put(slice.getName(), newSlice); + } + + return new ZkWriteCommand(collection, clusterState.getCollection(collection).copyWithSlices(slicesCopy)); + } + + public ZkWriteCommand addRoutingRule(final ClusterState clusterState, ZkNodeProps message) { + String collection = message.getStr(ZkStateReader.COLLECTION_PROP); + if (!checkCollectionKeyExistence(message)) return ZkStateWriter.NO_OP; + String shard = message.getStr(ZkStateReader.SHARD_ID_PROP); + String routeKey = message.getStr("routeKey"); + String range = message.getStr("range"); + String targetCollection = message.getStr("targetCollection"); + String targetShard = message.getStr("targetShard"); + String expireAt = message.getStr("expireAt"); + + Slice slice = clusterState.getSlice(collection, shard); + if (slice == null) { + throw new RuntimeException("Overseer.addRoutingRule unknown collection: " + collection + " slice:" + shard); + } + + Map routingRules = slice.getRoutingRules(); + if (routingRules == null) + routingRules = new HashMap<>(); + RoutingRule r = routingRules.get(routeKey); + if (r == null) { + Map map = new HashMap<>(); + map.put("routeRanges", range); + map.put("targetCollection", targetCollection); + map.put("expireAt", expireAt); + RoutingRule rule = new RoutingRule(routeKey, map); + routingRules.put(routeKey, rule); + } else { + // add this range + Map map = r.shallowCopy(); + map.put("routeRanges", map.get("routeRanges") + "," + range); + map.put("expireAt", expireAt); + routingRules.put(routeKey, new RoutingRule(routeKey, map)); + } + + Map props = slice.shallowCopy(); + props.put("routingRules", routingRules); + + Slice newSlice = new Slice(slice.getName(), slice.getReplicasCopy(), props); + return new ZkWriteCommand(collection, + CollectionMutator.updateSlice(collection, clusterState.getCollection(collection), newSlice)); + } + + public ZkWriteCommand removeRoutingRule(final ClusterState clusterState, ZkNodeProps message) { + String collection = message.getStr(ZkStateReader.COLLECTION_PROP); + if (!checkCollectionKeyExistence(message)) return ZkStateWriter.NO_OP; + String shard = message.getStr(ZkStateReader.SHARD_ID_PROP); + String routeKeyStr = message.getStr("routeKey"); + + log.info("Overseer.removeRoutingRule invoked for collection: " + collection + + " shard: " + shard + " routeKey: " + routeKeyStr); + + Slice slice = clusterState.getSlice(collection, shard); + if (slice == null) { + log.warn("Unknown collection: " + collection + " shard: " + shard); + return ZkStateWriter.NO_OP; + } + Map routingRules = slice.getRoutingRules(); + if (routingRules != null) { + routingRules.remove(routeKeyStr); // no rules left + Map props = slice.shallowCopy(); + props.put("routingRules", routingRules); + Slice newSlice = new Slice(slice.getName(), slice.getReplicasCopy(), props); + return new ZkWriteCommand(collection, + CollectionMutator.updateSlice(collection, clusterState.getCollection(collection), newSlice)); + } + + return ZkStateWriter.NO_OP; + } + + public static DocCollection updateReplica(DocCollection collection, final Slice slice, String coreNodeName, final Replica replica) { + Map copy = slice.getReplicasCopy(); + if (replica == null) { + copy.remove(coreNodeName); + } else { + copy.put(replica.getName(), replica); + } + Slice newSlice = new Slice(slice.getName(), copy, slice.getProperties()); + return CollectionMutator.updateSlice(collection.getName(), collection, newSlice); + } +} + diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java new file mode 100644 index 00000000000..aa61535342b --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java @@ -0,0 +1,151 @@ +package org.apache.solr.cloud.overseer; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import org.apache.solr.cloud.Overseer; +import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.util.stats.TimerContext; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static java.util.Collections.singletonMap; + +public class ZkStateWriter { + private static Logger log = LoggerFactory.getLogger(ZkStateWriter.class); + public static ZkWriteCommand NO_OP = new ZkWriteCommand(); + + protected final ZkStateReader reader; + protected final Overseer.Stats stats; + + protected Map updates = new HashMap<>(); + protected ClusterState clusterState = null; + protected boolean isClusterStateModified = false; + protected long lastUpdatedTime = -1; + + public ZkStateWriter(ZkStateReader zkStateReader, Overseer.Stats stats) { + assert zkStateReader != null; + + this.reader = zkStateReader; + this.stats = stats; + } + + public ClusterState enqueueUpdate(ClusterState prevState, ZkWriteCommand cmd) { + if (cmd == NO_OP) return prevState; + + if (cmd.collection == null) { + isClusterStateModified = true; + clusterState = prevState.copyWith(singletonMap(cmd.name, (DocCollection) null)); + updates.put(cmd.name, null); + } else { + if (cmd.collection.getStateFormat() > 1) { + updates.put(cmd.name, cmd.collection); + } else { + isClusterStateModified = true; + } + clusterState = prevState.copyWith(singletonMap(cmd.name, cmd.collection)); + } + return clusterState; + } + + public boolean hasPendingUpdates() { + return !updates.isEmpty() || isClusterStateModified; + } + + public ClusterState writePendingUpdates() throws KeeperException, InterruptedException { + if (!hasPendingUpdates()) throw new IllegalStateException("No queued updates to execute"); + TimerContext timerContext = stats.time("update_state"); + boolean success = false; + try { + if (!updates.isEmpty()) { + for (Map.Entry entry : updates.entrySet()) { + String name = entry.getKey(); + String path = ZkStateReader.getCollectionPath(name); + DocCollection c = entry.getValue(); + + if (c == null) { + // let's clean up the collections path for this collection + reader.getZkClient().clean("/collections/" + name); + } else if (c.getStateFormat() > 1) { + byte[] data = ZkStateReader.toJSON(new ClusterState(-1, Collections.emptySet(), singletonMap(c.getName(), c))); + if (reader.getZkClient().exists(path, true)) { + assert c.getZNodeVersion() >= 0; + log.info("going to update_collection {}", path); + Stat stat = reader.getZkClient().setData(path, data, c.getZNodeVersion(), true); + DocCollection newCollection = new DocCollection(name, c.getSlicesMap(), c.getProperties(), c.getRouter(), stat.getVersion(), path); + clusterState = clusterState.copyWith(singletonMap(name, newCollection)); + } else { + log.info("going to create_collection {}", path); + reader.getZkClient().create(path, data, CreateMode.PERSISTENT, true); + DocCollection newCollection = new DocCollection(name, c.getSlicesMap(), c.getProperties(), c.getRouter(), 0, path); + clusterState = clusterState.copyWith(singletonMap(name, newCollection)); + isClusterStateModified = true; + } + } else if (c.getStateFormat() == 1) { + isClusterStateModified = true; + } + } + + updates.clear(); + } + + if (isClusterStateModified) { + assert clusterState.getZkClusterStateVersion() >= 0; + lastUpdatedTime = System.nanoTime(); + byte[] data = ZkStateReader.toJSON(clusterState); + Stat stat = reader.getZkClient().setData(ZkStateReader.CLUSTER_STATE, data, clusterState.getZkClusterStateVersion(), true); + Set collectionNames = clusterState.getCollections(); + Map collectionStates = new HashMap<>(collectionNames.size()); + for (String c : collectionNames) { + collectionStates.put(c, clusterState.getCollection(c)); + } + // use the reader's live nodes because our cluster state's live nodes may be stale + clusterState = new ClusterState(stat.getVersion(), reader.getClusterState().getLiveNodes(), collectionStates); + isClusterStateModified = false; + } + success = true; + } finally { + timerContext.stop(); + if (success) { + stats.success("update_state"); + } else { + stats.error("update_state"); + } + } + + return clusterState; + } + + public long getLastUpdatedTime() { + return lastUpdatedTime; + } + + public ClusterState getClusterState() { + return clusterState; + } +} + diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkWriteCommand.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkWriteCommand.java new file mode 100644 index 00000000000..1ffc5cd3677 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkWriteCommand.java @@ -0,0 +1,49 @@ +package org.apache.solr.cloud.overseer; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.DocCollection; + +/** +* Created by shalin on 29/10/14. +*/ +public class ZkWriteCommand { + public final String name; + public final DocCollection collection; +// public final ClusterState state; + public final boolean noop; + + public ZkWriteCommand(String name, DocCollection collection) { + this.name = name; + this.collection = collection; +// this.state = state; + this.noop = false; + } + + /** + * Returns a no-op + */ + public ZkWriteCommand() { + this.noop = true; + this.name = null; + this.collection = null; +// this.state = null; + } +} + diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/package.html b/solr/core/src/java/org/apache/solr/cloud/overseer/package.html new file mode 100644 index 00000000000..402c61cd2ee --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cloud/overseer/package.html @@ -0,0 +1,29 @@ + + + + + + + + +

+ Classes for updating cluster state in SolrCloud mode. +

+ + + diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java index e9829f8bfd0..ec769a2444a 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java @@ -83,6 +83,7 @@ import org.apache.solr.cloud.DistributedQueue.QueueEvent; import org.apache.solr.cloud.Overseer; import org.apache.solr.cloud.OverseerCollectionProcessor; import org.apache.solr.cloud.OverseerSolrResponse; +import org.apache.solr.cloud.overseer.SliceMutator; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.cloud.ClusterState; @@ -301,7 +302,7 @@ public class CollectionsHandler extends RequestHandlerBase { for (Slice slice : dc.getSlices()) { for (Replica replica : slice.getReplicas()) { // Tell the replica to become the leader if we're the preferred leader AND active AND not the leader already - if (replica.getBool(Overseer.preferredLeaderProp, false) == false) { + if (replica.getBool(SliceMutator.PREFERRED_LEADER_PROP, false) == false) { continue; } if (StringUtils.equalsIgnoreCase(replica.getStr(STATE_PROP), ACTIVE) == false) { @@ -442,7 +443,7 @@ public class CollectionsHandler extends RequestHandlerBase { // Check if we're trying to set a property with parameters that allow us to set the property on multiple replicas // in a slice on properties that are known to only be one-per-slice and error out if so. if (StringUtils.isNotBlank((String)map.get(SHARD_UNIQUE)) && - Overseer.sliceUniqueBooleanProperties.contains(property.toLowerCase(Locale.ROOT)) && + SliceMutator.SLICE_UNIQUE_BOOLEAN_PROPERTIES.contains(property.toLowerCase(Locale.ROOT)) && uniquePerSlice == false) { throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Overseer replica property command received for property " + property + @@ -472,7 +473,7 @@ public class CollectionsHandler extends RequestHandlerBase { } if (shardUnique == false && - Overseer.sliceUniqueBooleanProperties.contains(prop) == false) { + SliceMutator.SLICE_UNIQUE_BOOLEAN_PROPERTIES.contains(prop) == false) { throw new SolrException(ErrorCode.BAD_REQUEST, "Balancing properties amongst replicas in a slice requires that" + " the property be pre-defined as a unique property (e.g. 'preferredLeader') or that 'shardUnique' be set to 'true'. " + " Property: " + prop + " shardUnique: " + Boolean.toString(shardUnique)); diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java index 39daf72cb61..82526d098a0 100644 --- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java +++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java @@ -47,6 +47,7 @@ import org.apache.solr.cloud.DistributedQueue; import org.apache.solr.cloud.LeaderInitiatedRecoveryThread; import org.apache.solr.cloud.Overseer; import org.apache.solr.cloud.ZkController; +import org.apache.solr.cloud.overseer.OverseerAction; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.SolrInputDocument; @@ -77,7 +78,6 @@ import org.apache.solr.request.SolrRequestInfo; import org.apache.solr.response.SolrQueryResponse; import org.apache.solr.schema.IndexSchema; import org.apache.solr.schema.SchemaField; -import org.apache.solr.schema.TrieDateField; import org.apache.solr.update.AddUpdateCommand; import org.apache.solr.update.CommitUpdateCommand; import org.apache.solr.update.DeleteUpdateCommand; @@ -542,7 +542,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { if (ruleExpiryLock.tryLock(10, TimeUnit.MILLISECONDS)) { log.info("Going to expire routing rule"); try { - Map map = ZkNodeProps.makeMap(Overseer.QUEUE_OPERATION, Overseer.OverseerAction.REMOVEROUTINGRULE.toLower(), + Map map = ZkNodeProps.makeMap(Overseer.QUEUE_OPERATION, OverseerAction.REMOVEROUTINGRULE.toLower(), ZkStateReader.COLLECTION_PROP, collection, ZkStateReader.SHARD_ID_PROP, myShardId, "routeKey", routeKey + "!"); diff --git a/solr/core/src/test/org/apache/solr/cloud/DeleteShardTest.java b/solr/core/src/test/org/apache/solr/cloud/DeleteShardTest.java index cdd3972cc1f..0004d8d3fa7 100644 --- a/solr/core/src/test/org/apache/solr/cloud/DeleteShardTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/DeleteShardTest.java @@ -21,6 +21,7 @@ import org.apache.solr.client.solrj.SolrRequest; import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.client.solrj.impl.HttpSolrServer; import org.apache.solr.client.solrj.request.QueryRequest; +import org.apache.solr.cloud.overseer.OverseerAction; import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.Slice; @@ -157,7 +158,7 @@ public class DeleteShardTest extends AbstractFullDistribZkTestBase { KeeperException, InterruptedException { DistributedQueue inQueue = Overseer.getInQueue(cloudClient.getZkStateReader().getZkClient()); Map propMap = new HashMap<>(); - propMap.put(Overseer.QUEUE_OPERATION, Overseer.OverseerAction.UPDATESHARDSTATE.toLower()); + propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower()); propMap.put(slice, state); propMap.put(ZkStateReader.COLLECTION_PROP, "collection1"); ZkNodeProps m = new ZkNodeProps(propMap); diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerRolesTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerRolesTest.java index 84b96bbd9a0..cdfed1c13bc 100644 --- a/solr/core/src/test/org/apache/solr/cloud/OverseerRolesTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/OverseerRolesTest.java @@ -38,6 +38,7 @@ import org.apache.solr.client.solrj.SolrRequest; import org.apache.solr.client.solrj.embedded.JettySolrRunner; import org.apache.solr.client.solrj.impl.CloudSolrServer; import org.apache.solr.client.solrj.request.QueryRequest; +import org.apache.solr.cloud.overseer.OverseerAction; import org.apache.solr.common.cloud.SolrZkClient; import org.apache.solr.common.cloud.ZkNodeProps; import org.apache.solr.common.cloud.ZkStateReader; @@ -106,7 +107,7 @@ public class OverseerRolesTest extends AbstractFullDistribZkTestBase{ Map m = (Map) ZkStateReader.fromJSON(data); String s = (String) m.get("id"); String leader = LeaderElector.getNodeName(s); - Overseer.getInQueue(zk).offer(ZkStateReader.toJSON(new ZkNodeProps(Overseer.QUEUE_OPERATION, Overseer.OverseerAction.QUIT.toLower()))); + Overseer.getInQueue(zk).offer(ZkStateReader.toJSON(new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.QUIT.toLower()))); long timeout = System.currentTimeMillis()+10000; String newLeader=null; for(;System.currentTimeMillis() < timeout;){ diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java index 625e481ee5a..9a3b71bd24e 100644 --- a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java @@ -36,6 +36,7 @@ import javax.xml.parsers.ParserConfigurationException; import org.apache.lucene.util.LuceneTestCase.Slow; import org.apache.solr.SolrTestCaseJ4; +import org.apache.solr.cloud.overseer.OverseerAction; import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.Replica; @@ -43,10 +44,14 @@ import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.cloud.SolrZkClient; import org.apache.solr.common.cloud.ZkNodeProps; import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.params.CollectionParams; import org.apache.solr.handler.component.HttpShardHandlerFactory; import org.apache.solr.update.UpdateShardHandler; import org.apache.solr.util.DefaultSolrThreadFactory; import org.apache.solr.util.MockConfigSolr; +import org.apache.solr.util.stats.Snapshot; +import org.apache.solr.util.stats.Timer; +import org.apache.solr.util.stats.TimerContext; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.NoNodeException; @@ -54,6 +59,7 @@ import org.apache.zookeeper.KeeperException.NodeExistsException; import org.junit.After; import org.junit.AfterClass; import org.junit.BeforeClass; +import org.junit.Ignore; import org.junit.Test; import org.xml.sax.SAXException; @@ -112,7 +118,7 @@ public class OverseerTest extends SolrTestCaseJ4 { if (ec != null) { ec.cancelElection(); } - ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, Overseer.OverseerAction.DELETECORE.toLower(), + ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.DELETECORE.toLower(), ZkStateReader.NODE_NAME_PROP, nodeName, ZkStateReader.CORE_NAME_PROP, coreName, ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName, @@ -121,7 +127,7 @@ public class OverseerTest extends SolrTestCaseJ4 { q.offer(ZkStateReader.toJSON(m)); return null; } else { - ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, Overseer.OverseerAction.STATE.toLower(), + ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(), ZkStateReader.STATE_PROP, stateName, ZkStateReader.NODE_NAME_PROP, nodeName, ZkStateReader.CORE_NAME_PROP, coreName, @@ -525,7 +531,7 @@ public class OverseerTest extends SolrTestCaseJ4 { DistributedQueue q = Overseer.getInQueue(zkClient); - ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, Overseer.OverseerAction.STATE.toLower(), + ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(), ZkStateReader.BASE_URL_PROP, "http://127.0.0.1/solr", ZkStateReader.NODE_NAME_PROP, "node1", ZkStateReader.COLLECTION_PROP, "collection1", @@ -878,6 +884,133 @@ public class OverseerTest extends SolrTestCaseJ4 { } } + @Test + @Ignore + public void testPerformance() throws Exception { + String zkDir = createTempDir("OverseerTest.testPerformance").toFile().getAbsolutePath(); + + ZkTestServer server = new ZkTestServer(zkDir); + + SolrZkClient controllerClient = null; + SolrZkClient overseerClient = null; + ZkStateReader reader = null; + MockZKController mockController = null; + + try { + server.run(); + controllerClient = new SolrZkClient(server.getZkAddress(), TIMEOUT); + + AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost()); + AbstractZkTestCase.makeSolrZkNode(server.getZkHost()); + controllerClient.makePath(ZkStateReader.LIVE_NODES_ZKNODE, true); + + reader = new ZkStateReader(controllerClient); + reader.createClusterStateWatchersAndUpdate(); + + mockController = new MockZKController(server.getZkAddress(), "node1"); + + for (int i=0; i<100; i++) { + ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(), + "name", "perf" + i, + ZkStateReader.NUM_SHARDS_PROP, "1", + "stateFormat", "2", + ZkStateReader.REPLICATION_FACTOR, "1", + ZkStateReader.MAX_SHARDS_PER_NODE, "1" + ); + DistributedQueue q = Overseer.getInQueue(controllerClient); + q.offer(ZkStateReader.toJSON(m)); + controllerClient.makePath("/collections/perf" + i, true); + } + + for (int i = 0, j = 0, k = 0; i < 20000; i++, j++, k++) { + ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(), + ZkStateReader.STATE_PROP, ZkStateReader.RECOVERING, + ZkStateReader.NODE_NAME_PROP, "node1", + ZkStateReader.CORE_NAME_PROP, "core" + k, + ZkStateReader.CORE_NODE_NAME_PROP, "node1", + ZkStateReader.COLLECTION_PROP, "perf" + j, + ZkStateReader.NUM_SHARDS_PROP, "1", + ZkStateReader.BASE_URL_PROP, "http://" + "node1" + + "/solr/"); + DistributedQueue q = Overseer.getInQueue(controllerClient); + q.offer(ZkStateReader.toJSON(m)); + if (j == 99) j = 0; + if (k == 9) k = 0; + if (i > 0 && i % 100 == 0) log.info("Published {} items", i); + } + + // let's publish a sentinel collection which we'll use to wait for overseer to complete operations + ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(), + ZkStateReader.STATE_PROP, ZkStateReader.ACTIVE, + ZkStateReader.NODE_NAME_PROP, "node1", + ZkStateReader.CORE_NAME_PROP, "core1", + ZkStateReader.CORE_NODE_NAME_PROP, "node1", + ZkStateReader.COLLECTION_PROP, "perf_sentinel", + ZkStateReader.NUM_SHARDS_PROP, "1", + ZkStateReader.BASE_URL_PROP, "http://" + "node1" + + "/solr/"); + DistributedQueue q = Overseer.getInQueue(controllerClient); + q.offer(ZkStateReader.toJSON(m)); + + Timer t = new Timer(); + TimerContext context = t.time(); + try { + overseerClient = electNewOverseer(server.getZkAddress()); + assertTrue(overseers.size() > 0); + + while (true) { + reader.updateClusterState(true); + ClusterState state = reader.getClusterState(); + if (state.hasCollection("perf_sentinel")) { + break; + } + Thread.sleep(1000); + } + } finally { + context.stop(); + } + + log.info("Overseer loop finished processing: "); + printTimingStats(t); + + Overseer overseer = overseers.get(0); + Overseer.Stats stats = overseer.getStats(); + + String[] interestingOps = {"state", "update_state", "am_i_leader", ""}; + Arrays.sort(interestingOps); + for (Map.Entry entry : stats.getStats().entrySet()) { + String op = entry.getKey(); + if (Arrays.binarySearch(interestingOps, op) < 0) + continue; + Overseer.Stat stat = entry.getValue(); + log.info("op: {}, success: {}, failure: {}", op, stat.success.get(), stat.errors.get()); + Timer timer = stat.requestTime; + printTimingStats(timer); + } + + } finally { + close(overseerClient); + close(mockController); + close(controllerClient); + close(reader); + server.shutdown(); + } + } + + private void printTimingStats(Timer timer) { + Snapshot snapshot = timer.getSnapshot(); + log.info("\t totalTime: {}", timer.getSum()); + log.info("\t avgRequestsPerMinute: {}", timer.getMeanRate()); + log.info("\t 5minRateRequestsPerMinute: {}", timer.getFiveMinuteRate()); + log.info("\t 15minRateRequestsPerMinute: {}", timer.getFifteenMinuteRate()); + log.info("\t avgTimePerRequest: {}", timer.getMean()); + log.info("\t medianRequestTime: {}", snapshot.getMedian()); + log.info("\t 75thPctlRequestTime: {}", snapshot.get75thPercentile()); + log.info("\t 95thPctlRequestTime: {}", snapshot.get95thPercentile()); + log.info("\t 99thPctlRequestTime: {}", snapshot.get99thPercentile()); + log.info("\t 999thPctlRequestTime: {}", snapshot.get999thPercentile()); + } + private void close(MockZKController mockController) { if (mockController != null) { mockController.close(); @@ -928,7 +1061,7 @@ public class OverseerTest extends SolrTestCaseJ4 { //submit to proper queue queue = Overseer.getInQueue(zkClient); - m = new ZkNodeProps(Overseer.QUEUE_OPERATION, Overseer.OverseerAction.STATE.toLower(), + m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(), ZkStateReader.BASE_URL_PROP, "http://127.0.0.1/solr", ZkStateReader.NODE_NAME_PROP, "node1", ZkStateReader.SHARD_ID_PROP, "s1", diff --git a/solr/core/src/test/org/apache/solr/cloud/overseer/TestClusterStateMutator.java b/solr/core/src/test/org/apache/solr/cloud/overseer/TestClusterStateMutator.java new file mode 100644 index 00000000000..073c53238d4 --- /dev/null +++ b/solr/core/src/test/org/apache/solr/cloud/overseer/TestClusterStateMutator.java @@ -0,0 +1,70 @@ +package org.apache.solr.cloud.overseer; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.util.Collections; + +import org.apache.solr.SolrTestCaseJ4; +import org.apache.solr.cloud.MockZkStateReader; +import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.ImplicitDocRouter; +import org.apache.solr.common.cloud.ZkNodeProps; + +public class TestClusterStateMutator extends SolrTestCaseJ4 { + public void testCreateCollection() throws Exception { + ClusterState state = new ClusterState(-1, Collections.emptySet(), Collections.emptyMap()); + MockZkStateReader zkStateReader = new MockZkStateReader(state, Collections.emptySet()); + + ClusterState clusterState = zkStateReader.getClusterState(); + ClusterStateMutator mutator = new ClusterStateMutator(zkStateReader); + ZkNodeProps message = new ZkNodeProps(ZkNodeProps.makeMap( + "name", "xyz", + "numShards", "1" + )); + ZkWriteCommand cmd = mutator.createCollection(clusterState, message); + DocCollection collection = cmd.collection; + assertEquals("xyz", collection.getName()); + assertEquals(1, collection.getSlicesMap().size()); + assertEquals(1, collection.getMaxShardsPerNode()); + + state = new ClusterState(-1, Collections.emptySet(), Collections.singletonMap("xyz", collection)); + message = new ZkNodeProps(ZkNodeProps.makeMap( + "name", "abc", + "numShards", "2", + "router.name", "implicit", + "shards", "x,y", + "replicationFactor", "3", + "maxShardsPerNode", "4" + )); + cmd = mutator.createCollection(state, message); + collection = cmd.collection; + assertEquals("abc", collection.getName()); + assertEquals(2, collection.getSlicesMap().size()); + assertNotNull(collection.getSlicesMap().get("x")); + assertNotNull(collection.getSlicesMap().get("y")); + assertNull(collection.getSlicesMap().get("x").getRange()); + assertNull(collection.getSlicesMap().get("y").getRange()); + assertEquals("active", collection.getSlicesMap().get("x").getState()); + assertEquals("active", collection.getSlicesMap().get("y").getState()); + assertEquals(4, collection.getMaxShardsPerNode()); + assertEquals(ImplicitDocRouter.class, collection.getRouter().getClass()); + assertNotNull(state.getCollectionOrNull("xyz")); // we still have the old collection + } +} + diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java index 7563060d14b..3a21738ab28 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java @@ -40,7 +40,7 @@ import org.slf4j.LoggerFactory; public class ClusterState implements JSONWriter.Writable { private static Logger log = LoggerFactory.getLogger(ClusterState.class); - private Integer znodeVersion; + private final Integer znodeVersion; private final Map collectionStates; private Set liveNodes; diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java index 2c4b55c40ae..aadf00a4b19 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java @@ -35,7 +35,7 @@ public class DocCollection extends ZkNodeProps { public static final String DOC_ROUTER = "router"; public static final String SHARDS = "shards"; public static final String STATE_FORMAT = "stateFormat"; - private int znodeVersion; + private int znodeVersion = -1; // sentinel private final String name; private final Map slices;