mirror of https://github.com/apache/lucene.git
SOLR-3710: Change CloudSolrServer so that update requests are only sent to leaders by default.
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1369484 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
2ba5d018e7
commit
fec289ee99
|
@ -117,6 +117,9 @@ Optimizations
|
|||
|
||||
* SOLR-3709: Cache the url list created from the ClusterState in CloudSolrServer on each
|
||||
request. (Mark Miller, yonik)
|
||||
|
||||
* SOLR-3710: Change CloudSolrServer so that update requests are only sent to leaders by
|
||||
default. (Mark Miller)
|
||||
|
||||
Bug Fixes
|
||||
----------------------
|
||||
|
|
|
@ -32,6 +32,7 @@ import org.apache.http.client.HttpClient;
|
|||
import org.apache.solr.client.solrj.SolrRequest;
|
||||
import org.apache.solr.client.solrj.SolrServer;
|
||||
import org.apache.solr.client.solrj.SolrServerException;
|
||||
import org.apache.solr.client.solrj.request.IsUpdateRequest;
|
||||
import org.apache.solr.client.solrj.util.ClientUtils;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
|
@ -64,7 +65,11 @@ public class CloudSolrServer extends SolrServer {
|
|||
|
||||
// since the state shouldn't change often, should be very cheap reads
|
||||
private volatile List<String> urlList;
|
||||
private volatile List<String> leaderUrlList;
|
||||
private volatile int lastClusterStateHashCode;
|
||||
|
||||
private final boolean updatesToLeaders;
|
||||
|
||||
/**
|
||||
* @param zkHost The client endpoint of the zookeeper quorum containing the cloud state,
|
||||
* in the form HOST:PORT.
|
||||
|
@ -73,6 +78,7 @@ public class CloudSolrServer extends SolrServer {
|
|||
this.zkHost = zkHost;
|
||||
this.myClient = HttpClientUtil.createClient(null);
|
||||
this.lbServer = new LBHttpSolrServer(myClient);
|
||||
this.updatesToLeaders = true;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -83,6 +89,19 @@ public class CloudSolrServer extends SolrServer {
|
|||
public CloudSolrServer(String zkHost, LBHttpSolrServer lbServer) {
|
||||
this.zkHost = zkHost;
|
||||
this.lbServer = lbServer;
|
||||
this.updatesToLeaders = true;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param zkHost The client endpoint of the zookeeper quorum containing the cloud state,
|
||||
* in the form HOST:PORT.
|
||||
* @param lbServer LBHttpSolrServer instance for requests.
|
||||
* @param updatesToLeaders sends updates only to leaders - defaults to true
|
||||
*/
|
||||
public CloudSolrServer(String zkHost, LBHttpSolrServer lbServer, boolean updatesToLeaders) {
|
||||
this.zkHost = zkHost;
|
||||
this.lbServer = lbServer;
|
||||
this.updatesToLeaders = updatesToLeaders;
|
||||
}
|
||||
|
||||
public ZkStateReader getZkStateReader() {
|
||||
|
@ -144,6 +163,11 @@ public class CloudSolrServer extends SolrServer {
|
|||
// TODO: if you can hash here, you could favor the shard leader
|
||||
|
||||
ClusterState clusterState = zkStateReader.getClusterState();
|
||||
boolean sendToLeaders = false;
|
||||
|
||||
if (request instanceof IsUpdateRequest && updatesToLeaders) {
|
||||
sendToLeaders = true;
|
||||
}
|
||||
|
||||
SolrParams reqParams = request.getParams();
|
||||
if (reqParams == null) {
|
||||
|
@ -158,6 +182,9 @@ public class CloudSolrServer extends SolrServer {
|
|||
// Extract each comma separated collection name and store in a List.
|
||||
List<String> collectionList = StrUtils.splitSmart(collection, ",", true);
|
||||
|
||||
// TODO: not a big deal because of the caching, but we could avoid looking at every shard
|
||||
// when getting leaders if we tweaked some things
|
||||
|
||||
// Retrieve slices from the cloud state and, for each collection specified,
|
||||
// add it to the Map of slices.
|
||||
Map<String,Slice> slices = new HashMap<String,Slice>();
|
||||
|
@ -168,11 +195,7 @@ public class CloudSolrServer extends SolrServer {
|
|||
|
||||
Set<String> liveNodes = clusterState.getLiveNodes();
|
||||
|
||||
// IDEA: have versions on various things... like a global clusterState version
|
||||
// or shardAddressVersion (which only changes when the shards change)
|
||||
// to allow caching.
|
||||
|
||||
if (clusterState.hashCode() != this.lastClusterStateHashCode) {
|
||||
if (sendToLeaders && leaderUrlList == null || !sendToLeaders && urlList == null || clusterState.hashCode() != this.lastClusterStateHashCode) {
|
||||
|
||||
// build a map of unique nodes
|
||||
// TODO: allow filtering by group, role, etc
|
||||
|
@ -185,19 +208,32 @@ public class CloudSolrServer extends SolrServer {
|
|||
if (!liveNodes.contains(coreNodeProps.getNodeName())
|
||||
|| !coreNodeProps.getState().equals(ZkStateReader.ACTIVE)) continue;
|
||||
if (nodes.put(node, nodeProps) == null) {
|
||||
String url = coreNodeProps.getCoreUrl();
|
||||
urlList.add(url);
|
||||
if (!sendToLeaders || (sendToLeaders && coreNodeProps.isLeader())) {
|
||||
String url = coreNodeProps.getCoreUrl();
|
||||
urlList.add(url);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
this.urlList = urlList;
|
||||
if (sendToLeaders) {
|
||||
this.leaderUrlList = urlList;
|
||||
} else {
|
||||
this.urlList = urlList;
|
||||
}
|
||||
this.lastClusterStateHashCode = clusterState.hashCode();
|
||||
}
|
||||
|
||||
Collections.shuffle(urlList, rand);
|
||||
List<String> theUrlList;
|
||||
if (sendToLeaders) {
|
||||
theUrlList = new ArrayList<String>(leaderUrlList.size());
|
||||
theUrlList.addAll(leaderUrlList);
|
||||
} else {
|
||||
theUrlList = new ArrayList<String>(urlList.size());
|
||||
theUrlList.addAll(urlList);
|
||||
}
|
||||
Collections.shuffle(theUrlList, rand);
|
||||
//System.out.println("########################## MAKING REQUEST TO " + urlList);
|
||||
|
||||
LBHttpSolrServer.Req req = new LBHttpSolrServer.Req(request, urlList);
|
||||
LBHttpSolrServer.Req req = new LBHttpSolrServer.Req(request, theUrlList);
|
||||
LBHttpSolrServer.Rsp rsp = lbServer.request(req);
|
||||
return rsp.getResponse();
|
||||
}
|
||||
|
|
|
@ -30,7 +30,7 @@ import java.io.IOException;
|
|||
*
|
||||
*
|
||||
**/
|
||||
public abstract class AbstractUpdateRequest extends SolrRequest {
|
||||
public abstract class AbstractUpdateRequest extends SolrRequest implements IsUpdateRequest {
|
||||
protected ModifiableSolrParams params;
|
||||
protected int commitWithin = -1;
|
||||
|
||||
|
|
|
@ -34,7 +34,7 @@ import org.apache.solr.common.util.ContentStream;
|
|||
*
|
||||
* @since solr 1.3
|
||||
*/
|
||||
public class DirectXmlRequest extends SolrRequest
|
||||
public class DirectXmlRequest extends SolrRequest implements IsUpdateRequest
|
||||
{
|
||||
final String xml;
|
||||
private SolrParams params;
|
||||
|
|
|
@ -0,0 +1,26 @@
|
|||
package org.apache.solr.client.solrj.request;
|
||||
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
|
||||
/**
|
||||
* Marker class so that we can determine which requests are updates.
|
||||
*/
|
||||
public interface IsUpdateRequest {
|
||||
|
||||
}
|
|
@ -74,5 +74,9 @@ public class ZkCoreNodeProps {
|
|||
return nodeProps;
|
||||
}
|
||||
|
||||
public boolean isLeader() {
|
||||
return nodeProps.containsKey(ZkStateReader.LEADER_PROP);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue