From f511424d43d55786bfd30bc0844d859e9c226270 Mon Sep 17 00:00:00 2001 From: Noble Paul Date: Thu, 26 Feb 2015 13:04:28 +0000 Subject: [PATCH] SOLR-7130: Make stale state notification work without failing the requests git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1662439 13f79535-47bb-0310-9956-ffa450edef68 --- solr/CHANGES.txt | 3 ++ .../solr/servlet/SolrDispatchFilter.java | 36 ++++++++++++++----- .../client/solrj/impl/CloudSolrClient.java | 35 +++++++++++++----- .../solr/common/cloud/ZkStateReader.java | 8 ++--- .../solrj/impl/CloudSolrClientTest.java | 31 +++++++--------- 5 files changed, 74 insertions(+), 39 deletions(-) diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 7e40b273e18..045bd44545d 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -208,6 +208,9 @@ Other Changes * SOLR-7160: Rename ConfigSolr to NodeConfig, and decouple it from xml representation (Alan Woodward) +* SOLR-7130: Make stale state notification work without failing the requests + (Noble Paul, shalin) + ================== 5.0.0 ================== Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release. diff --git a/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java b/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java index d768f58d50d..915c683bd89 100644 --- a/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java +++ b/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java @@ -89,6 +89,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Enumeration; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -96,6 +97,8 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import static java.util.Collections.singletonMap; + /** * This filter looks at the incoming URL maps them to handlers defined in solrconfig.xml * @@ -220,6 +223,8 @@ public class SolrDispatchFilter extends BaseSolrFilter { SolrCore core = null; SolrQueryRequest solrReq = null; Aliases aliases = null; + //The states of client that is invalid in this request + Map invalidStates = null; if( request instanceof HttpServletRequest) { HttpServletRequest req = (HttpServletRequest)request; @@ -310,11 +315,15 @@ public class SolrDispatchFilter extends BaseSolrFilter { String coreUrl = getRemotCoreUrl(cores, corename, origCorename); // don't proxy for internal update requests SolrParams queryParams = SolrRequestParsers.parseQueryString(req.getQueryString()); - checkStateIsValid(cores, queryParams.get(CloudSolrClient.STATE_VERSION)); + invalidStates = checkStateIsValid(cores, queryParams.get(CloudSolrClient.STATE_VERSION)); if (coreUrl != null && queryParams .get(DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM) == null) { path = path.substring(idx); + if (invalidStates != null) { + //it does not make sense to send the request to a remote node + throw new SolrException(ErrorCode.INVALID_STATE, new String(ZkStateReader.toJSON(invalidStates), org.apache.lucene.util.IOUtils.UTF_8)); + } remoteQuery(coreUrl + path, req, solrReq, resp); return; } else { @@ -371,7 +380,7 @@ public class SolrDispatchFilter extends BaseSolrFilter { if( "/select".equals( path ) || "/select/".equals( path ) ) { solrReq = parser.parse( core, path, req ); - checkStateIsValid(cores,solrReq.getParams().get(CloudSolrClient.STATE_VERSION)); + invalidStates = checkStateIsValid(cores,solrReq.getParams().get(CloudSolrClient.STATE_VERSION)); String qt = solrReq.getParams().get( CommonParams.QT ); handler = core.getRequestHandler( qt ); if( handler == null ) { @@ -418,7 +427,8 @@ public class SolrDispatchFilter extends BaseSolrFilter { resp.addHeader(entry.getKey(), entry.getValue()); } QueryResponseWriter responseWriter = core.getQueryResponseWriter(solrReq); - writeResponse(solrRsp, response, responseWriter, solrReq, reqMethod); + if(invalidStates != null) solrReq.getContext().put(CloudSolrClient.STATE_VERSION, invalidStates); + writeResponse(solrRsp, response, responseWriter, solrReq, reqMethod); } return; // we are done with a valid handler } @@ -461,21 +471,24 @@ public class SolrDispatchFilter extends BaseSolrFilter { chain.doFilter(request, response); } - private void checkStateIsValid(CoreContainer cores, String stateVer) { + private Map checkStateIsValid(CoreContainer cores, String stateVer) { + Map result = null; + String[] pairs = null; if (stateVer != null && !stateVer.isEmpty() && cores.isZooKeeperAware()) { // many have multiple collections separated by | - String[] pairs = StringUtils.split(stateVer, '|'); + pairs = StringUtils.split(stateVer, '|'); for (String pair : pairs) { String[] pcs = StringUtils.split(pair, ':'); if (pcs.length == 2 && !pcs[0].isEmpty() && !pcs[1].isEmpty()) { - Boolean status = cores.getZkController().getZkStateReader().checkValid(pcs[0], Integer.parseInt(pcs[1])); - - if (Boolean.TRUE != status) { - throw new SolrException(ErrorCode.INVALID_STATE, "STATE STALE: " + pair + "valid : " + status); + Integer status = cores.getZkController().getZkStateReader().compareStateVersions(pcs[0], Integer.parseInt(pcs[1])); + if(status != null ){ + if(result == null) result = new HashMap<>(); + result.put(pcs[0], status); } } } } + return result; } private void processAliases(SolrQueryRequest solrReq, Aliases aliases, @@ -747,6 +760,11 @@ public class SolrDispatchFilter extends BaseSolrFilter { QueryResponseWriter responseWriter, SolrQueryRequest solrReq, Method reqMethod) throws IOException { try { + Object invalidStates = solrReq.getContext().get(CloudSolrClient.STATE_VERSION); + //This is the last item added to the response and the client would expect it that way. + //If that assumption is changed , it would fail. This is done to avoid an O(n) scan on + // the response for each request + if(invalidStates != null) solrRsp.add(CloudSolrClient.STATE_VERSION, invalidStates); // Now write it out final String ct = responseWriter.getContentType(solrReq, solrRsp); // don't call setContentType on null diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java index 111b76ac8f9..ffda19233b4 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java @@ -529,7 +529,7 @@ public class CloudSolrClient extends SolrClient { } } - DocCollection col = getDocCollection(clusterState, collection); + DocCollection col = getDocCollection(clusterState, collection,null); DocRouter router = col.getRouter(); @@ -774,7 +774,7 @@ public class CloudSolrClient extends SolrClient { StringBuilder stateVerParamBuilder = null; for (String requestedCollection : requestedCollectionNames) { // track the version of state we're using on the client side using the _stateVer_ param - DocCollection coll = getDocCollection(getZkStateReader().getClusterState(), requestedCollection); + DocCollection coll = getDocCollection(getZkStateReader().getClusterState(), requestedCollection,null); int collVer = coll.getZNodeVersion(); if (coll.getStateFormat()>1) { if(requestedCollections == null) requestedCollections = new ArrayList<>(requestedCollectionNames.size()); @@ -807,6 +807,15 @@ public class CloudSolrClient extends SolrClient { NamedList resp = null; try { resp = sendRequest(request); + Object o = resp.get(STATE_VERSION, resp.size()-1); + if(o != null && o instanceof Map) { + Map invalidStates = (Map) o; + for (Object invalidEntries : invalidStates.entrySet()) { + Map.Entry e = (Map.Entry) invalidEntries; + getDocCollection(getZkStateReader().getClusterState(),(String)e.getKey(), (Integer)e.getValue()); + } + + } } catch (Exception exc) { Throwable rootCause = SolrException.getRootCause(exc); @@ -860,7 +869,7 @@ public class CloudSolrClient extends SolrClient { !requestedCollections.isEmpty() && wasCommError) { for (DocCollection ext : requestedCollections) { - DocCollection latestStateFromZk = getDocCollection(zkStateReader.getClusterState(), ext.getName()); + DocCollection latestStateFromZk = getDocCollection(zkStateReader.getClusterState(), ext.getName(),null); if (latestStateFromZk.getZNodeVersion() != ext.getZNodeVersion()) { // looks like we couldn't reach the server because the state was stale == retry stateWasStale = true; @@ -949,7 +958,7 @@ public class CloudSolrClient extends SolrClient { // add it to the Map of slices. Map slices = new HashMap<>(); for (String collectionName : collectionNames) { - DocCollection col = getDocCollection(clusterState, collectionName); + DocCollection col = getDocCollection(clusterState, collectionName, null); Collection routeSlices = col.getRouter().getSearchSlices(shardKeys, reqParams , col); ClientUtils.addSlices(slices, collectionName, routeSlices, true); } @@ -1099,10 +1108,13 @@ public class CloudSolrClient extends SolrClient { } - protected DocCollection getDocCollection(ClusterState clusterState, String collection) throws SolrException { + protected DocCollection getDocCollection(ClusterState clusterState, String collection, Integer expectedVersion) throws SolrException { if(collection == null) return null; DocCollection col = getFromCache(collection); - if(col != null) return col; + if(col != null) { + if(expectedVersion == null) return col; + if(expectedVersion.intValue() == col.getZNodeVersion()) return col; + } ClusterState.CollectionRef ref = clusterState.getCollectionRef(collection); if(ref == null){ @@ -1118,8 +1130,15 @@ public class CloudSolrClient extends SolrClient { synchronized (lock){ //we have waited for sometime just check once again col = getFromCache(collection); - if(col !=null) return col; - col = ref.get(); + if(col !=null) { + if(expectedVersion == null) return col; + if(expectedVersion.intValue() == col.getZNodeVersion()) { + return col; + } else { + collectionStateCache.remove(collection); + } + } + col = ref.get();//this is a call to ZK } if(col == null ) return null; if(col.getStateFormat() >1) collectionStateCache.put(collection, new ExpiringCachedDocCollection(col)); diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java index 0203253b01c..1f9ebbc52bf 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java @@ -268,13 +268,13 @@ public class ZkStateReader implements Closeable { return aliases; } - public Boolean checkValid(String coll, int version) { + public Integer compareStateVersions(String coll, int version) { DocCollection collection = clusterState.getCollectionOrNull(coll); if (collection == null) return null; if (collection.getZNodeVersion() < version) { log.debug("server older than client {}<{}", collection.getZNodeVersion(), version); DocCollection nu = getCollectionLive(this, coll); - if (nu == null) return null; + if (nu == null) return -1 ; if (nu.getZNodeVersion() > collection.getZNodeVersion()) { updateWatchedCollection(nu); collection = nu; @@ -282,12 +282,12 @@ public class ZkStateReader implements Closeable { } if (collection.getZNodeVersion() == version) { - return Boolean.TRUE; + return null; } log.debug("wrong version from client {}!={} ", version, collection.getZNodeVersion()); - return Boolean.FALSE; + return collection.getZNodeVersion(); } public synchronized void createClusterStateWatchersAndUpdate() throws KeeperException, diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java index 376f8158a11..bc19c6f3b0e 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java @@ -502,7 +502,7 @@ public class CloudSolrClientTest extends AbstractFullDistribZkTestBase { try (CloudSolrClient client = createCloudClient(null)) { String collectionName = "checkStateVerCol"; - createCollection(collectionName, client, 2, 2); + createCollection(collectionName, client, 1, 3); waitForRecoveriesToFinish(collectionName, false); DocCollection coll = client.getZkStateReader().getClusterState().getCollection(collectionName); Replica r = coll.getSlices().iterator().next().getReplicas().iterator().next(); @@ -520,19 +520,13 @@ public class CloudSolrClientTest extends AbstractFullDistribZkTestBase { q.setParam(CloudSolrClient.STATE_VERSION, collectionName + ":" + (coll.getZNodeVersion() - 1)); //an older version expect error - try { - solrClient.query(q); - log.info("expected query error"); - } catch (HttpSolrClient.RemoteSolrException e) { - sse = e; - } - - assertNotNull(sse); - assertEquals(" Error code should be ", sse.code(), SolrException.ErrorCode.INVALID_STATE.code); - + QueryResponse rsp = solrClient.query(q); + Map m = (Map) rsp.getResponse().get(CloudSolrClient.STATE_VERSION, rsp.getResponse().size()-1); + assertNotNull("Expected an extra information from server with the list of invalid collection states", m); + assertNotNull(m.get(collectionName)); } - //now send the request to another node that does n ot serve the collection + //now send the request to another node that does not serve the collection Set allNodesOfColl = new HashSet<>(); for (Slice slice : coll.getSlices()) { @@ -541,27 +535,28 @@ public class CloudSolrClientTest extends AbstractFullDistribZkTestBase { } } String theNode = null; - for (String s : client.getZkStateReader().getClusterState().getLiveNodes()) { + Set liveNodes = client.getZkStateReader().getClusterState().getLiveNodes(); + for (String s : liveNodes) { String n = client.getZkStateReader().getBaseUrlForNodeName(s); - if(!allNodesOfColl.contains(s)){ + if(!allNodesOfColl.contains(n)){ theNode = n; break; } } - log.info("thenode which does not serve this collection{} ",theNode); + log.info("the node which does not serve this collection{} ",theNode); assertNotNull(theNode); try (SolrClient solrClient = new HttpSolrClient(theNode + "/"+collectionName)) { - q.setParam(CloudSolrClient.STATE_VERSION, collectionName + ":" + coll.getZNodeVersion()); + q.setParam(CloudSolrClient.STATE_VERSION, collectionName + ":" + (coll.getZNodeVersion()-1)); try { - solrClient.query(q); + QueryResponse rsp = solrClient.query(q); log.info("error was expected"); } catch (HttpSolrClient.RemoteSolrException e) { sse = e; } assertNotNull(sse); - assertEquals(" Error code should be ", sse.code(), SolrException.ErrorCode.INVALID_STATE.code); + assertEquals(" Error code should be 510", SolrException.ErrorCode.INVALID_STATE.code, sse.code()); } }