SOLR-5452: Do not attempt to proxy internal update requests.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1543037 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Mark Robert Miller 2013-11-18 14:31:18 +00:00
parent d9e8a31a15
commit 4f1a857977
3 changed files with 32 additions and 23 deletions

View File

@ -108,6 +108,8 @@ Bug Fixes
* SOLR-5451: SyncStrategy closes it's http connection manager before the
executor that uses it in it's close method. (Mark Miller)
* SOLR-5452: Do not attempt to proxy internal update requests. (Mark Miller)
Other Changes
---------------------

View File

@ -52,6 +52,7 @@ import org.apache.solr.response.QueryResponseWriter;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.servlet.cache.HttpCacheHeaderUtil;
import org.apache.solr.servlet.cache.Method;
import org.apache.solr.update.processor.DistributingUpdateProcessorFactory;
import org.apache.solr.util.FastWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -313,7 +314,9 @@ public class SolrDispatchFilter implements Filter
// if we couldn't find it locally, look on other nodes
if (core == null && idx > 0) {
String coreUrl = getRemotCoreUrl(cores, corename, origCorename);
if (coreUrl != null) {
Map<String,String[]> params = req.getParameterMap();
// don't proxy for internal update requests
if (coreUrl != null && (params == null || !params.containsKey(DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM))) {
path = path.substring( idx );
remoteQuery(coreUrl + path, req, solrReq, resp);
return;

View File

@ -92,6 +92,10 @@ import org.slf4j.LoggerFactory;
// NOT mt-safe... create a new processor for each add thread
// TODO: we really should not wait for distrib after local? unless a certain replication factor is asked for
public class DistributedUpdateProcessor extends UpdateRequestProcessor {
public static final String DISTRIB_FROM_SHARD = "distrib.from.shard";
public static final String DISTRIB_FROM_COLLECTION = "distrib.from.collection";
public static final String DISTRIB_FROM_PARENT = "distrib.from.parent";
public static final String DISTRIB_FROM = "distrib.from";
private static final String TEST_DISTRIB_SKIP_SERVERS = "test.distrib.skip.servers";
public final static Logger log = LoggerFactory.getLogger(DistributedUpdateProcessor.class);
@ -252,7 +256,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// if request is coming from another collection then we want it to be sent to all replicas
// even if it's phase is FROMLEADER
String fromCollection = updateCommand.getReq().getParams().get("distrib.from.collection");
String fromCollection = updateCommand.getReq().getParams().get(DISTRIB_FROM_COLLECTION);
if (DistribPhase.FROMLEADER == phase && !isSubShardLeader && fromCollection == null) {
// we are coming from the leader, just go local - add no urls
@ -443,13 +447,13 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
boolean isReplayOrPeersync = (updateCommand.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0;
if (isReplayOrPeersync) return;
String from = req.getParams().get("distrib.from");
String from = req.getParams().get(DISTRIB_FROM);
ClusterState clusterState = zkController.getClusterState();
CloudDescriptor cloudDescriptor = req.getCore().getCoreDescriptor().getCloudDescriptor();
Slice mySlice = clusterState.getSlice(collection, cloudDescriptor.getShardId());
boolean localIsLeader = cloudDescriptor.isLeader();
if (DistribPhase.FROMLEADER == phase && localIsLeader && from != null) { // from will be null on log replay
String fromShard = req.getParams().get("distrib.from.parent");
String fromShard = req.getParams().get(DISTRIB_FROM_PARENT);
if (fromShard != null) {
if (Slice.ACTIVE.equals(mySlice.getState())) {
throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
@ -464,7 +468,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
"Request says it is coming from parent shard leader but parent hash range is not superset of my range");
}
} else {
String fromCollection = req.getParams().get("distrib.from.collection"); // is it because of a routing rule?
String fromCollection = req.getParams().get(DISTRIB_FROM_COLLECTION); // is it because of a routing rule?
if (fromCollection == null) {
log.error("Request says it is coming from leader, but we are the leader: " + req.getParamString());
throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Request says it is coming from leader, but we are the leader");
@ -543,9 +547,9 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
if (subShardLeaders != null && !subShardLeaders.isEmpty()) {
ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
params.set("distrib.from", ZkCoreNodeProps.getCoreUrl(
params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
zkController.getBaseUrl(), req.getCore().getName()));
params.set("distrib.from.parent", req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId());
params.set(DISTRIB_FROM_PARENT, req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId());
for (Node subShardLeader : subShardLeaders) {
cmdDistrib.distribAdd(cmd, Collections.singletonList(subShardLeader), params, true);
}
@ -554,10 +558,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
if (nodesByRoutingRules != null && !nodesByRoutingRules.isEmpty()) {
ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
params.set("distrib.from", ZkCoreNodeProps.getCoreUrl(
params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
zkController.getBaseUrl(), req.getCore().getName()));
params.set("distrib.from.collection", req.getCore().getCoreDescriptor().getCloudDescriptor().getCollectionName());
params.set("distrib.from.shard", req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId());
params.set(DISTRIB_FROM_COLLECTION, req.getCore().getCoreDescriptor().getCloudDescriptor().getCollectionName());
params.set(DISTRIB_FROM_SHARD, req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId());
for (Node nodesByRoutingRule : nodesByRoutingRules) {
cmdDistrib.distribAdd(cmd, Collections.singletonList(nodesByRoutingRule), params, true);
}
@ -573,7 +577,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
DistribPhase.FROMLEADER.toString() :
DistribPhase.TOLEADER.toString()));
if (isLeader || isSubShardLeader) {
params.set("distrib.from", ZkCoreNodeProps.getCoreUrl(
params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
zkController.getBaseUrl(), req.getCore().getName()));
}
@ -727,7 +731,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
boolean isReplayOrPeersync = (cmd.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0;
boolean leaderLogic = isLeader && !isReplayOrPeersync;
boolean forwardedFromCollection = cmd.getReq().getParams().get("distrib.from.collection") != null;
boolean forwardedFromCollection = cmd.getReq().getParams().get(DISTRIB_FROM_COLLECTION) != null;
VersionBucket bucket = vinfo.bucket(bucketHash);
@ -972,9 +976,9 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
if (subShardLeaders != null && !subShardLeaders.isEmpty()) {
ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
params.set("distrib.from", ZkCoreNodeProps.getCoreUrl(
params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
zkController.getBaseUrl(), req.getCore().getName()));
params.set("distrib.from.parent", cloudDesc.getShardId());
params.set(DISTRIB_FROM_PARENT, cloudDesc.getShardId());
cmdDistrib.distribDelete(cmd, subShardLeaders, params, true);
}
@ -982,10 +986,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
if (nodesByRoutingRules != null && !nodesByRoutingRules.isEmpty()) {
ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
params.set("distrib.from", ZkCoreNodeProps.getCoreUrl(
params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
zkController.getBaseUrl(), req.getCore().getName()));
params.set("distrib.from.collection", req.getCore().getCoreDescriptor().getCloudDescriptor().getCollectionName());
params.set("distrib.from.shard", req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId());
params.set(DISTRIB_FROM_COLLECTION, req.getCore().getCoreDescriptor().getCloudDescriptor().getCollectionName());
params.set(DISTRIB_FROM_SHARD, req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId());
for (Node nodesByRoutingRule : nodesByRoutingRules) {
cmdDistrib.distribDelete(cmd, Collections.singletonList(nodesByRoutingRule), params, true);
}
@ -1002,7 +1006,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
DistribPhase.FROMLEADER.toString() :
DistribPhase.TOLEADER.toString()));
if (isLeader || isSubShardLeader) {
params.set("distrib.from", ZkCoreNodeProps.getCoreUrl(
params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
zkController.getBaseUrl(), req.getCore().getName()));
}
cmdDistrib.distribDelete(cmd, nodes, params);
@ -1180,7 +1184,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
params.set(VERSION_FIELD, Long.toString(cmd.getVersion()));
params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
params.set("update.from", ZkCoreNodeProps.getCoreUrl(
params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
zkController.getBaseUrl(), req.getCore().getName()));
boolean someReplicas = false;
@ -1217,10 +1221,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
if (nodesByRoutingRules != null && !nodesByRoutingRules.isEmpty()) {
params = new ModifiableSolrParams(filterParams(req.getParams()));
params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
params.set("distrib.from", ZkCoreNodeProps.getCoreUrl(
params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
zkController.getBaseUrl(), req.getCore().getName()));
params.set("distrib.from.collection", req.getCore().getCoreDescriptor().getCloudDescriptor().getCollectionName());
params.set("distrib.from.shard", req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId());
params.set(DISTRIB_FROM_COLLECTION, req.getCore().getCoreDescriptor().getCloudDescriptor().getCollectionName());
params.set(DISTRIB_FROM_SHARD, req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId());
cmdDistrib.distribDelete(cmd, nodesByRoutingRules, params, true);
}
if (replicas != null) {
@ -1315,7 +1319,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
boolean isReplayOrPeersync = (cmd.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0;
boolean leaderLogic = isLeader && !isReplayOrPeersync;
boolean forwardedFromCollection = cmd.getReq().getParams().get("distrib.from.collection") != null;
boolean forwardedFromCollection = cmd.getReq().getParams().get(DISTRIB_FROM_COLLECTION) != null;
if (!leaderLogic && versionOnUpdate==0) {
throw new SolrException(ErrorCode.BAD_REQUEST, "missing _version_ on update from leader");