From 4f1a857977a50f09ad7bd8a08b9f77d5e048e011 Mon Sep 17 00:00:00 2001 From: Mark Robert Miller Date: Mon, 18 Nov 2013 14:31:18 +0000 Subject: [PATCH] SOLR-5452: Do not attempt to proxy internal update requests. git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1543037 13f79535-47bb-0310-9956-ffa450edef68 --- solr/CHANGES.txt | 2 + .../solr/servlet/SolrDispatchFilter.java | 5 +- .../processor/DistributedUpdateProcessor.java | 48 ++++++++++--------- 3 files changed, 32 insertions(+), 23 deletions(-) diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index a13187ec850..e3a1bc4098f 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -108,6 +108,8 @@ Bug Fixes * SOLR-5451: SyncStrategy closes it's http connection manager before the executor that uses it in it's close method. (Mark Miller) +* SOLR-5452: Do not attempt to proxy internal update requests. (Mark Miller) + Other Changes --------------------- diff --git a/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java b/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java index 0374ca01c3f..57213215f67 100644 --- a/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java +++ b/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java @@ -52,6 +52,7 @@ import org.apache.solr.response.QueryResponseWriter; import org.apache.solr.response.SolrQueryResponse; import org.apache.solr.servlet.cache.HttpCacheHeaderUtil; import org.apache.solr.servlet.cache.Method; +import org.apache.solr.update.processor.DistributingUpdateProcessorFactory; import org.apache.solr.util.FastWriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -313,7 +314,9 @@ public class SolrDispatchFilter implements Filter // if we couldn't find it locally, look on other nodes if (core == null && idx > 0) { String coreUrl = getRemotCoreUrl(cores, corename, origCorename); - if (coreUrl != null) { + Map params = req.getParameterMap(); + // don't proxy for internal update requests + if (coreUrl != null && (params == null || !params.containsKey(DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM))) { path = path.substring( idx ); remoteQuery(coreUrl + path, req, solrReq, resp); return; diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java index 78429964100..6120337e1cf 100644 --- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java +++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java @@ -92,6 +92,10 @@ import org.slf4j.LoggerFactory; // NOT mt-safe... create a new processor for each add thread // TODO: we really should not wait for distrib after local? unless a certain replication factor is asked for public class DistributedUpdateProcessor extends UpdateRequestProcessor { + public static final String DISTRIB_FROM_SHARD = "distrib.from.shard"; + public static final String DISTRIB_FROM_COLLECTION = "distrib.from.collection"; + public static final String DISTRIB_FROM_PARENT = "distrib.from.parent"; + public static final String DISTRIB_FROM = "distrib.from"; private static final String TEST_DISTRIB_SKIP_SERVERS = "test.distrib.skip.servers"; public final static Logger log = LoggerFactory.getLogger(DistributedUpdateProcessor.class); @@ -252,7 +256,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { // if request is coming from another collection then we want it to be sent to all replicas // even if it's phase is FROMLEADER - String fromCollection = updateCommand.getReq().getParams().get("distrib.from.collection"); + String fromCollection = updateCommand.getReq().getParams().get(DISTRIB_FROM_COLLECTION); if (DistribPhase.FROMLEADER == phase && !isSubShardLeader && fromCollection == null) { // we are coming from the leader, just go local - add no urls @@ -443,13 +447,13 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { boolean isReplayOrPeersync = (updateCommand.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0; if (isReplayOrPeersync) return; - String from = req.getParams().get("distrib.from"); + String from = req.getParams().get(DISTRIB_FROM); ClusterState clusterState = zkController.getClusterState(); CloudDescriptor cloudDescriptor = req.getCore().getCoreDescriptor().getCloudDescriptor(); Slice mySlice = clusterState.getSlice(collection, cloudDescriptor.getShardId()); boolean localIsLeader = cloudDescriptor.isLeader(); if (DistribPhase.FROMLEADER == phase && localIsLeader && from != null) { // from will be null on log replay - String fromShard = req.getParams().get("distrib.from.parent"); + String fromShard = req.getParams().get(DISTRIB_FROM_PARENT); if (fromShard != null) { if (Slice.ACTIVE.equals(mySlice.getState())) { throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, @@ -464,7 +468,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { "Request says it is coming from parent shard leader but parent hash range is not superset of my range"); } } else { - String fromCollection = req.getParams().get("distrib.from.collection"); // is it because of a routing rule? + String fromCollection = req.getParams().get(DISTRIB_FROM_COLLECTION); // is it because of a routing rule? if (fromCollection == null) { log.error("Request says it is coming from leader, but we are the leader: " + req.getParamString()); throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Request says it is coming from leader, but we are the leader"); @@ -543,9 +547,9 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { if (subShardLeaders != null && !subShardLeaders.isEmpty()) { ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams())); params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString()); - params.set("distrib.from", ZkCoreNodeProps.getCoreUrl( + params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl( zkController.getBaseUrl(), req.getCore().getName())); - params.set("distrib.from.parent", req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId()); + params.set(DISTRIB_FROM_PARENT, req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId()); for (Node subShardLeader : subShardLeaders) { cmdDistrib.distribAdd(cmd, Collections.singletonList(subShardLeader), params, true); } @@ -554,10 +558,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { if (nodesByRoutingRules != null && !nodesByRoutingRules.isEmpty()) { ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams())); params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString()); - params.set("distrib.from", ZkCoreNodeProps.getCoreUrl( + params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl( zkController.getBaseUrl(), req.getCore().getName())); - params.set("distrib.from.collection", req.getCore().getCoreDescriptor().getCloudDescriptor().getCollectionName()); - params.set("distrib.from.shard", req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId()); + params.set(DISTRIB_FROM_COLLECTION, req.getCore().getCoreDescriptor().getCloudDescriptor().getCollectionName()); + params.set(DISTRIB_FROM_SHARD, req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId()); for (Node nodesByRoutingRule : nodesByRoutingRules) { cmdDistrib.distribAdd(cmd, Collections.singletonList(nodesByRoutingRule), params, true); } @@ -573,7 +577,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { DistribPhase.FROMLEADER.toString() : DistribPhase.TOLEADER.toString())); if (isLeader || isSubShardLeader) { - params.set("distrib.from", ZkCoreNodeProps.getCoreUrl( + params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl( zkController.getBaseUrl(), req.getCore().getName())); } @@ -727,7 +731,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { boolean isReplayOrPeersync = (cmd.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0; boolean leaderLogic = isLeader && !isReplayOrPeersync; - boolean forwardedFromCollection = cmd.getReq().getParams().get("distrib.from.collection") != null; + boolean forwardedFromCollection = cmd.getReq().getParams().get(DISTRIB_FROM_COLLECTION) != null; VersionBucket bucket = vinfo.bucket(bucketHash); @@ -972,9 +976,9 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { if (subShardLeaders != null && !subShardLeaders.isEmpty()) { ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams())); params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString()); - params.set("distrib.from", ZkCoreNodeProps.getCoreUrl( + params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl( zkController.getBaseUrl(), req.getCore().getName())); - params.set("distrib.from.parent", cloudDesc.getShardId()); + params.set(DISTRIB_FROM_PARENT, cloudDesc.getShardId()); cmdDistrib.distribDelete(cmd, subShardLeaders, params, true); } @@ -982,10 +986,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { if (nodesByRoutingRules != null && !nodesByRoutingRules.isEmpty()) { ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams())); params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString()); - params.set("distrib.from", ZkCoreNodeProps.getCoreUrl( + params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl( zkController.getBaseUrl(), req.getCore().getName())); - params.set("distrib.from.collection", req.getCore().getCoreDescriptor().getCloudDescriptor().getCollectionName()); - params.set("distrib.from.shard", req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId()); + params.set(DISTRIB_FROM_COLLECTION, req.getCore().getCoreDescriptor().getCloudDescriptor().getCollectionName()); + params.set(DISTRIB_FROM_SHARD, req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId()); for (Node nodesByRoutingRule : nodesByRoutingRules) { cmdDistrib.distribDelete(cmd, Collections.singletonList(nodesByRoutingRule), params, true); } @@ -1002,7 +1006,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { DistribPhase.FROMLEADER.toString() : DistribPhase.TOLEADER.toString())); if (isLeader || isSubShardLeader) { - params.set("distrib.from", ZkCoreNodeProps.getCoreUrl( + params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl( zkController.getBaseUrl(), req.getCore().getName())); } cmdDistrib.distribDelete(cmd, nodes, params); @@ -1180,7 +1184,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams())); params.set(VERSION_FIELD, Long.toString(cmd.getVersion())); params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString()); - params.set("update.from", ZkCoreNodeProps.getCoreUrl( + params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl( zkController.getBaseUrl(), req.getCore().getName())); boolean someReplicas = false; @@ -1217,10 +1221,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { if (nodesByRoutingRules != null && !nodesByRoutingRules.isEmpty()) { params = new ModifiableSolrParams(filterParams(req.getParams())); params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString()); - params.set("distrib.from", ZkCoreNodeProps.getCoreUrl( + params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl( zkController.getBaseUrl(), req.getCore().getName())); - params.set("distrib.from.collection", req.getCore().getCoreDescriptor().getCloudDescriptor().getCollectionName()); - params.set("distrib.from.shard", req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId()); + params.set(DISTRIB_FROM_COLLECTION, req.getCore().getCoreDescriptor().getCloudDescriptor().getCollectionName()); + params.set(DISTRIB_FROM_SHARD, req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId()); cmdDistrib.distribDelete(cmd, nodesByRoutingRules, params, true); } if (replicas != null) { @@ -1315,7 +1319,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { boolean isReplayOrPeersync = (cmd.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0; boolean leaderLogic = isLeader && !isReplayOrPeersync; - boolean forwardedFromCollection = cmd.getReq().getParams().get("distrib.from.collection") != null; + boolean forwardedFromCollection = cmd.getReq().getParams().get(DISTRIB_FROM_COLLECTION) != null; if (!leaderLogic && versionOnUpdate==0) { throw new SolrException(ErrorCode.BAD_REQUEST, "missing _version_ on update from leader");