SOLR-7130: Make stale state notification work without failing the requests

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1662439 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Noble Paul 2015-02-26 13:04:28 +00:00
parent ed358ad9ad
commit f511424d43
5 changed files with 74 additions and 39 deletions

View File

@ -208,6 +208,9 @@ Other Changes
* SOLR-7160: Rename ConfigSolr to NodeConfig, and decouple it from xml
representation (Alan Woodward)
* SOLR-7130: Make stale state notification work without failing the requests
(Noble Paul, shalin)
================== 5.0.0 ==================
Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.

View File

@ -89,6 +89,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@ -96,6 +97,8 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import static java.util.Collections.singletonMap;
/**
* This filter looks at the incoming URL maps them to handlers defined in solrconfig.xml
*
@ -220,6 +223,8 @@ public class SolrDispatchFilter extends BaseSolrFilter {
SolrCore core = null;
SolrQueryRequest solrReq = null;
Aliases aliases = null;
//The states of client that is invalid in this request
Map<String, Integer> invalidStates = null;
if( request instanceof HttpServletRequest) {
HttpServletRequest req = (HttpServletRequest)request;
@ -310,11 +315,15 @@ public class SolrDispatchFilter extends BaseSolrFilter {
String coreUrl = getRemotCoreUrl(cores, corename, origCorename);
// don't proxy for internal update requests
SolrParams queryParams = SolrRequestParsers.parseQueryString(req.getQueryString());
checkStateIsValid(cores, queryParams.get(CloudSolrClient.STATE_VERSION));
invalidStates = checkStateIsValid(cores, queryParams.get(CloudSolrClient.STATE_VERSION));
if (coreUrl != null
&& queryParams
.get(DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM) == null) {
path = path.substring(idx);
if (invalidStates != null) {
//it does not make sense to send the request to a remote node
throw new SolrException(ErrorCode.INVALID_STATE, new String(ZkStateReader.toJSON(invalidStates), org.apache.lucene.util.IOUtils.UTF_8));
}
remoteQuery(coreUrl + path, req, solrReq, resp);
return;
} else {
@ -371,7 +380,7 @@ public class SolrDispatchFilter extends BaseSolrFilter {
if( "/select".equals( path ) || "/select/".equals( path ) ) {
solrReq = parser.parse( core, path, req );
checkStateIsValid(cores,solrReq.getParams().get(CloudSolrClient.STATE_VERSION));
invalidStates = checkStateIsValid(cores,solrReq.getParams().get(CloudSolrClient.STATE_VERSION));
String qt = solrReq.getParams().get( CommonParams.QT );
handler = core.getRequestHandler( qt );
if( handler == null ) {
@ -418,6 +427,7 @@ public class SolrDispatchFilter extends BaseSolrFilter {
resp.addHeader(entry.getKey(), entry.getValue());
}
QueryResponseWriter responseWriter = core.getQueryResponseWriter(solrReq);
if(invalidStates != null) solrReq.getContext().put(CloudSolrClient.STATE_VERSION, invalidStates);
writeResponse(solrRsp, response, responseWriter, solrReq, reqMethod);
}
return; // we are done with a valid handler
@ -461,21 +471,24 @@ public class SolrDispatchFilter extends BaseSolrFilter {
chain.doFilter(request, response);
}
private void checkStateIsValid(CoreContainer cores, String stateVer) {
private Map<String , Integer> checkStateIsValid(CoreContainer cores, String stateVer) {
Map<String, Integer> result = null;
String[] pairs = null;
if (stateVer != null && !stateVer.isEmpty() && cores.isZooKeeperAware()) {
// many have multiple collections separated by |
String[] pairs = StringUtils.split(stateVer, '|');
pairs = StringUtils.split(stateVer, '|');
for (String pair : pairs) {
String[] pcs = StringUtils.split(pair, ':');
if (pcs.length == 2 && !pcs[0].isEmpty() && !pcs[1].isEmpty()) {
Boolean status = cores.getZkController().getZkStateReader().checkValid(pcs[0], Integer.parseInt(pcs[1]));
if (Boolean.TRUE != status) {
throw new SolrException(ErrorCode.INVALID_STATE, "STATE STALE: " + pair + "valid : " + status);
Integer status = cores.getZkController().getZkStateReader().compareStateVersions(pcs[0], Integer.parseInt(pcs[1]));
if(status != null ){
if(result == null) result = new HashMap<>();
result.put(pcs[0], status);
}
}
}
}
return result;
}
private void processAliases(SolrQueryRequest solrReq, Aliases aliases,
@ -747,6 +760,11 @@ public class SolrDispatchFilter extends BaseSolrFilter {
QueryResponseWriter responseWriter, SolrQueryRequest solrReq, Method reqMethod)
throws IOException {
try {
Object invalidStates = solrReq.getContext().get(CloudSolrClient.STATE_VERSION);
//This is the last item added to the response and the client would expect it that way.
//If that assumption is changed , it would fail. This is done to avoid an O(n) scan on
// the response for each request
if(invalidStates != null) solrRsp.add(CloudSolrClient.STATE_VERSION, invalidStates);
// Now write it out
final String ct = responseWriter.getContentType(solrReq, solrRsp);
// don't call setContentType on null

View File

@ -529,7 +529,7 @@ public class CloudSolrClient extends SolrClient {
}
}
DocCollection col = getDocCollection(clusterState, collection);
DocCollection col = getDocCollection(clusterState, collection,null);
DocRouter router = col.getRouter();
@ -774,7 +774,7 @@ public class CloudSolrClient extends SolrClient {
StringBuilder stateVerParamBuilder = null;
for (String requestedCollection : requestedCollectionNames) {
// track the version of state we're using on the client side using the _stateVer_ param
DocCollection coll = getDocCollection(getZkStateReader().getClusterState(), requestedCollection);
DocCollection coll = getDocCollection(getZkStateReader().getClusterState(), requestedCollection,null);
int collVer = coll.getZNodeVersion();
if (coll.getStateFormat()>1) {
if(requestedCollections == null) requestedCollections = new ArrayList<>(requestedCollectionNames.size());
@ -807,6 +807,15 @@ public class CloudSolrClient extends SolrClient {
NamedList<Object> resp = null;
try {
resp = sendRequest(request);
Object o = resp.get(STATE_VERSION, resp.size()-1);
if(o != null && o instanceof Map) {
Map invalidStates = (Map) o;
for (Object invalidEntries : invalidStates.entrySet()) {
Map.Entry e = (Map.Entry) invalidEntries;
getDocCollection(getZkStateReader().getClusterState(),(String)e.getKey(), (Integer)e.getValue());
}
}
} catch (Exception exc) {
Throwable rootCause = SolrException.getRootCause(exc);
@ -860,7 +869,7 @@ public class CloudSolrClient extends SolrClient {
!requestedCollections.isEmpty() &&
wasCommError) {
for (DocCollection ext : requestedCollections) {
DocCollection latestStateFromZk = getDocCollection(zkStateReader.getClusterState(), ext.getName());
DocCollection latestStateFromZk = getDocCollection(zkStateReader.getClusterState(), ext.getName(),null);
if (latestStateFromZk.getZNodeVersion() != ext.getZNodeVersion()) {
// looks like we couldn't reach the server because the state was stale == retry
stateWasStale = true;
@ -949,7 +958,7 @@ public class CloudSolrClient extends SolrClient {
// add it to the Map of slices.
Map<String,Slice> slices = new HashMap<>();
for (String collectionName : collectionNames) {
DocCollection col = getDocCollection(clusterState, collectionName);
DocCollection col = getDocCollection(clusterState, collectionName, null);
Collection<Slice> routeSlices = col.getRouter().getSearchSlices(shardKeys, reqParams , col);
ClientUtils.addSlices(slices, collectionName, routeSlices, true);
}
@ -1099,10 +1108,13 @@ public class CloudSolrClient extends SolrClient {
}
protected DocCollection getDocCollection(ClusterState clusterState, String collection) throws SolrException {
protected DocCollection getDocCollection(ClusterState clusterState, String collection, Integer expectedVersion) throws SolrException {
if(collection == null) return null;
DocCollection col = getFromCache(collection);
if(col != null) return col;
if(col != null) {
if(expectedVersion == null) return col;
if(expectedVersion.intValue() == col.getZNodeVersion()) return col;
}
ClusterState.CollectionRef ref = clusterState.getCollectionRef(collection);
if(ref == null){
@ -1118,8 +1130,15 @@ public class CloudSolrClient extends SolrClient {
synchronized (lock){
//we have waited for sometime just check once again
col = getFromCache(collection);
if(col !=null) return col;
col = ref.get();
if(col !=null) {
if(expectedVersion == null) return col;
if(expectedVersion.intValue() == col.getZNodeVersion()) {
return col;
} else {
collectionStateCache.remove(collection);
}
}
col = ref.get();//this is a call to ZK
}
if(col == null ) return null;
if(col.getStateFormat() >1) collectionStateCache.put(collection, new ExpiringCachedDocCollection(col));

View File

@ -268,13 +268,13 @@ public class ZkStateReader implements Closeable {
return aliases;
}
public Boolean checkValid(String coll, int version) {
public Integer compareStateVersions(String coll, int version) {
DocCollection collection = clusterState.getCollectionOrNull(coll);
if (collection == null) return null;
if (collection.getZNodeVersion() < version) {
log.debug("server older than client {}<{}", collection.getZNodeVersion(), version);
DocCollection nu = getCollectionLive(this, coll);
if (nu == null) return null;
if (nu == null) return -1 ;
if (nu.getZNodeVersion() > collection.getZNodeVersion()) {
updateWatchedCollection(nu);
collection = nu;
@ -282,12 +282,12 @@ public class ZkStateReader implements Closeable {
}
if (collection.getZNodeVersion() == version) {
return Boolean.TRUE;
return null;
}
log.debug("wrong version from client {}!={} ", version, collection.getZNodeVersion());
return Boolean.FALSE;
return collection.getZNodeVersion();
}
public synchronized void createClusterStateWatchersAndUpdate() throws KeeperException,

View File

@ -502,7 +502,7 @@ public class CloudSolrClientTest extends AbstractFullDistribZkTestBase {
try (CloudSolrClient client = createCloudClient(null)) {
String collectionName = "checkStateVerCol";
createCollection(collectionName, client, 2, 2);
createCollection(collectionName, client, 1, 3);
waitForRecoveriesToFinish(collectionName, false);
DocCollection coll = client.getZkStateReader().getClusterState().getCollection(collectionName);
Replica r = coll.getSlices().iterator().next().getReplicas().iterator().next();
@ -520,19 +520,13 @@ public class CloudSolrClientTest extends AbstractFullDistribZkTestBase {
q.setParam(CloudSolrClient.STATE_VERSION, collectionName + ":" + (coll.getZNodeVersion() - 1)); //an older version expect error
try {
solrClient.query(q);
log.info("expected query error");
} catch (HttpSolrClient.RemoteSolrException e) {
sse = e;
QueryResponse rsp = solrClient.query(q);
Map m = (Map) rsp.getResponse().get(CloudSolrClient.STATE_VERSION, rsp.getResponse().size()-1);
assertNotNull("Expected an extra information from server with the list of invalid collection states", m);
assertNotNull(m.get(collectionName));
}
assertNotNull(sse);
assertEquals(" Error code should be ", sse.code(), SolrException.ErrorCode.INVALID_STATE.code);
}
//now send the request to another node that does n ot serve the collection
//now send the request to another node that does not serve the collection
Set<String> allNodesOfColl = new HashSet<>();
for (Slice slice : coll.getSlices()) {
@ -541,27 +535,28 @@ public class CloudSolrClientTest extends AbstractFullDistribZkTestBase {
}
}
String theNode = null;
for (String s : client.getZkStateReader().getClusterState().getLiveNodes()) {
Set<String> liveNodes = client.getZkStateReader().getClusterState().getLiveNodes();
for (String s : liveNodes) {
String n = client.getZkStateReader().getBaseUrlForNodeName(s);
if(!allNodesOfColl.contains(s)){
if(!allNodesOfColl.contains(n)){
theNode = n;
break;
}
}
log.info("thenode which does not serve this collection{} ",theNode);
log.info("the node which does not serve this collection{} ",theNode);
assertNotNull(theNode);
try (SolrClient solrClient = new HttpSolrClient(theNode + "/"+collectionName)) {
q.setParam(CloudSolrClient.STATE_VERSION, collectionName + ":" + coll.getZNodeVersion());
q.setParam(CloudSolrClient.STATE_VERSION, collectionName + ":" + (coll.getZNodeVersion()-1));
try {
solrClient.query(q);
QueryResponse rsp = solrClient.query(q);
log.info("error was expected");
} catch (HttpSolrClient.RemoteSolrException e) {
sse = e;
}
assertNotNull(sse);
assertEquals(" Error code should be ", sse.code(), SolrException.ErrorCode.INVALID_STATE.code);
assertEquals(" Error code should be 510", SolrException.ErrorCode.INVALID_STATE.code, sse.code());
}
}