diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index b1808c2bd32..838fd50a3c0 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -384,6 +384,9 @@ Other Changes * SOLR-6414: Update to Hadoop 2.6.0. (Mark Miller) +* SOLR-6673: MDC based logging of collection, shard, replica, core + (Ishan Chattopadhyaya , Noble Paul) + ================== 5.0.0 ================== Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release. diff --git a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java index 2c12ebb0363..605781a6336 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java +++ b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java @@ -15,6 +15,7 @@ import org.apache.solr.common.util.RetryUtil; import org.apache.solr.common.util.RetryUtil.RetryCmd; import org.apache.solr.core.CoreContainer; import org.apache.solr.core.SolrCore; +import org.apache.solr.logging.MDCUtils; import org.apache.solr.search.SolrIndexSearcher; import org.apache.solr.update.UpdateLog; import org.apache.solr.util.RefCounted; @@ -24,10 +25,12 @@ import org.apache.zookeeper.KeeperException.NoNodeException; import org.apache.zookeeper.KeeperException.NodeExistsException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.slf4j.MDC; import java.io.Closeable; import java.io.IOException; import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -113,7 +116,9 @@ class ShardLeaderElectionContextBase extends ElectionContext { this.zkClient = zkStateReader.getZkClient(); this.shardId = shardId; this.collection = collection; - + + Map previousMDCContext = MDC.getCopyOfContextMap(); + MDCUtils.setMDC(collection, shardId, null, null); try { new ZkCmdExecutor(zkStateReader.getZkClient().getZkClientTimeout()) .ensureExists(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection, @@ -123,6 +128,8 @@ class ShardLeaderElectionContextBase extends ElectionContext { } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new SolrException(ErrorCode.SERVER_ERROR, e); + } finally { + MDCUtils.cleanupMDC(previousMDCContext); } } diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java index 9c345061d28..4c101814c22 100644 --- a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java +++ b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java @@ -61,6 +61,7 @@ import org.apache.solr.handler.component.ShardHandler; import org.apache.solr.handler.component.ShardHandlerFactory; import org.apache.solr.handler.component.ShardRequest; import org.apache.solr.handler.component.ShardResponse; +import org.apache.solr.logging.MDCUtils; import org.apache.solr.update.SolrIndexSplitter; import org.apache.solr.util.DefaultSolrThreadFactory; import org.apache.solr.util.stats.Snapshot; @@ -71,6 +72,7 @@ import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.slf4j.MDC; import java.io.Closeable; import java.io.IOException; @@ -1059,57 +1061,64 @@ public class OverseerCollectionProcessor implements Runnable, Closeable { String collectionName = message.getStr(COLLECTION_PROP); String shard = message.getStr(SHARD_ID_PROP); String replicaName = message.getStr(REPLICA_PROP); - DocCollection coll = clusterState.getCollection(collectionName); - Slice slice = coll.getSlice(shard); - ShardHandler shardHandler = shardHandlerFactory.getShardHandler(); - if(slice==null){ - throw new SolrException(ErrorCode.BAD_REQUEST, "Invalid shard name : "+shard+" in collection : "+ collectionName); - } - Replica replica = slice.getReplica(replicaName); - if(replica == null){ - ArrayList l = new ArrayList<>(); - for (Replica r : slice.getReplicas()) l.add(r.getName()); - throw new SolrException(ErrorCode.BAD_REQUEST, "Invalid replica : " + replicaName + " in shard/collection : " - + shard + "/"+ collectionName + " available replicas are "+ StrUtils.join(l,',')); - } - - // If users are being safe and only want to remove a shard if it is down, they can specify onlyIfDown=true - // on the command. - if (Boolean.parseBoolean(message.getStr(ONLY_IF_DOWN)) && - ZkStateReader.DOWN.equals(replica.getStr(ZkStateReader.STATE_PROP)) == false) { - throw new SolrException(ErrorCode.BAD_REQUEST, "Attempted to remove replica : " + collectionName + "/" + - shard+"/" + replicaName + - " with onlyIfDown='true', but state is '" + replica.getStr(ZkStateReader.STATE_PROP) + "'"); - } - - String baseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP); - String core = replica.getStr(ZkStateReader.CORE_NAME_PROP); - - // assume the core exists and try to unload it - Map m = ZkNodeProps.makeMap("qt", adminPath, CoreAdminParams.ACTION, - CoreAdminAction.UNLOAD.toString(), CoreAdminParams.CORE, core, - CoreAdminParams.DELETE_INSTANCE_DIR, "true", - CoreAdminParams.DELETE_DATA_DIR, "true"); - - ShardRequest sreq = new ShardRequest(); - sreq.purpose = 1; - sreq.shards = new String[] {baseUrl}; - sreq.actualShards = sreq.shards; - sreq.params = new ModifiableSolrParams(new MapSolrParams(m)); + Map previousMDCContext = MDC.getCopyOfContextMap(); + MDCUtils.setMDC(collectionName, shard, replicaName, null); try { - shardHandler.submit(sreq, baseUrl, sreq.params); - } catch (Exception e) { - log.warn("Exception trying to unload core " + sreq, e); - } - - collectShardResponses(!Slice.ACTIVE.equals(replica.getStr(Slice.STATE)) ? new NamedList() : results, - false, null, shardHandler); - - if (waitForCoreNodeGone(collectionName, shard, replicaName, 5000)) return;//check if the core unload removed the corenode zk enry - deleteCoreNode(collectionName, replicaName, replica, core); // try and ensure core info is removed from clusterstate - if(waitForCoreNodeGone(collectionName, shard, replicaName, 30000)) return; + DocCollection coll = clusterState.getCollection(collectionName); + Slice slice = coll.getSlice(shard); + ShardHandler shardHandler = shardHandlerFactory.getShardHandler(); + if (slice == null) { + throw new SolrException(ErrorCode.BAD_REQUEST, "Invalid shard name : " + shard + " in collection : " + collectionName); + } + Replica replica = slice.getReplica(replicaName); + if (replica == null) { + ArrayList l = new ArrayList<>(); + for (Replica r : slice.getReplicas()) l.add(r.getName()); + throw new SolrException(ErrorCode.BAD_REQUEST, "Invalid replica : " + replicaName + " in shard/collection : " + + shard + "/" + collectionName + " available replicas are " + StrUtils.join(l, ',')); + } - throw new SolrException(ErrorCode.SERVER_ERROR, "Could not remove replica : " + collectionName + "/" + shard+"/" + replicaName); + // If users are being safe and only want to remove a shard if it is down, they can specify onlyIfDown=true + // on the command. + if (Boolean.parseBoolean(message.getStr(ONLY_IF_DOWN)) && + ZkStateReader.DOWN.equals(replica.getStr(ZkStateReader.STATE_PROP)) == false) { + throw new SolrException(ErrorCode.BAD_REQUEST, "Attempted to remove replica : " + collectionName + "/" + + shard + "/" + replicaName + + " with onlyIfDown='true', but state is '" + replica.getStr(ZkStateReader.STATE_PROP) + "'"); + } + + String baseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP); + String core = replica.getStr(ZkStateReader.CORE_NAME_PROP); + + // assume the core exists and try to unload it + Map m = ZkNodeProps.makeMap("qt", adminPath, CoreAdminParams.ACTION, + CoreAdminAction.UNLOAD.toString(), CoreAdminParams.CORE, core, + CoreAdminParams.DELETE_INSTANCE_DIR, "true", + CoreAdminParams.DELETE_DATA_DIR, "true"); + + ShardRequest sreq = new ShardRequest(); + sreq.purpose = 1; + sreq.shards = new String[]{baseUrl}; + sreq.actualShards = sreq.shards; + sreq.params = new ModifiableSolrParams(new MapSolrParams(m)); + try { + shardHandler.submit(sreq, baseUrl, sreq.params); + } catch (Exception e) { + log.warn("Exception trying to unload core " + sreq, e); + } + + collectShardResponses(!Slice.ACTIVE.equals(replica.getStr(Slice.STATE)) ? new NamedList() : results, + false, null, shardHandler); + + if (waitForCoreNodeGone(collectionName, shard, replicaName, 5000)) + return;//check if the core unload removed the corenode zk enry + deleteCoreNode(collectionName, replicaName, replica, core); // try and ensure core info is removed from clusterstate + if (waitForCoreNodeGone(collectionName, shard, replicaName, 30000)) return; + + throw new SolrException(ErrorCode.SERVER_ERROR, "Could not remove replica : " + collectionName + "/" + shard + "/" + replicaName); + } finally { + MDCUtils.cleanupMDC(previousMDCContext); + } } private boolean waitForCoreNodeGone(String collectionName, String shard, String replicaName, int timeoutms) throws InterruptedException { @@ -1205,33 +1214,40 @@ public class OverseerCollectionProcessor implements Runnable, Closeable { private void createAlias(Aliases aliases, ZkNodeProps message) { String aliasName = message.getStr("name"); String collections = message.getStr("collections"); - - Map> newAliasesMap = new HashMap<>(); - Map newCollectionAliasesMap = new HashMap<>(); - Map prevColAliases = aliases.getCollectionAliasMap(); - if (prevColAliases != null) { - newCollectionAliasesMap.putAll(prevColAliases); - } - newCollectionAliasesMap.put(aliasName, collections); - newAliasesMap.put("collection", newCollectionAliasesMap); - Aliases newAliases = new Aliases(newAliasesMap); - byte[] jsonBytes = null; - if (newAliases.collectionAliasSize() > 0) { // only sub map right now - jsonBytes = ZkStateReader.toJSON(newAliases.getAliasMap()); - } + + Map previousMDCContext = MDC.getCopyOfContextMap(); + MDCUtils.setCollection(aliasName); + try { - zkStateReader.getZkClient().setData(ZkStateReader.ALIASES, - jsonBytes, true); - - checkForAlias(aliasName, collections); - // some fudge for other nodes - Thread.sleep(100); - } catch (KeeperException e) { - log.error("", e); - throw new SolrException(ErrorCode.SERVER_ERROR, e); - } catch (InterruptedException e) { - log.warn("", e); - throw new SolrException(ErrorCode.SERVER_ERROR, e); + Map> newAliasesMap = new HashMap<>(); + Map newCollectionAliasesMap = new HashMap<>(); + Map prevColAliases = aliases.getCollectionAliasMap(); + if (prevColAliases != null) { + newCollectionAliasesMap.putAll(prevColAliases); + } + newCollectionAliasesMap.put(aliasName, collections); + newAliasesMap.put("collection", newCollectionAliasesMap); + Aliases newAliases = new Aliases(newAliasesMap); + byte[] jsonBytes = null; + if (newAliases.collectionAliasSize() > 0) { // only sub map right now + jsonBytes = ZkStateReader.toJSON(newAliases.getAliasMap()); + } + try { + zkStateReader.getZkClient().setData(ZkStateReader.ALIASES, + jsonBytes, true); + + checkForAlias(aliasName, collections); + // some fudge for other nodes + Thread.sleep(100); + } catch (KeeperException e) { + log.error("", e); + throw new SolrException(ErrorCode.SERVER_ERROR, e); + } catch (InterruptedException e) { + log.warn("", e); + throw new SolrException(ErrorCode.SERVER_ERROR, e); + } + } finally { + MDCUtils.cleanupMDC(previousMDCContext); } } @@ -1276,6 +1292,8 @@ public class OverseerCollectionProcessor implements Runnable, Closeable { private void deleteAlias(Aliases aliases, ZkNodeProps message) { String aliasName = message.getStr("name"); + Map previousMDCContext = MDC.getCopyOfContextMap(); + MDCUtils.setCollection(aliasName); Map> newAliasesMap = new HashMap<>(); Map newCollectionAliasesMap = new HashMap<>(); @@ -1299,400 +1317,282 @@ public class OverseerCollectionProcessor implements Runnable, Closeable { } catch (InterruptedException e) { log.warn("", e); throw new SolrException(ErrorCode.SERVER_ERROR, e); + } finally { + MDCUtils.cleanupMDC(previousMDCContext); } } private boolean createShard(ClusterState clusterState, ZkNodeProps message, NamedList results) throws KeeperException, InterruptedException { - log.info("Create shard invoked: {}", message); + Map previousMDCContext = MDC.getCopyOfContextMap(); String collectionName = message.getStr(COLLECTION_PROP); String sliceName = message.getStr(SHARD_ID_PROP); - if (collectionName == null || sliceName == null) - throw new SolrException(ErrorCode.BAD_REQUEST, "'collection' and 'shard' are required parameters"); - int numSlices = 1; - ShardHandler shardHandler = shardHandlerFactory.getShardHandler(); - DocCollection collection = clusterState.getCollection(collectionName); - int maxShardsPerNode = collection.getInt(ZkStateReader.MAX_SHARDS_PER_NODE, 1); - int repFactor = message.getInt(ZkStateReader.REPLICATION_FACTOR, collection.getInt(ZkStateReader.REPLICATION_FACTOR, 1)); - String createNodeSetStr = message.getStr(CREATE_NODE_SET); + MDCUtils.setMDC(collectionName, sliceName, null, null); + try { + log.info("Create shard invoked: {}", message); + if (collectionName == null || sliceName == null) + throw new SolrException(ErrorCode.BAD_REQUEST, "'collection' and 'shard' are required parameters"); + int numSlices = 1; - ArrayList sortedNodeList = getNodesForNewShard(clusterState, collectionName, numSlices, maxShardsPerNode, repFactor, createNodeSetStr); + ShardHandler shardHandler = shardHandlerFactory.getShardHandler(); + DocCollection collection = clusterState.getCollection(collectionName); + int maxShardsPerNode = collection.getInt(ZkStateReader.MAX_SHARDS_PER_NODE, 1); + int repFactor = message.getInt(ZkStateReader.REPLICATION_FACTOR, collection.getInt(ZkStateReader.REPLICATION_FACTOR, 1)); + String createNodeSetStr = message.getStr(CREATE_NODE_SET); - Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(message)); - // wait for a while until we see the shard - long waitUntil = System.nanoTime() + TimeUnit.NANOSECONDS.convert(30, TimeUnit.SECONDS); - boolean created = false; - while (System.nanoTime() < waitUntil) { - Thread.sleep(100); - created = zkStateReader.getClusterState().getCollection(collectionName).getSlice(sliceName) != null; - if (created) break; + ArrayList sortedNodeList = getNodesForNewShard(clusterState, collectionName, numSlices, maxShardsPerNode, repFactor, createNodeSetStr); + + Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(message)); + // wait for a while until we see the shard + long waitUntil = System.nanoTime() + TimeUnit.NANOSECONDS.convert(30, TimeUnit.SECONDS); + boolean created = false; + while (System.nanoTime() < waitUntil) { + Thread.sleep(100); + created = zkStateReader.getClusterState().getCollection(collectionName).getSlice(sliceName) != null; + if (created) break; + } + if (!created) + throw new SolrException(ErrorCode.SERVER_ERROR, "Could not fully create shard: " + message.getStr("name")); + + + String configName = message.getStr(COLL_CONF); + for (int j = 1; j <= repFactor; j++) { + String nodeName = sortedNodeList.get(((j - 1)) % sortedNodeList.size()).nodeName; + String shardName = collectionName + "_" + sliceName + "_replica" + j; + log.info("Creating shard " + shardName + " as part of slice " + + sliceName + " of collection " + collectionName + " on " + + nodeName); + + // Need to create new params for each request + ModifiableSolrParams params = new ModifiableSolrParams(); + params.set(CoreAdminParams.ACTION, CoreAdminAction.CREATE.toString()); + + params.set(CoreAdminParams.NAME, shardName); + params.set(COLL_CONF, configName); + params.set(CoreAdminParams.COLLECTION, collectionName); + params.set(CoreAdminParams.SHARD, sliceName); + params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices); + addPropertyParams(message, params); + + ShardRequest sreq = new ShardRequest(); + params.set("qt", adminPath); + sreq.purpose = 1; + String replica = zkStateReader.getBaseUrlForNodeName(nodeName); + sreq.shards = new String[]{replica}; + sreq.actualShards = sreq.shards; + sreq.params = params; + + shardHandler.submit(sreq, replica, sreq.params); + + } + + processResponses(results, shardHandler); + + log.info("Finished create command on all shards for collection: " + + collectionName); + + return true; + } finally { + MDCUtils.cleanupMDC(previousMDCContext); } - if (!created) - throw new SolrException(ErrorCode.SERVER_ERROR, "Could not fully create shard: " + message.getStr("name")); - - - String configName = message.getStr(COLL_CONF); - for (int j = 1; j <= repFactor; j++) { - String nodeName = sortedNodeList.get(((j - 1)) % sortedNodeList.size()).nodeName; - String shardName = collectionName + "_" + sliceName + "_replica" + j; - log.info("Creating shard " + shardName + " as part of slice " - + sliceName + " of collection " + collectionName + " on " - + nodeName); - - // Need to create new params for each request - ModifiableSolrParams params = new ModifiableSolrParams(); - params.set(CoreAdminParams.ACTION, CoreAdminAction.CREATE.toString()); - - params.set(CoreAdminParams.NAME, shardName); - params.set(COLL_CONF, configName); - params.set(CoreAdminParams.COLLECTION, collectionName); - params.set(CoreAdminParams.SHARD, sliceName); - params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices); - addPropertyParams(message, params); - - ShardRequest sreq = new ShardRequest(); - params.set("qt", adminPath); - sreq.purpose = 1; - String replica = zkStateReader.getBaseUrlForNodeName(nodeName); - sreq.shards = new String[]{replica}; - sreq.actualShards = sreq.shards; - sreq.params = params; - - shardHandler.submit(sreq, replica, sreq.params); - - } - - processResponses(results, shardHandler); - - log.info("Finished create command on all shards for collection: " - + collectionName); - - return true; } private boolean splitShard(ClusterState clusterState, ZkNodeProps message, NamedList results) { - log.info("Split shard invoked"); String collectionName = message.getStr("collection"); String slice = message.getStr(ZkStateReader.SHARD_ID_PROP); - String splitKey = message.getStr("split.key"); - ShardHandler shardHandler = shardHandlerFactory.getShardHandler(); + Map previousMDCContext = MDC.getCopyOfContextMap(); + MDCUtils.setMDC(collectionName, slice, null, null); + try { + log.info("Split shard invoked"); + String splitKey = message.getStr("split.key"); + ShardHandler shardHandler = shardHandlerFactory.getShardHandler(); - DocCollection collection = clusterState.getCollection(collectionName); - DocRouter router = collection.getRouter() != null ? collection.getRouter() : DocRouter.DEFAULT; + DocCollection collection = clusterState.getCollection(collectionName); + DocRouter router = collection.getRouter() != null ? collection.getRouter() : DocRouter.DEFAULT; - Slice parentSlice = null; + Slice parentSlice = null; - if (slice == null) { - if (router instanceof CompositeIdRouter) { - Collection searchSlices = router.getSearchSlicesSingle(splitKey, new ModifiableSolrParams(), collection); - if (searchSlices.isEmpty()) { - throw new SolrException(ErrorCode.BAD_REQUEST, "Unable to find an active shard for split.key: " + splitKey); - } - if (searchSlices.size() > 1) { + if (slice == null) { + if (router instanceof CompositeIdRouter) { + Collection searchSlices = router.getSearchSlicesSingle(splitKey, new ModifiableSolrParams(), collection); + if (searchSlices.isEmpty()) { + throw new SolrException(ErrorCode.BAD_REQUEST, "Unable to find an active shard for split.key: " + splitKey); + } + if (searchSlices.size() > 1) { + throw new SolrException(ErrorCode.BAD_REQUEST, + "Splitting a split.key: " + splitKey + " which spans multiple shards is not supported"); + } + parentSlice = searchSlices.iterator().next(); + slice = parentSlice.getName(); + log.info("Split by route.key: {}, parent shard is: {} ", splitKey, slice); + } else { throw new SolrException(ErrorCode.BAD_REQUEST, - "Splitting a split.key: " + splitKey + " which spans multiple shards is not supported"); + "Split by route key can only be used with CompositeIdRouter or subclass. Found router: " + router.getClass().getName()); } - parentSlice = searchSlices.iterator().next(); - slice = parentSlice.getName(); - log.info("Split by route.key: {}, parent shard is: {} ", splitKey, slice); - } else { - throw new SolrException(ErrorCode.BAD_REQUEST, - "Split by route key can only be used with CompositeIdRouter or subclass. Found router: " + router.getClass().getName()); - } - } else { - parentSlice = clusterState.getSlice(collectionName, slice); - } - - if (parentSlice == null) { - if(clusterState.hasCollection(collectionName)) { - throw new SolrException(ErrorCode.BAD_REQUEST, "No shard with the specified name exists: " + slice); } else { - throw new SolrException(ErrorCode.BAD_REQUEST, "No collection with the specified name exists: " + collectionName); - } - } - - // find the leader for the shard - Replica parentShardLeader = null; - try { - parentShardLeader = zkStateReader.getLeaderRetry(collectionName, slice, 10000); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } + parentSlice = clusterState.getSlice(collectionName, slice); + } - DocRouter.Range range = parentSlice.getRange(); - if (range == null) { - range = new PlainIdRouter().fullRange(); - } - - List subRanges = null; - String rangesStr = message.getStr(CoreAdminParams.RANGES); - if (rangesStr != null) { - String[] ranges = rangesStr.split(","); - if (ranges.length == 0 || ranges.length == 1) { - throw new SolrException(ErrorCode.BAD_REQUEST, "There must be at least two ranges specified to split a shard"); - } else { - subRanges = new ArrayList<>(ranges.length); - for (int i = 0; i < ranges.length; i++) { - String r = ranges[i]; - try { - subRanges.add(DocRouter.DEFAULT.fromString(r)); - } catch (Exception e) { - throw new SolrException(ErrorCode.BAD_REQUEST, "Exception in parsing hexadecimal hash range: " + r, e); - } - if (!subRanges.get(i).isSubsetOf(range)) { - throw new SolrException(ErrorCode.BAD_REQUEST, - "Specified hash range: " + r + " is not a subset of parent shard's range: " + range.toString()); - } - } - List temp = new ArrayList<>(subRanges); // copy to preserve original order - Collections.sort(temp); - if (!range.equals(new DocRouter.Range(temp.get(0).min, temp.get(temp.size() - 1).max))) { - throw new SolrException(ErrorCode.BAD_REQUEST, - "Specified hash ranges: " + rangesStr + " do not cover the entire range of parent shard: " + range); - } - for (int i = 1; i < temp.size(); i++) { - if (temp.get(i - 1).max + 1 != temp.get(i).min) { - throw new SolrException(ErrorCode.BAD_REQUEST, - "Specified hash ranges: " + rangesStr + " either overlap with each other or " + - "do not cover the entire range of parent shard: " + range); - } + if (parentSlice == null) { + if (clusterState.hasCollection(collectionName)) { + throw new SolrException(ErrorCode.BAD_REQUEST, "No shard with the specified name exists: " + slice); + } else { + throw new SolrException(ErrorCode.BAD_REQUEST, "No collection with the specified name exists: " + collectionName); } } - } else if (splitKey != null) { - if (router instanceof CompositeIdRouter) { - CompositeIdRouter compositeIdRouter = (CompositeIdRouter) router; - subRanges = compositeIdRouter.partitionRangeByKey(splitKey, range); - if (subRanges.size() == 1) { - throw new SolrException(ErrorCode.BAD_REQUEST, - "The split.key: " + splitKey + " has a hash range that is exactly equal to hash range of shard: " + slice); - } - for (DocRouter.Range subRange : subRanges) { - if (subRange.min == subRange.max) { - throw new SolrException(ErrorCode.BAD_REQUEST, "The split.key: " + splitKey + " must be a compositeId"); + + // find the leader for the shard + Replica parentShardLeader = null; + try { + parentShardLeader = zkStateReader.getLeaderRetry(collectionName, slice, 10000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + DocRouter.Range range = parentSlice.getRange(); + if (range == null) { + range = new PlainIdRouter().fullRange(); + } + + List subRanges = null; + String rangesStr = message.getStr(CoreAdminParams.RANGES); + if (rangesStr != null) { + String[] ranges = rangesStr.split(","); + if (ranges.length == 0 || ranges.length == 1) { + throw new SolrException(ErrorCode.BAD_REQUEST, "There must be at least two ranges specified to split a shard"); + } else { + subRanges = new ArrayList<>(ranges.length); + for (int i = 0; i < ranges.length; i++) { + String r = ranges[i]; + try { + subRanges.add(DocRouter.DEFAULT.fromString(r)); + } catch (Exception e) { + throw new SolrException(ErrorCode.BAD_REQUEST, "Exception in parsing hexadecimal hash range: " + r, e); + } + if (!subRanges.get(i).isSubsetOf(range)) { + throw new SolrException(ErrorCode.BAD_REQUEST, + "Specified hash range: " + r + " is not a subset of parent shard's range: " + range.toString()); + } + } + List temp = new ArrayList<>(subRanges); // copy to preserve original order + Collections.sort(temp); + if (!range.equals(new DocRouter.Range(temp.get(0).min, temp.get(temp.size() - 1).max))) { + throw new SolrException(ErrorCode.BAD_REQUEST, + "Specified hash ranges: " + rangesStr + " do not cover the entire range of parent shard: " + range); + } + for (int i = 1; i < temp.size(); i++) { + if (temp.get(i - 1).max + 1 != temp.get(i).min) { + throw new SolrException(ErrorCode.BAD_REQUEST, + "Specified hash ranges: " + rangesStr + " either overlap with each other or " + + "do not cover the entire range of parent shard: " + range); + } } } - log.info("Partitioning parent shard " + slice + " range: " + parentSlice.getRange() + " yields: " + subRanges); - rangesStr = ""; + } else if (splitKey != null) { + if (router instanceof CompositeIdRouter) { + CompositeIdRouter compositeIdRouter = (CompositeIdRouter) router; + subRanges = compositeIdRouter.partitionRangeByKey(splitKey, range); + if (subRanges.size() == 1) { + throw new SolrException(ErrorCode.BAD_REQUEST, + "The split.key: " + splitKey + " has a hash range that is exactly equal to hash range of shard: " + slice); + } + for (DocRouter.Range subRange : subRanges) { + if (subRange.min == subRange.max) { + throw new SolrException(ErrorCode.BAD_REQUEST, "The split.key: " + splitKey + " must be a compositeId"); + } + } + log.info("Partitioning parent shard " + slice + " range: " + parentSlice.getRange() + " yields: " + subRanges); + rangesStr = ""; + for (int i = 0; i < subRanges.size(); i++) { + DocRouter.Range subRange = subRanges.get(i); + rangesStr += subRange.toString(); + if (i < subRanges.size() - 1) + rangesStr += ','; + } + } + } else { + // todo: fixed to two partitions? + subRanges = router.partitionRange(2, range); + } + + try { + List subSlices = new ArrayList<>(subRanges.size()); + List subShardNames = new ArrayList<>(subRanges.size()); + String nodeName = parentShardLeader.getNodeName(); for (int i = 0; i < subRanges.size(); i++) { - DocRouter.Range subRange = subRanges.get(i); - rangesStr += subRange.toString(); - if (i < subRanges.size() - 1) - rangesStr += ','; - } - } - } else { - // todo: fixed to two partitions? - subRanges = router.partitionRange(2, range); - } + String subSlice = slice + "_" + i; + subSlices.add(subSlice); + String subShardName = collectionName + "_" + subSlice + "_replica1"; + subShardNames.add(subShardName); - try { - List subSlices = new ArrayList<>(subRanges.size()); - List subShardNames = new ArrayList<>(subRanges.size()); - String nodeName = parentShardLeader.getNodeName(); - for (int i = 0; i < subRanges.size(); i++) { - String subSlice = slice + "_" + i; - subSlices.add(subSlice); - String subShardName = collectionName + "_" + subSlice + "_replica1"; - subShardNames.add(subShardName); - - Slice oSlice = clusterState.getSlice(collectionName, subSlice); - if (oSlice != null) { - if (Slice.ACTIVE.equals(oSlice.getState())) { - throw new SolrException(ErrorCode.BAD_REQUEST, "Sub-shard: " + subSlice + " exists in active state. Aborting split shard."); - } else if (Slice.CONSTRUCTION.equals(oSlice.getState()) || Slice.RECOVERY.equals(oSlice.getState())) { - // delete the shards - for (String sub : subSlices) { - log.info("Sub-shard: {} already exists therefore requesting its deletion", sub); - Map propMap = new HashMap<>(); - propMap.put(Overseer.QUEUE_OPERATION, "deleteshard"); - propMap.put(COLLECTION_PROP, collectionName); - propMap.put(SHARD_ID_PROP, sub); - ZkNodeProps m = new ZkNodeProps(propMap); - try { - deleteShard(clusterState, m, new NamedList()); - } catch (Exception e) { - throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to delete already existing sub shard: " + sub, e); + Slice oSlice = clusterState.getSlice(collectionName, subSlice); + if (oSlice != null) { + if (Slice.ACTIVE.equals(oSlice.getState())) { + throw new SolrException(ErrorCode.BAD_REQUEST, "Sub-shard: " + subSlice + " exists in active state. Aborting split shard."); + } else if (Slice.CONSTRUCTION.equals(oSlice.getState()) || Slice.RECOVERY.equals(oSlice.getState())) { + // delete the shards + for (String sub : subSlices) { + log.info("Sub-shard: {} already exists therefore requesting its deletion", sub); + Map propMap = new HashMap<>(); + propMap.put(Overseer.QUEUE_OPERATION, "deleteshard"); + propMap.put(COLLECTION_PROP, collectionName); + propMap.put(SHARD_ID_PROP, sub); + ZkNodeProps m = new ZkNodeProps(propMap); + try { + deleteShard(clusterState, m, new NamedList()); + } catch (Exception e) { + throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to delete already existing sub shard: " + sub, e); + } } } } } - } - // do not abort splitshard if the unloading fails - // this can happen because the replicas created previously may be down - // the only side effect of this is that the sub shard may end up having more replicas than we want - collectShardResponses(results, false, null, shardHandler); + // do not abort splitshard if the unloading fails + // this can happen because the replicas created previously may be down + // the only side effect of this is that the sub shard may end up having more replicas than we want + collectShardResponses(results, false, null, shardHandler); - String asyncId = message.getStr(ASYNC); - HashMap requestMap = new HashMap(); + String asyncId = message.getStr(ASYNC); + HashMap requestMap = new HashMap(); - for (int i=0; i propMap = new HashMap<>(); - propMap.put(Overseer.QUEUE_OPERATION, CREATESHARD.toLower()); - propMap.put(ZkStateReader.SHARD_ID_PROP, subSlice); - propMap.put(ZkStateReader.COLLECTION_PROP, collectionName); - propMap.put(ZkStateReader.SHARD_RANGE_PROP, subRange.toString()); - propMap.put(ZkStateReader.SHARD_STATE_PROP, Slice.CONSTRUCTION); - propMap.put(ZkStateReader.SHARD_PARENT_PROP, parentSlice.getName()); - DistributedQueue inQueue = Overseer.getInQueue(zkStateReader.getZkClient()); - inQueue.offer(ZkStateReader.toJSON(new ZkNodeProps(propMap))); + Map propMap = new HashMap<>(); + propMap.put(Overseer.QUEUE_OPERATION, CREATESHARD.toLower()); + propMap.put(ZkStateReader.SHARD_ID_PROP, subSlice); + propMap.put(ZkStateReader.COLLECTION_PROP, collectionName); + propMap.put(ZkStateReader.SHARD_RANGE_PROP, subRange.toString()); + propMap.put(ZkStateReader.SHARD_STATE_PROP, Slice.CONSTRUCTION); + propMap.put(ZkStateReader.SHARD_PARENT_PROP, parentSlice.getName()); + DistributedQueue inQueue = Overseer.getInQueue(zkStateReader.getZkClient()); + inQueue.offer(ZkStateReader.toJSON(new ZkNodeProps(propMap))); - // wait until we are able to see the new shard in cluster state - waitForNewShard(collectionName, subSlice); + // wait until we are able to see the new shard in cluster state + waitForNewShard(collectionName, subSlice); - // refresh cluster state - clusterState = zkStateReader.getClusterState(); + // refresh cluster state + clusterState = zkStateReader.getClusterState(); - log.info("Adding replica " + subShardName + " as part of slice " - + subSlice + " of collection " + collectionName + " on " - + nodeName); - propMap = new HashMap<>(); - propMap.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower()); - propMap.put(COLLECTION_PROP, collectionName); - propMap.put(SHARD_ID_PROP, subSlice); - propMap.put("node", nodeName); - propMap.put(CoreAdminParams.NAME, subShardName); - // copy over property params: - for (String key : message.keySet()) { - if (key.startsWith(COLL_PROP_PREFIX)) { - propMap.put(key, message.getStr(key)); - } - } - // add async param - if(asyncId != null) { - propMap.put(ASYNC, asyncId); - } - addReplica(clusterState, new ZkNodeProps(propMap), results); - } - - collectShardResponses(results, true, - "SPLITSHARD failed to create subshard leaders", shardHandler); - - completeAsyncRequest(asyncId, requestMap, results); - - for (String subShardName : subShardNames) { - // wait for parent leader to acknowledge the sub-shard core - log.info("Asking parent leader to wait for: " + subShardName + " to be alive on: " + nodeName); - String coreNodeName = waitForCoreNodeName(collectionName, nodeName, subShardName); - CoreAdminRequest.WaitForState cmd = new CoreAdminRequest.WaitForState(); - cmd.setCoreName(subShardName); - cmd.setNodeName(nodeName); - cmd.setCoreNodeName(coreNodeName); - cmd.setState(ZkStateReader.ACTIVE); - cmd.setCheckLive(true); - cmd.setOnlyIfLeader(true); - - ModifiableSolrParams p = new ModifiableSolrParams(cmd.getParams()); - sendShardRequest(nodeName, p, shardHandler, asyncId, requestMap); - } - - collectShardResponses(results, true, - "SPLITSHARD timed out waiting for subshard leaders to come up", shardHandler); - - completeAsyncRequest(asyncId, requestMap, results); - - log.info("Successfully created all sub-shards for collection " - + collectionName + " parent shard: " + slice + " on: " + parentShardLeader); - - log.info("Splitting shard " + parentShardLeader.getName() + " as part of slice " - + slice + " of collection " + collectionName + " on " - + parentShardLeader); - - ModifiableSolrParams params = new ModifiableSolrParams(); - params.set(CoreAdminParams.ACTION, CoreAdminAction.SPLIT.toString()); - params.set(CoreAdminParams.CORE, parentShardLeader.getStr("core")); - for (int i = 0; i < subShardNames.size(); i++) { - String subShardName = subShardNames.get(i); - params.add(CoreAdminParams.TARGET_CORE, subShardName); - } - params.set(CoreAdminParams.RANGES, rangesStr); - - sendShardRequest(parentShardLeader.getNodeName(), params, shardHandler, asyncId, requestMap); - - collectShardResponses(results, true, "SPLITSHARD failed to invoke SPLIT core admin command", - shardHandler); - completeAsyncRequest(asyncId, requestMap, results); - - log.info("Index on shard: " + nodeName + " split into two successfully"); - - // apply buffered updates on sub-shards - for (int i = 0; i < subShardNames.size(); i++) { - String subShardName = subShardNames.get(i); - - log.info("Applying buffered updates on : " + subShardName); - - params = new ModifiableSolrParams(); - params.set(CoreAdminParams.ACTION, CoreAdminAction.REQUESTAPPLYUPDATES.toString()); - params.set(CoreAdminParams.NAME, subShardName); - - sendShardRequest(nodeName, params, shardHandler, asyncId, requestMap); - } - - collectShardResponses(results, true, - "SPLITSHARD failed while asking sub shard leaders to apply buffered updates", - shardHandler); - - completeAsyncRequest(asyncId, requestMap, results); - - log.info("Successfully applied buffered updates on : " + subShardNames); - - // Replica creation for the new Slices - - // look at the replication factor and see if it matches reality - // if it does not, find best nodes to create more cores - - // TODO: Have replication factor decided in some other way instead of numShards for the parent - - int repFactor = clusterState.getSlice(collectionName, slice).getReplicas().size(); - - // we need to look at every node and see how many cores it serves - // add our new cores to existing nodes serving the least number of cores - // but (for now) require that each core goes on a distinct node. - - // TODO: add smarter options that look at the current number of cores per - // node? - // for now we just go random - Set nodes = clusterState.getLiveNodes(); - List nodeList = new ArrayList<>(nodes.size()); - nodeList.addAll(nodes); - - Collections.shuffle(nodeList, RANDOM); - - // TODO: Have maxShardsPerNode param for this operation? - - // Remove the node that hosts the parent shard for replica creation. - nodeList.remove(nodeName); - - // TODO: change this to handle sharding a slice into > 2 sub-shards. - - for (int i = 1; i <= subSlices.size(); i++) { - Collections.shuffle(nodeList, RANDOM); - String sliceName = subSlices.get(i - 1); - for (int j = 2; j <= repFactor; j++) { - String subShardNodeName = nodeList.get((repFactor * (i - 1) + (j - 2)) % nodeList.size()); - String shardName = collectionName + "_" + sliceName + "_replica" + (j); - - log.info("Creating replica shard " + shardName + " as part of slice " - + sliceName + " of collection " + collectionName + " on " - + subShardNodeName); - - HashMap propMap = new HashMap<>(); + log.info("Adding replica " + subShardName + " as part of slice " + + subSlice + " of collection " + collectionName + " on " + + nodeName); + propMap = new HashMap<>(); propMap.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower()); propMap.put(COLLECTION_PROP, collectionName); - propMap.put(SHARD_ID_PROP, sliceName); - propMap.put("node", subShardNodeName); - propMap.put(CoreAdminParams.NAME, shardName); + propMap.put(SHARD_ID_PROP, subSlice); + propMap.put("node", nodeName); + propMap.put(CoreAdminParams.NAME, subShardName); // copy over property params: for (String key : message.keySet()) { if (key.startsWith(COLL_PROP_PREFIX)) { @@ -1704,66 +1604,199 @@ public class OverseerCollectionProcessor implements Runnable, Closeable { propMap.put(ASYNC, asyncId); } addReplica(clusterState, new ZkNodeProps(propMap), results); + } - String coreNodeName = waitForCoreNodeName(collectionName, subShardNodeName, shardName); - // wait for the replicas to be seen as active on sub shard leader - log.info("Asking sub shard leader to wait for: " + shardName + " to be alive on: " + subShardNodeName); + collectShardResponses(results, true, + "SPLITSHARD failed to create subshard leaders", shardHandler); + + completeAsyncRequest(asyncId, requestMap, results); + + for (String subShardName : subShardNames) { + // wait for parent leader to acknowledge the sub-shard core + log.info("Asking parent leader to wait for: " + subShardName + " to be alive on: " + nodeName); + String coreNodeName = waitForCoreNodeName(collectionName, nodeName, subShardName); CoreAdminRequest.WaitForState cmd = new CoreAdminRequest.WaitForState(); - cmd.setCoreName(subShardNames.get(i-1)); - cmd.setNodeName(subShardNodeName); + cmd.setCoreName(subShardName); + cmd.setNodeName(nodeName); cmd.setCoreNodeName(coreNodeName); - cmd.setState(ZkStateReader.RECOVERING); + cmd.setState(ZkStateReader.ACTIVE); cmd.setCheckLive(true); cmd.setOnlyIfLeader(true); + ModifiableSolrParams p = new ModifiableSolrParams(cmd.getParams()); - sendShardRequest(nodeName, p, shardHandler, asyncId, requestMap); - } + + collectShardResponses(results, true, + "SPLITSHARD timed out waiting for subshard leaders to come up", shardHandler); + + completeAsyncRequest(asyncId, requestMap, results); + + log.info("Successfully created all sub-shards for collection " + + collectionName + " parent shard: " + slice + " on: " + parentShardLeader); + + log.info("Splitting shard " + parentShardLeader.getName() + " as part of slice " + + slice + " of collection " + collectionName + " on " + + parentShardLeader); + + ModifiableSolrParams params = new ModifiableSolrParams(); + params.set(CoreAdminParams.ACTION, CoreAdminAction.SPLIT.toString()); + params.set(CoreAdminParams.CORE, parentShardLeader.getStr("core")); + for (int i = 0; i < subShardNames.size(); i++) { + String subShardName = subShardNames.get(i); + params.add(CoreAdminParams.TARGET_CORE, subShardName); + } + params.set(CoreAdminParams.RANGES, rangesStr); + + sendShardRequest(parentShardLeader.getNodeName(), params, shardHandler, asyncId, requestMap); + + collectShardResponses(results, true, "SPLITSHARD failed to invoke SPLIT core admin command", + shardHandler); + completeAsyncRequest(asyncId, requestMap, results); + + log.info("Index on shard: " + nodeName + " split into two successfully"); + + // apply buffered updates on sub-shards + for (int i = 0; i < subShardNames.size(); i++) { + String subShardName = subShardNames.get(i); + + log.info("Applying buffered updates on : " + subShardName); + + params = new ModifiableSolrParams(); + params.set(CoreAdminParams.ACTION, CoreAdminAction.REQUESTAPPLYUPDATES.toString()); + params.set(CoreAdminParams.NAME, subShardName); + + sendShardRequest(nodeName, params, shardHandler, asyncId, requestMap); + } + + collectShardResponses(results, true, + "SPLITSHARD failed while asking sub shard leaders to apply buffered updates", + shardHandler); + + completeAsyncRequest(asyncId, requestMap, results); + + log.info("Successfully applied buffered updates on : " + subShardNames); + + // Replica creation for the new Slices + + // look at the replication factor and see if it matches reality + // if it does not, find best nodes to create more cores + + // TODO: Have replication factor decided in some other way instead of numShards for the parent + + int repFactor = clusterState.getSlice(collectionName, slice).getReplicas().size(); + + // we need to look at every node and see how many cores it serves + // add our new cores to existing nodes serving the least number of cores + // but (for now) require that each core goes on a distinct node. + + // TODO: add smarter options that look at the current number of cores per + // node? + // for now we just go random + Set nodes = clusterState.getLiveNodes(); + List nodeList = new ArrayList<>(nodes.size()); + nodeList.addAll(nodes); + + Collections.shuffle(nodeList, RANDOM); + + // TODO: Have maxShardsPerNode param for this operation? + + // Remove the node that hosts the parent shard for replica creation. + nodeList.remove(nodeName); + + // TODO: change this to handle sharding a slice into > 2 sub-shards. + + for (int i = 1; i <= subSlices.size(); i++) { + Collections.shuffle(nodeList, RANDOM); + String sliceName = subSlices.get(i - 1); + for (int j = 2; j <= repFactor; j++) { + String subShardNodeName = nodeList.get((repFactor * (i - 1) + (j - 2)) % nodeList.size()); + String shardName = collectionName + "_" + sliceName + "_replica" + (j); + + log.info("Creating replica shard " + shardName + " as part of slice " + + sliceName + " of collection " + collectionName + " on " + + subShardNodeName); + + HashMap propMap = new HashMap<>(); + propMap.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower()); + propMap.put(COLLECTION_PROP, collectionName); + propMap.put(SHARD_ID_PROP, sliceName); + propMap.put("node", subShardNodeName); + propMap.put(CoreAdminParams.NAME, shardName); + // copy over property params: + for (String key : message.keySet()) { + if (key.startsWith(COLL_PROP_PREFIX)) { + propMap.put(key, message.getStr(key)); + } + } + // add async param + if (asyncId != null) { + propMap.put(ASYNC, asyncId); + } + addReplica(clusterState, new ZkNodeProps(propMap), results); + + String coreNodeName = waitForCoreNodeName(collectionName, subShardNodeName, shardName); + // wait for the replicas to be seen as active on sub shard leader + log.info("Asking sub shard leader to wait for: " + shardName + " to be alive on: " + subShardNodeName); + CoreAdminRequest.WaitForState cmd = new CoreAdminRequest.WaitForState(); + cmd.setCoreName(subShardNames.get(i - 1)); + cmd.setNodeName(subShardNodeName); + cmd.setCoreNodeName(coreNodeName); + cmd.setState(ZkStateReader.RECOVERING); + cmd.setCheckLive(true); + cmd.setOnlyIfLeader(true); + ModifiableSolrParams p = new ModifiableSolrParams(cmd.getParams()); + + sendShardRequest(nodeName, p, shardHandler, asyncId, requestMap); + + } + } + + collectShardResponses(results, true, + "SPLITSHARD failed to create subshard replicas or timed out waiting for them to come up", + shardHandler); + + completeAsyncRequest(asyncId, requestMap, results); + + log.info("Successfully created all replica shards for all sub-slices " + subSlices); + + commit(results, slice, parentShardLeader); + + if (repFactor == 1) { + // switch sub shard states to 'active' + log.info("Replication factor is 1 so switching shard states"); + DistributedQueue inQueue = Overseer.getInQueue(zkStateReader.getZkClient()); + Map propMap = new HashMap<>(); + propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower()); + propMap.put(slice, Slice.INACTIVE); + for (String subSlice : subSlices) { + propMap.put(subSlice, Slice.ACTIVE); + } + propMap.put(ZkStateReader.COLLECTION_PROP, collectionName); + ZkNodeProps m = new ZkNodeProps(propMap); + inQueue.offer(ZkStateReader.toJSON(m)); + } else { + log.info("Requesting shard state be set to 'recovery'"); + DistributedQueue inQueue = Overseer.getInQueue(zkStateReader.getZkClient()); + Map propMap = new HashMap<>(); + propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower()); + for (String subSlice : subSlices) { + propMap.put(subSlice, Slice.RECOVERY); + } + propMap.put(ZkStateReader.COLLECTION_PROP, collectionName); + ZkNodeProps m = new ZkNodeProps(propMap); + inQueue.offer(ZkStateReader.toJSON(m)); + } + + return true; + } catch (SolrException e) { + throw e; + } catch (Exception e) { + log.error("Error executing split operation for collection: " + collectionName + " parent shard: " + slice, e); + throw new SolrException(ErrorCode.SERVER_ERROR, null, e); } - - collectShardResponses(results, true, - "SPLITSHARD failed to create subshard replicas or timed out waiting for them to come up", - shardHandler); - - completeAsyncRequest(asyncId, requestMap, results); - - log.info("Successfully created all replica shards for all sub-slices " + subSlices); - - commit(results, slice, parentShardLeader); - - if (repFactor == 1) { - // switch sub shard states to 'active' - log.info("Replication factor is 1 so switching shard states"); - DistributedQueue inQueue = Overseer.getInQueue(zkStateReader.getZkClient()); - Map propMap = new HashMap<>(); - propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower()); - propMap.put(slice, Slice.INACTIVE); - for (String subSlice : subSlices) { - propMap.put(subSlice, Slice.ACTIVE); - } - propMap.put(ZkStateReader.COLLECTION_PROP, collectionName); - ZkNodeProps m = new ZkNodeProps(propMap); - inQueue.offer(ZkStateReader.toJSON(m)); - } else { - log.info("Requesting shard state be set to 'recovery'"); - DistributedQueue inQueue = Overseer.getInQueue(zkStateReader.getZkClient()); - Map propMap = new HashMap<>(); - propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower()); - for (String subSlice : subSlices) { - propMap.put(subSlice, Slice.RECOVERY); - } - propMap.put(ZkStateReader.COLLECTION_PROP, collectionName); - ZkNodeProps m = new ZkNodeProps(propMap); - inQueue.offer(ZkStateReader.toJSON(m)); - } - - return true; - } catch (SolrException e) { - throw e; - } catch (Exception e) { - log.error("Error executing split operation for collection: " + collectionName + " parent shard: " + slice, e); - throw new SolrException(ErrorCode.SERVER_ERROR, null, e); + } finally { + MDCUtils.cleanupMDC(previousMDCContext); } } @@ -1870,67 +1903,72 @@ public class OverseerCollectionProcessor implements Runnable, Closeable { } private void deleteShard(ClusterState clusterState, ZkNodeProps message, NamedList results) { - log.info("Delete shard invoked"); String collection = message.getStr(ZkStateReader.COLLECTION_PROP); - String sliceId = message.getStr(ZkStateReader.SHARD_ID_PROP); - Slice slice = clusterState.getSlice(collection, sliceId); - - if (slice == null) { - if(clusterState.hasCollection(collection)) { - throw new SolrException(ErrorCode.BAD_REQUEST, - "No shard with name " + sliceId + " exists for collection " + collection); - } else { - throw new SolrException(ErrorCode.BAD_REQUEST, - "No collection with the specified name exists: " + collection); - } - } - // For now, only allow for deletions of Inactive slices or custom hashes (range==null). - // TODO: Add check for range gaps on Slice deletion - if (!(slice.getRange() == null || slice.getState().equals(Slice.INACTIVE) - || slice.getState().equals(Slice.RECOVERY) || slice.getState().equals(Slice.CONSTRUCTION))) { - throw new SolrException(ErrorCode.BAD_REQUEST, - "The slice: " + slice.getName() + " is currently " - + slice.getState() + ". Only non-active (or custom-hashed) slices can be deleted."); - } - ShardHandler shardHandler = shardHandlerFactory.getShardHandler(); - + Map previousMDCContext = MDC.getCopyOfContextMap(); + MDCUtils.setMDC(collection, sliceId, null, null); try { - ModifiableSolrParams params = new ModifiableSolrParams(); - params.set(CoreAdminParams.ACTION, CoreAdminAction.UNLOAD.toString()); - params.set(CoreAdminParams.DELETE_INDEX, "true"); - sliceCmd(clusterState, params, null, slice, shardHandler); + log.info("Delete shard invoked"); + Slice slice = clusterState.getSlice(collection, sliceId); - processResponses(results, shardHandler); - - ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, - DELETESHARD.toLower(), ZkStateReader.COLLECTION_PROP, collection, - ZkStateReader.SHARD_ID_PROP, sliceId); - Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(m)); - - // wait for a while until we don't see the shard - long now = System.nanoTime(); - long timeout = now + TimeUnit.NANOSECONDS.convert(30, TimeUnit.SECONDS); - boolean removed = false; - while (System.nanoTime() < timeout) { - Thread.sleep(100); - removed = zkStateReader.getClusterState().getSlice(collection, sliceId) == null; - if (removed) { - Thread.sleep(100); // just a bit of time so it's more likely other readers see on return - break; + if (slice == null) { + if (clusterState.hasCollection(collection)) { + throw new SolrException(ErrorCode.BAD_REQUEST, + "No shard with name " + sliceId + " exists for collection " + collection); + } else { + throw new SolrException(ErrorCode.BAD_REQUEST, + "No collection with the specified name exists: " + collection); } } - if (!removed) { - throw new SolrException(ErrorCode.SERVER_ERROR, - "Could not fully remove collection: " + collection + " shard: " + sliceId); + // For now, only allow for deletions of Inactive slices or custom hashes (range==null). + // TODO: Add check for range gaps on Slice deletion + if (!(slice.getRange() == null || slice.getState().equals(Slice.INACTIVE) + || slice.getState().equals(Slice.RECOVERY) || slice.getState().equals(Slice.CONSTRUCTION))) { + throw new SolrException(ErrorCode.BAD_REQUEST, + "The slice: " + slice.getName() + " is currently " + + slice.getState() + ". Only non-active (or custom-hashed) slices can be deleted."); } + ShardHandler shardHandler = shardHandlerFactory.getShardHandler(); - log.info("Successfully deleted collection: " + collection + ", shard: " + sliceId); + try { + ModifiableSolrParams params = new ModifiableSolrParams(); + params.set(CoreAdminParams.ACTION, CoreAdminAction.UNLOAD.toString()); + params.set(CoreAdminParams.DELETE_INDEX, "true"); + sliceCmd(clusterState, params, null, slice, shardHandler); - } catch (SolrException e) { - throw e; - } catch (Exception e) { - throw new SolrException(ErrorCode.SERVER_ERROR, "Error executing delete operation for collection: " + collection + " shard: " + sliceId, e); + processResponses(results, shardHandler); + + ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, + DELETESHARD.toLower(), ZkStateReader.COLLECTION_PROP, collection, + ZkStateReader.SHARD_ID_PROP, sliceId); + Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(m)); + + // wait for a while until we don't see the shard + long now = System.nanoTime(); + long timeout = now + TimeUnit.NANOSECONDS.convert(30, TimeUnit.SECONDS); + boolean removed = false; + while (System.nanoTime() < timeout) { + Thread.sleep(100); + removed = zkStateReader.getClusterState().getSlice(collection, sliceId) == null; + if (removed) { + Thread.sleep(100); // just a bit of time so it's more likely other readers see on return + break; + } + } + if (!removed) { + throw new SolrException(ErrorCode.SERVER_ERROR, + "Could not fully remove collection: " + collection + " shard: " + sliceId); + } + + log.info("Successfully deleted collection: " + collection + ", shard: " + sliceId); + + } catch (SolrException e) { + throw e; + } catch (Exception e) { + throw new SolrException(ErrorCode.SERVER_ERROR, "Error executing delete operation for collection: " + collection + " shard: " + sliceId, e); + } + } finally { + MDCUtils.cleanupMDC(previousMDCContext); } } @@ -2495,97 +2533,103 @@ public class OverseerCollectionProcessor implements Runnable, Closeable { String node = message.getStr("node"); String shard = message.getStr(SHARD_ID_PROP); String coreName = message.getStr(CoreAdminParams.NAME); - String asyncId = message.getStr("async"); + Map previousMDCContext = MDC.getCopyOfContextMap(); + MDCUtils.setMDC(collection, shard, null, coreName); + try { + String asyncId = message.getStr("async"); - DocCollection coll = clusterState.getCollection(collection); - if (coll == null) { - throw new SolrException(ErrorCode.BAD_REQUEST, "Collection: " + collection + " does not exist"); - } - if (coll.getSlice(shard) == null) { - throw new SolrException(ErrorCode.BAD_REQUEST, - "Collection: " + collection + " shard: " + shard + " does not exist"); - } - ShardHandler shardHandler = shardHandlerFactory.getShardHandler(); + DocCollection coll = clusterState.getCollection(collection); + if (coll == null) { + throw new SolrException(ErrorCode.BAD_REQUEST, "Collection: " + collection + " does not exist"); + } + if (coll.getSlice(shard) == null) { + throw new SolrException(ErrorCode.BAD_REQUEST, + "Collection: " + collection + " shard: " + shard + " does not exist"); + } + ShardHandler shardHandler = shardHandlerFactory.getShardHandler(); - if (node == null) { - node = getNodesForNewShard(clusterState, collection, coll.getSlices().size(), coll.getInt(ZkStateReader.MAX_SHARDS_PER_NODE, 1), coll.getInt(ZkStateReader.REPLICATION_FACTOR, 1), null).get(0).nodeName; - log.info("Node not provided, Identified {} for creating new replica", node); - } + if (node == null) { + node = getNodesForNewShard(clusterState, collection, coll.getSlices().size(), coll.getInt(ZkStateReader.MAX_SHARDS_PER_NODE, 1), coll.getInt(ZkStateReader.REPLICATION_FACTOR, 1), null).get(0).nodeName; + log.info("Node not provided, Identified {} for creating new replica", node); + } - if (!clusterState.liveNodesContain(node)) { - throw new SolrException(ErrorCode.BAD_REQUEST, "Node: " + node + " is not live"); - } - if (coreName == null) { - // assign a name to this core - Slice slice = coll.getSlice(shard); - int replicaNum = slice.getReplicas().size(); - for (;;) { - String replicaName = collection + "_" + shard + "_replica" + replicaNum; - boolean exists = false; - for (Replica replica : slice.getReplicas()) { - if (replicaName.equals(replica.getStr("core"))) { - exists = true; - break; + if (!clusterState.liveNodesContain(node)) { + throw new SolrException(ErrorCode.BAD_REQUEST, "Node: " + node + " is not live"); + } + if (coreName == null) { + // assign a name to this core + Slice slice = coll.getSlice(shard); + int replicaNum = slice.getReplicas().size(); + for (; ; ) { + String replicaName = collection + "_" + shard + "_replica" + replicaNum; + boolean exists = false; + for (Replica replica : slice.getReplicas()) { + if (replicaName.equals(replica.getStr("core"))) { + exists = true; + break; + } } + if (exists) replicaNum++; + else break; } - if (exists) replicaNum++; - else break; + coreName = collection + "_" + shard + "_replica" + replicaNum; } - coreName = collection + "_" + shard + "_replica" + replicaNum; - } - ModifiableSolrParams params = new ModifiableSolrParams(); + ModifiableSolrParams params = new ModifiableSolrParams(); - if(!Overseer.isLegacy(zkStateReader.getClusterProps())){ - ZkNodeProps props = new ZkNodeProps( - Overseer.QUEUE_OPERATION, ADDREPLICA.toLower(), - ZkStateReader.COLLECTION_PROP, collection, - ZkStateReader.SHARD_ID_PROP, shard, - ZkStateReader.CORE_NAME_PROP, coreName, - ZkStateReader.STATE_PROP, ZkStateReader.DOWN, - ZkStateReader.BASE_URL_PROP,zkStateReader.getBaseUrlForNodeName(node)); - Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(props)); - params.set(CoreAdminParams.CORE_NODE_NAME, waitToSeeReplicasInState(collection, Collections.singletonList(coreName)).get(coreName).getName()); - } + if (!Overseer.isLegacy(zkStateReader.getClusterProps())) { + ZkNodeProps props = new ZkNodeProps( + Overseer.QUEUE_OPERATION, ADDREPLICA.toLower(), + ZkStateReader.COLLECTION_PROP, collection, + ZkStateReader.SHARD_ID_PROP, shard, + ZkStateReader.CORE_NAME_PROP, coreName, + ZkStateReader.STATE_PROP, ZkStateReader.DOWN, + ZkStateReader.BASE_URL_PROP, zkStateReader.getBaseUrlForNodeName(node)); + Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(props)); + params.set(CoreAdminParams.CORE_NODE_NAME, waitToSeeReplicasInState(collection, Collections.singletonList(coreName)).get(coreName).getName()); + } - String configName = zkStateReader.readConfigName(collection); - String routeKey = message.getStr(ShardParams._ROUTE_); - String dataDir = message.getStr(CoreAdminParams.DATA_DIR); - String instanceDir = message.getStr(CoreAdminParams.INSTANCE_DIR); + String configName = zkStateReader.readConfigName(collection); + String routeKey = message.getStr(ShardParams._ROUTE_); + String dataDir = message.getStr(CoreAdminParams.DATA_DIR); + String instanceDir = message.getStr(CoreAdminParams.INSTANCE_DIR); - params.set(CoreAdminParams.ACTION, CoreAdminAction.CREATE.toString()); - params.set(CoreAdminParams.NAME, coreName); - params.set(COLL_CONF, configName); - params.set(CoreAdminParams.COLLECTION, collection); - if (shard != null) { - params.set(CoreAdminParams.SHARD, shard); - } else if (routeKey != null) { - Collection slices = coll.getRouter().getSearchSlicesSingle(routeKey, null, coll); - if (slices.isEmpty()) { - throw new SolrException(ErrorCode.BAD_REQUEST, "No active shard serving _route_=" + routeKey + " found"); + params.set(CoreAdminParams.ACTION, CoreAdminAction.CREATE.toString()); + params.set(CoreAdminParams.NAME, coreName); + params.set(COLL_CONF, configName); + params.set(CoreAdminParams.COLLECTION, collection); + if (shard != null) { + params.set(CoreAdminParams.SHARD, shard); + } else if (routeKey != null) { + Collection slices = coll.getRouter().getSearchSlicesSingle(routeKey, null, coll); + if (slices.isEmpty()) { + throw new SolrException(ErrorCode.BAD_REQUEST, "No active shard serving _route_=" + routeKey + " found"); + } else { + params.set(CoreAdminParams.SHARD, slices.iterator().next().getName()); + } } else { - params.set(CoreAdminParams.SHARD, slices.iterator().next().getName()); + throw new SolrException(ErrorCode.BAD_REQUEST, "Specify either 'shard' or _route_ param"); } - } else { - throw new SolrException(ErrorCode.BAD_REQUEST, "Specify either 'shard' or _route_ param"); - } - if (dataDir != null) { - params.set(CoreAdminParams.DATA_DIR, dataDir); - } - if (instanceDir != null) { - params.set(CoreAdminParams.INSTANCE_DIR, instanceDir); - } - addPropertyParams(message, params); + if (dataDir != null) { + params.set(CoreAdminParams.DATA_DIR, dataDir); + } + if (instanceDir != null) { + params.set(CoreAdminParams.INSTANCE_DIR, instanceDir); + } + addPropertyParams(message, params); - // For tracking async calls. - HashMap requestMap = new HashMap<>(); - sendShardRequest(node, params, shardHandler, asyncId, requestMap); + // For tracking async calls. + HashMap requestMap = new HashMap<>(); + sendShardRequest(node, params, shardHandler, asyncId, requestMap); - collectShardResponses(results, true, - "ADDREPLICA failed to create replica", shardHandler); + collectShardResponses(results, true, + "ADDREPLICA failed to create replica", shardHandler); - completeAsyncRequest(asyncId, requestMap, results); + completeAsyncRequest(asyncId, requestMap, results); + } finally { + MDCUtils.cleanupMDC(previousMDCContext); + } } private void processResponses(NamedList results, ShardHandler shardHandler) { @@ -2844,6 +2888,8 @@ public class OverseerCollectionProcessor implements Runnable, Closeable { String asyncId = message.getStr(ASYNC); String collectionName = message.containsKey(COLLECTION_PROP) ? message.getStr(COLLECTION_PROP) : message.getStr("name"); + Map previousMDCContext = MDC.getCopyOfContextMap(); + MDCUtils.setCollection(collectionName); try { try { log.debug("Runner processing {}", head.getId()); @@ -2888,6 +2934,7 @@ public class OverseerCollectionProcessor implements Runnable, Closeable { synchronized (waitLock){ waitLock.notifyAll(); } + MDCUtils.cleanupMDC(previousMDCContext); } } diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java index 394abbb1bb1..fa2215fab1d 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java @@ -53,6 +53,7 @@ import org.apache.solr.core.CoreContainer; import org.apache.solr.core.CoreDescriptor; import org.apache.solr.core.SolrCore; import org.apache.solr.handler.component.ShardHandler; +import org.apache.solr.logging.MDCUtils; import org.apache.solr.update.UpdateLog; import org.apache.solr.update.UpdateShardHandler; import org.apache.zookeeper.CreateMode; @@ -66,6 +67,7 @@ import org.apache.zookeeper.Watcher; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.slf4j.MDC; import java.io.IOException; import java.io.UnsupportedEncodingException; @@ -826,17 +828,19 @@ public final class ZkController { */ public String register(String coreName, final CoreDescriptor desc, boolean recoverReloadedCores, boolean afterExpiration) throws Exception { // pre register has published our down state - final String baseUrl = getBaseUrl(); final CloudDescriptor cloudDesc = desc.getCloudDescriptor(); final String collection = cloudDesc.getCollectionName(); + Map previousMDCContext = MDC.getCopyOfContextMap(); + MDCUtils.setCollection(collection); + final String coreZkNodeName = desc.getCloudDescriptor().getCoreNodeName(); assert coreZkNodeName != null : "we should have a coreNodeName by now"; String shardId = cloudDesc.getShardId(); - + MDCUtils.setShard(shardId); Map props = new HashMap<>(); // we only put a subset of props into the leader node props.put(ZkStateReader.BASE_URL_PROP, baseUrl); @@ -852,67 +856,71 @@ public final class ZkController { ZkNodeProps leaderProps = new ZkNodeProps(props); try { - // If we're a preferred leader, insert ourselves at the head of the queue - boolean joinAtHead = false; - Replica replica = zkStateReader.getClusterState().getReplica(desc.getCloudDescriptor().getCollectionName(), coreZkNodeName); - if (replica != null) { - joinAtHead = replica.getBool(SliceMutator.PREFERRED_LEADER_PROP, false); + try { + // If we're a preferred leader, insert ourselves at the head of the queue + boolean joinAtHead = false; + Replica replica = zkStateReader.getClusterState().getReplica(desc.getCloudDescriptor().getCollectionName(), coreZkNodeName); + if (replica != null) { + joinAtHead = replica.getBool(SliceMutator.PREFERRED_LEADER_PROP, false); + } + joinElection(desc, afterExpiration, joinAtHead); + } catch (InterruptedException e) { + // Restore the interrupted status + Thread.currentThread().interrupt(); + throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e); + } catch (KeeperException | IOException e) { + throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e); } - joinElection(desc, afterExpiration, joinAtHead); - } catch (InterruptedException e) { - // Restore the interrupted status - Thread.currentThread().interrupt(); - throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e); - } catch (KeeperException | IOException e) { - throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e); - } - // in this case, we want to wait for the leader as long as the leader might - // wait for a vote, at least - but also long enough that a large cluster has - // time to get its act together - String leaderUrl = getLeader(cloudDesc, leaderVoteWait + 600000); - - String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName); - log.info("We are " + ourUrl + " and leader is " + leaderUrl); - boolean isLeader = leaderUrl.equals(ourUrl); + // in this case, we want to wait for the leader as long as the leader might + // wait for a vote, at least - but also long enough that a large cluster has + // time to get its act together + String leaderUrl = getLeader(cloudDesc, leaderVoteWait + 600000); - try (SolrCore core = cc.getCore(desc.getName())) { + String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName); + log.info("We are " + ourUrl + " and leader is " + leaderUrl); + boolean isLeader = leaderUrl.equals(ourUrl); - // recover from local transaction log and wait for it to complete before - // going active - // TODO: should this be moved to another thread? To recoveryStrat? - // TODO: should this actually be done earlier, before (or as part of) - // leader election perhaps? + try (SolrCore core = cc.getCore(desc.getName())) { - UpdateLog ulog = core.getUpdateHandler().getUpdateLog(); - if (!core.isReloaded() && ulog != null) { - // disable recovery in case shard is in construction state (for shard splits) - Slice slice = getClusterState().getSlice(collection, shardId); - if (!Slice.CONSTRUCTION.equals(slice.getState()) || !isLeader) { - Future recoveryFuture = core.getUpdateHandler() - .getUpdateLog().recoverFromLog(); - if (recoveryFuture != null) { - log.info("Replaying tlog for "+ourUrl+" during startup... NOTE: This can take a while."); - recoveryFuture.get(); // NOTE: this could potentially block for - // minutes or more! - // TODO: public as recovering in the mean time? - // TODO: in the future we could do peersync in parallel with recoverFromLog - } else { - log.info("No LogReplay needed for core=" + core.getName() + " baseURL=" + baseUrl); + // recover from local transaction log and wait for it to complete before + // going active + // TODO: should this be moved to another thread? To recoveryStrat? + // TODO: should this actually be done earlier, before (or as part of) + // leader election perhaps? + + UpdateLog ulog = core.getUpdateHandler().getUpdateLog(); + if (!core.isReloaded() && ulog != null) { + // disable recovery in case shard is in construction state (for shard splits) + Slice slice = getClusterState().getSlice(collection, shardId); + if (!Slice.CONSTRUCTION.equals(slice.getState()) || !isLeader) { + Future recoveryFuture = core.getUpdateHandler() + .getUpdateLog().recoverFromLog(); + if (recoveryFuture != null) { + log.info("Replaying tlog for " + ourUrl + " during startup... NOTE: This can take a while."); + recoveryFuture.get(); // NOTE: this could potentially block for + // minutes or more! + // TODO: public as recovering in the mean time? + // TODO: in the future we could do peersync in parallel with recoverFromLog + } else { + log.info("No LogReplay needed for core=" + core.getName() + " baseURL=" + baseUrl); + } + } + boolean didRecovery = checkRecovery(coreName, desc, recoverReloadedCores, isLeader, cloudDesc, + collection, coreZkNodeName, shardId, leaderProps, core, cc); + if (!didRecovery) { + publish(desc, ZkStateReader.ACTIVE); } } - boolean didRecovery = checkRecovery(coreName, desc, recoverReloadedCores, isLeader, cloudDesc, - collection, coreZkNodeName, shardId, leaderProps, core, cc); - if (!didRecovery) { - publish(desc, ZkStateReader.ACTIVE); - } } + + // make sure we have an update cluster state right away + zkStateReader.updateClusterState(true); + return shardId; + } finally { + MDCUtils.cleanupMDC(previousMDCContext); } - - // make sure we have an update cluster state right away - zkStateReader.updateClusterState(true); - return shardId; } // timeoutms is the timeout for the first call to get the leader - there is then @@ -1110,74 +1118,85 @@ public final class ZkController { } } String collection = cd.getCloudDescriptor().getCollectionName(); - log.info("publishing core={} state={} collection={}", cd.getName(), state, collection); - //System.out.println(Thread.currentThread().getStackTrace()[3]); - Integer numShards = cd.getCloudDescriptor().getNumShards(); - if (numShards == null) { //XXX sys prop hack - log.info("numShards not found on descriptor - reading it from system property"); - numShards = Integer.getInteger(ZkStateReader.NUM_SHARDS_PROP); - } - - assert collection != null && collection.length() > 0; - - String shardId = cd.getCloudDescriptor().getShardId(); - String coreNodeName = cd.getCloudDescriptor().getCoreNodeName(); - // If the leader initiated recovery, then verify that this replica has performed - // recovery as requested before becoming active; don't even look at lirState if going down - if (!ZkStateReader.DOWN.equals(state)) { - String lirState = getLeaderInitiatedRecoveryState(collection, shardId, coreNodeName); - if (lirState != null) { - if (ZkStateReader.ACTIVE.equals(state)) { - // trying to become active, so leader-initiated state must be recovering - if (ZkStateReader.RECOVERING.equals(lirState)) { - updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, ZkStateReader.ACTIVE, null, true); - } else if (ZkStateReader.DOWN.equals(lirState)) { - throw new SolrException(ErrorCode.INVALID_STATE, - "Cannot publish state of core '"+cd.getName()+"' as active without recovering first!"); - } - } else if (ZkStateReader.RECOVERING.equals(state)) { - // if it is currently DOWN, then trying to enter into recovering state is good - if (ZkStateReader.DOWN.equals(lirState)) { - updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, ZkStateReader.RECOVERING, null, true); + + Map previousMDCContext = MDC.getCopyOfContextMap(); + MDCUtils.setCollection(collection); + + try { + if (cd != null && cd.getName() != null) + MDCUtils.setCore(cd.getName()); + log.info("publishing core={} state={} collection={}", cd.getName(), state, collection); + //System.out.println(Thread.currentThread().getStackTrace()[3]); + Integer numShards = cd.getCloudDescriptor().getNumShards(); + if (numShards == null) { //XXX sys prop hack + log.info("numShards not found on descriptor - reading it from system property"); + numShards = Integer.getInteger(ZkStateReader.NUM_SHARDS_PROP); + } + + assert collection != null && collection.length() > 0; + + String shardId = cd.getCloudDescriptor().getShardId(); + MDCUtils.setShard(shardId); + String coreNodeName = cd.getCloudDescriptor().getCoreNodeName(); + // If the leader initiated recovery, then verify that this replica has performed + // recovery as requested before becoming active; don't even look at lirState if going down + if (!ZkStateReader.DOWN.equals(state)) { + String lirState = getLeaderInitiatedRecoveryState(collection, shardId, coreNodeName); + if (lirState != null) { + if (ZkStateReader.ACTIVE.equals(state)) { + // trying to become active, so leader-initiated state must be recovering + if (ZkStateReader.RECOVERING.equals(lirState)) { + updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, ZkStateReader.ACTIVE, null, true); + } else if (ZkStateReader.DOWN.equals(lirState)) { + throw new SolrException(ErrorCode.INVALID_STATE, + "Cannot publish state of core '" + cd.getName() + "' as active without recovering first!"); + } + } else if (ZkStateReader.RECOVERING.equals(state)) { + // if it is currently DOWN, then trying to enter into recovering state is good + if (ZkStateReader.DOWN.equals(lirState)) { + updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, ZkStateReader.RECOVERING, null, true); + } } } } - } - - Map props = new HashMap<>(); - props.put(Overseer.QUEUE_OPERATION, "state"); - props.put(ZkStateReader.STATE_PROP, state); - props.put(ZkStateReader.BASE_URL_PROP, getBaseUrl()); - props.put(ZkStateReader.CORE_NAME_PROP, cd.getName()); - props.put(ZkStateReader.ROLES_PROP, cd.getCloudDescriptor().getRoles()); - props.put(ZkStateReader.NODE_NAME_PROP, getNodeName()); - props.put(ZkStateReader.SHARD_ID_PROP, cd.getCloudDescriptor().getShardId()); - props.put(ZkStateReader.COLLECTION_PROP, collection); - if (numShards != null) { - props.put(ZkStateReader.NUM_SHARDS_PROP, numShards.toString()); - } - if (coreNodeName != null) { - props.put(ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName); - } - - if (ClusterStateUtil.isAutoAddReplicas(getZkStateReader(), collection)) { - try (SolrCore core = cc.getCore(cd.getName())) { - if (core != null && core.getDirectoryFactory().isSharedStorage()) { - props.put("dataDir", core.getDataDir()); - UpdateLog ulog = core.getUpdateHandler().getUpdateLog(); - if (ulog != null) { - props.put("ulogDir", ulog.getLogDir()); + + Map props = new HashMap<>(); + props.put(Overseer.QUEUE_OPERATION, "state"); + props.put(ZkStateReader.STATE_PROP, state); + props.put(ZkStateReader.BASE_URL_PROP, getBaseUrl()); + props.put(ZkStateReader.CORE_NAME_PROP, cd.getName()); + props.put(ZkStateReader.ROLES_PROP, cd.getCloudDescriptor().getRoles()); + props.put(ZkStateReader.NODE_NAME_PROP, getNodeName()); + props.put(ZkStateReader.SHARD_ID_PROP, cd.getCloudDescriptor().getShardId()); + props.put(ZkStateReader.COLLECTION_PROP, collection); + if (numShards != null) { + props.put(ZkStateReader.NUM_SHARDS_PROP, numShards.toString()); + } + if (coreNodeName != null) { + props.put(ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName); + } + + if (ClusterStateUtil.isAutoAddReplicas(getZkStateReader(), collection)) { + try (SolrCore core = cc.getCore(cd.getName())) { + if (core != null && core.getDirectoryFactory().isSharedStorage()) { + props.put("dataDir", core.getDataDir()); + UpdateLog ulog = core.getUpdateHandler().getUpdateLog(); + if (ulog != null) { + props.put("ulogDir", ulog.getLogDir()); + } } } } + + ZkNodeProps m = new ZkNodeProps(props); + + if (updateLastState) { + cd.getCloudDescriptor().lastPublished = state; + } + overseerJobQueue.offer(ZkStateReader.toJSON(m)); + } finally { + MDCUtils.cleanupMDC(previousMDCContext); } - - ZkNodeProps m = new ZkNodeProps(props); - - if (updateLastState) { - cd.getCloudDescriptor().lastPublished = state; - } - overseerJobQueue.offer(ZkStateReader.toJSON(m)); } private boolean needsToBeAssignedShardId(final CoreDescriptor desc, diff --git a/solr/core/src/java/org/apache/solr/logging/MDCUtils.java b/solr/core/src/java/org/apache/solr/logging/MDCUtils.java new file mode 100644 index 00000000000..61a0aef8644 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/logging/MDCUtils.java @@ -0,0 +1,69 @@ +package org.apache.solr.logging; + +import java.util.Map; + +import org.apache.solr.common.cloud.ZkStateReader; +import org.slf4j.MDC; + +import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP; +import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP; +import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP; +import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +public class MDCUtils { + public static void cleanupMDC(Map previousMDCContext) { + if (previousMDCContext != null) + MDC.setContextMap(previousMDCContext); + } + + public static void setMDC (String collection, String shard, String replica, String core) { + setCollection(collection); + setShard(shard); + setReplica(replica); + setCore(core); + } + + public static void setCollection(String collection) { + if (collection != null) + MDC.put(COLLECTION_PROP, collection); + } + + public static void setShard(String shard) { + if (shard != null) + MDC.put(SHARD_ID_PROP, shard); + } + + public static void setReplica(String replica) { + if (replica != null) + MDC.put(REPLICA_PROP, replica); + } + + public static void setCore(String core) { + if (core != null) + MDC.put(CORE_NAME_PROP, core); + } + + public static void clearMDC() { + MDC.remove(COLLECTION_PROP); + MDC.remove(CORE_NAME_PROP); + MDC.remove(REPLICA_PROP); + MDC.remove(SHARD_ID_PROP); + } +} diff --git a/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java b/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java index f08ce029685..46faf026e22 100644 --- a/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java +++ b/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java @@ -60,6 +60,7 @@ import org.apache.solr.core.SolrCore; import org.apache.solr.core.SolrResourceLoader; import org.apache.solr.core.SolrXmlConfig; import org.apache.solr.handler.ContentStreamHandlerBase; +import org.apache.solr.logging.MDCUtils; import org.apache.solr.request.SolrQueryRequest; import org.apache.solr.request.SolrQueryRequestBase; import org.apache.solr.request.SolrRequestHandler; @@ -73,7 +74,6 @@ import org.apache.solr.update.processor.DistributingUpdateProcessorFactory; import org.apache.solr.util.RTimer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import javax.servlet.FilterChain; import javax.servlet.FilterConfig; import javax.servlet.ServletException; @@ -81,6 +81,7 @@ import javax.servlet.ServletRequest; import javax.servlet.ServletResponse; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; + import java.io.ByteArrayInputStream; import java.io.EOFException; import java.io.IOException; @@ -219,6 +220,7 @@ public class SolrDispatchFilter extends BaseSolrFilter { } public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain, boolean retry) throws IOException, ServletException { + MDCUtils.clearMDC(); if (abortErrorMessage != null) { sendError((HttpServletResponse) response, 500, abortErrorMessage); @@ -305,11 +307,14 @@ public class SolrDispatchFilter extends BaseSolrFilter { if (core != null) { path = path.substring( idx ); + MDCUtils.setCore(core.getName()); } } if (core == null) { if (!cores.isZooKeeperAware() ) { core = cores.getCore(""); + if (core != null) + MDCUtils.setCore(core.getName()); } } } @@ -321,6 +326,7 @@ public class SolrDispatchFilter extends BaseSolrFilter { if (core != null) { // we found a core, update the path path = path.substring( idx ); + MDCUtils.setCore(core.getName()); } // if we couldn't find it locally, look on other nodes @@ -355,6 +361,7 @@ public class SolrDispatchFilter extends BaseSolrFilter { // try the default core if (core == null) { core = cores.getCore(""); + MDCUtils.setCore(core.getName()); } } diff --git a/solr/core/src/java/org/apache/solr/util/SolrLogLayout.java b/solr/core/src/java/org/apache/solr/util/SolrLogLayout.java index 6e8f8aa33c9..4bd7d796f9a 100644 --- a/solr/core/src/java/org/apache/solr/util/SolrLogLayout.java +++ b/solr/core/src/java/org/apache/solr/util/SolrLogLayout.java @@ -11,13 +11,18 @@ import org.apache.log4j.spi.LoggingEvent; import org.apache.log4j.spi.ThrowableInformation; import org.apache.solr.cloud.ZkController; import org.apache.solr.common.SolrException; -import org.apache.solr.common.cloud.ClusterState; -import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.Replica; -import org.apache.solr.common.cloud.Slice; +import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.core.SolrCore; +import org.apache.solr.logging.MDCUtils; import org.apache.solr.request.SolrQueryRequest; import org.apache.solr.request.SolrRequestInfo; +import org.slf4j.MDC; + +import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP; +import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP; +import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP; +import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP; /* * Licensed to the Apache Software Foundation (ASF) under one or more @@ -107,7 +112,7 @@ public class SolrLogLayout extends Layout { sb.append(" T"); sb.append(th.getId()); } - + @Override public String format(LoggingEvent event) { return _format(event); @@ -183,7 +188,9 @@ public class SolrLogLayout extends Layout { // useful for sequencing when looking at multiple parts of a log file, but // ms since start should be fine. appendThread(sb, event); - + + appendMDC(sb); + if (info != null) { sb.append(' ').append(info.shortId); // core } @@ -361,4 +368,17 @@ public class SolrLogLayout extends Layout { public boolean ignoresThrowable() { return false; } + + + private void appendMDC(StringBuilder sb) { + sb.append(" [" + getMDCValueOrEmpty(COLLECTION_PROP) + "] "); + sb.append("[" + getMDCValueOrEmpty(SHARD_ID_PROP) + "] "); + sb.append("[" + getMDCValueOrEmpty(REPLICA_PROP) + "] "); + sb.append("[" + getMDCValueOrEmpty(CORE_NAME_PROP)+"] "); + } + + private String getMDCValueOrEmpty(String key) { + String val = MDC.get(key); + return val==null? "": val; + } } diff --git a/solr/server/resources/log4j.properties b/solr/server/resources/log4j.properties index 83f649a709f..6c337a10bba 100644 --- a/solr/server/resources/log4j.properties +++ b/solr/server/resources/log4j.properties @@ -5,7 +5,7 @@ log4j.rootLogger=INFO, file, CONSOLE log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout -log4j.appender.CONSOLE.layout.ConversionPattern=%-4r [%t] %-5p %c %x \u2013 %m%n +log4j.appender.CONSOLE.layout.ConversionPattern=%-4r [%t] %-5p %c %x [%X{collection}] [%X{shard}] [%X{replica}] [%X{core}] \u2013 %m%n #- size rotation with log cleanup. log4j.appender.file=org.apache.log4j.RollingFileAppender diff --git a/solr/solrj/src/java/org/apache/solr/common/SolrException.java b/solr/solrj/src/java/org/apache/solr/common/SolrException.java index 02bbc045004..f6f353cb455 100644 --- a/solr/solrj/src/java/org/apache/solr/common/SolrException.java +++ b/solr/solrj/src/java/org/apache/solr/common/SolrException.java @@ -19,18 +19,21 @@ package org.apache.solr.common; import java.io.CharArrayWriter; import java.io.PrintWriter; +import java.util.Map; import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; - import org.apache.solr.common.util.NamedList; import org.slf4j.Logger; +import org.slf4j.MDC; /** * */ public class SolrException extends RuntimeException { + final private Map mdcContext; + /** * This list of valid HTTP Status error codes that Solr may return in * the case of a "Server Side" error. @@ -65,15 +68,18 @@ public class SolrException extends RuntimeException { public SolrException(ErrorCode code, String msg) { super(msg); this.code = code.code; + this.mdcContext = MDC.getCopyOfContextMap(); } public SolrException(ErrorCode code, String msg, Throwable th) { super(msg, th); this.code = code.code; + this.mdcContext = MDC.getCopyOfContextMap(); } public SolrException(ErrorCode code, Throwable th) { super(th); this.code = code.code; + this.mdcContext = MDC.getCopyOfContextMap(); } /** @@ -84,6 +90,7 @@ public class SolrException extends RuntimeException { protected SolrException(int code, String msg, Throwable th) { super(msg, th); this.code = code; + this.mdcContext = MDC.getCopyOfContextMap(); } int code=0; @@ -205,4 +212,34 @@ public class SolrException extends RuntimeException { return t; } + public void logInfoWithMdc(Logger logger, String msg) { + Map previousMdcContext = MDC.getCopyOfContextMap(); + MDC.setContextMap(mdcContext); + try { + logger.info(msg); + } finally{ + MDC.setContextMap(previousMdcContext); + } + } + + public void logDebugWithMdc(Logger logger, String msg) { + Map previousMdcContext = MDC.getCopyOfContextMap(); + MDC.setContextMap(mdcContext); + try { + logger.debug(msg); + } finally{ + MDC.setContextMap(previousMdcContext); + } + } + + public void logWarnWithMdc(Logger logger, String msg) { + Map previousMdcContext = MDC.getCopyOfContextMap(); + MDC.setContextMap(mdcContext); + try { + logger.warn(msg); + } finally{ + MDC.setContextMap(previousMdcContext); + } + } + }