SOLR-12388: Enable a strict ZooKeeper-connected search request mode, in which search requests will fail when the coordinating node cant communicate with ZooKeeper, by setting the "shards.tolerant" param to "requireZkConnected"

This commit is contained in:
Steve Rowe 2018-05-25 01:03:54 -04:00
parent ad143a1cec
commit 54a63d0d0c
12 changed files with 216 additions and 20 deletions

View File

@ -121,6 +121,10 @@ New Features
* SOLR-12378: Support missing versionField on indexed docs in DocBasedVersionConstraintsURP. * SOLR-12378: Support missing versionField on indexed docs in DocBasedVersionConstraintsURP.
(Oliver Bates, Michael Braun via Mark Miller) (Oliver Bates, Michael Braun via Mark Miller)
* SOLR-12388: Enable a strict ZooKeeper-connected search request mode, in which search
requests will fail when the coordinating node can't communicate with ZooKeeper,
by setting the "shards.tolerant" param to "requireZkConnected". (Steve Rowe)
Bug Fixes Bug Fixes
---------------------- ----------------------

View File

@ -713,7 +713,7 @@ public class FacetComponent extends SearchComponent {
try { try {
facet_counts = (NamedList) srsp.getSolrResponse().getResponse().get("facet_counts"); facet_counts = (NamedList) srsp.getSolrResponse().getResponse().get("facet_counts");
} catch (Exception ex) { } catch (Exception ex) {
if (rb.req.getParams().getBool(ShardParams.SHARDS_TOLERANT, false)) { if (ShardParams.getShardsTolerantAsBool(rb.req.getParams())) {
continue; // looks like a shard did not return anything continue; // looks like a shard did not return anything
} }
throw new SolrException(ErrorCode.SERVER_ERROR, throw new SolrException(ErrorCode.SERVER_ERROR,

View File

@ -430,7 +430,7 @@ public class HttpShardHandler extends ShardHandler {
// And now recreate the | delimited list of equivalent servers // And now recreate the | delimited list of equivalent servers
final String sliceShardsStr = createSliceShardsStr(shardUrls); final String sliceShardsStr = createSliceShardsStr(shardUrls);
if (sliceShardsStr.isEmpty()) { if (sliceShardsStr.isEmpty()) {
boolean tolerant = rb.req.getParams().getBool(ShardParams.SHARDS_TOLERANT, false); boolean tolerant = ShardParams.getShardsTolerantAsBool(rb.req.getParams());
if (!tolerant) { if (!tolerant) {
// stop the check when there are no replicas available for a shard // stop the check when there are no replicas available for a shard
throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,

View File

@ -229,16 +229,19 @@ public class SearchHandler extends RequestHandlerBase implements SolrCoreAware ,
} }
} }
if(isZkAware) { if (isZkAware) {
String shardsTolerant = req.getParams().get(ShardParams.SHARDS_TOLERANT);
boolean requireZkConnected = shardsTolerant != null && shardsTolerant.equals(ShardParams.REQUIRE_ZK_CONNECTED);
ZkController zkController = cc.getZkController(); ZkController zkController = cc.getZkController();
NamedList<Object> headers = rb.rsp.getResponseHeader(); boolean zkConnected = zkController != null && ! zkController.getZkClient().getConnectionManager().isLikelyExpired();
if(headers != null) { if (requireZkConnected && false == zkConnected) {
headers.add("zkConnected", throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "ZooKeeper is not connected");
zkController != null } else {
? !zkController.getZkClient().getConnectionManager().isLikelyExpired() NamedList<Object> headers = rb.rsp.getResponseHeader();
: false); if (headers != null) {
headers.add("zkConnected", zkConnected);
}
} }
} }
return shardHandler; return shardHandler;
@ -392,7 +395,7 @@ public class SearchHandler extends RequestHandlerBase implements SolrCoreAware ,
// now wait for replies, but if anyone puts more requests on // now wait for replies, but if anyone puts more requests on
// the outgoing queue, send them out immediately (by exiting // the outgoing queue, send them out immediately (by exiting
// this loop) // this loop)
boolean tolerant = rb.req.getParams().getBool(ShardParams.SHARDS_TOLERANT, false); boolean tolerant = ShardParams.getShardsTolerantAsBool(rb.req.getParams());
while (rb.outgoing.size() == 0) { while (rb.outgoing.size() == 0) {
ShardResponse srsp = tolerant ? ShardResponse srsp = tolerant ?
shardHandler1.takeCompletedIncludingErrors(): shardHandler1.takeCompletedIncludingErrors():

View File

@ -401,7 +401,7 @@ public class SpellCheckComponent extends SearchComponent implements SolrCoreAwar
try { try {
nl = (NamedList) srsp.getSolrResponse().getResponse().get("spellcheck"); nl = (NamedList) srsp.getSolrResponse().getResponse().get("spellcheck");
} catch (Exception e) { } catch (Exception e) {
if (rb.req.getParams().getBool(ShardParams.SHARDS_TOLERANT, false)) { if (ShardParams.getShardsTolerantAsBool(rb.req.getParams())) {
continue; // looks like a shard did not return anything continue; // looks like a shard did not return anything
} }
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,

View File

@ -99,7 +99,7 @@ public class StatsComponent extends SearchComponent {
stats = (NamedList<NamedList<NamedList<?>>>) stats = (NamedList<NamedList<NamedList<?>>>)
srsp.getSolrResponse().getResponse().get("stats"); srsp.getSolrResponse().getResponse().get("stats");
} catch (Exception e) { } catch (Exception e) {
if (rb.req.getParams().getBool(ShardParams.SHARDS_TOLERANT, false)) { if (ShardParams.getShardsTolerantAsBool(rb.req.getParams())) {
continue; // looks like a shard did not return anything continue; // looks like a shard did not return anything
} }
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,

View File

@ -99,7 +99,7 @@ public class SearchGroupShardResponseProcessor implements ShardResponseProcessor
} }
shardInfo.add(srsp.getShard(), nl); shardInfo.add(srsp.getShard(), nl);
} }
if (rb.req.getParams().getBool(ShardParams.SHARDS_TOLERANT, false) && srsp.getException() != null) { if (ShardParams.getShardsTolerantAsBool(rb.req.getParams()) && srsp.getException() != null) {
if(rb.rsp.getResponseHeader().get(SolrQueryResponse.RESPONSE_HEADER_PARTIAL_RESULTS_KEY) == null) { if(rb.rsp.getResponseHeader().get(SolrQueryResponse.RESPONSE_HEADER_PARTIAL_RESULTS_KEY) == null) {
rb.rsp.getResponseHeader().add(SolrQueryResponse.RESPONSE_HEADER_PARTIAL_RESULTS_KEY, Boolean.TRUE); rb.rsp.getResponseHeader().add(SolrQueryResponse.RESPONSE_HEADER_PARTIAL_RESULTS_KEY, Boolean.TRUE);
} }

View File

@ -110,7 +110,7 @@ public class TopGroupsShardResponseProcessor implements ShardResponseProcessor {
} }
shardInfo.add(srsp.getShard(), individualShardInfo); shardInfo.add(srsp.getShard(), individualShardInfo);
} }
if (rb.req.getParams().getBool(ShardParams.SHARDS_TOLERANT, false) && srsp.getException() != null) { if (ShardParams.getShardsTolerantAsBool(rb.req.getParams()) && srsp.getException() != null) {
if(rb.rsp.getResponseHeader().get(SolrQueryResponse.RESPONSE_HEADER_PARTIAL_RESULTS_KEY) == null) { if(rb.rsp.getResponseHeader().get(SolrQueryResponse.RESPONSE_HEADER_PARTIAL_RESULTS_KEY) == null) {
rb.rsp.getResponseHeader().add(SolrQueryResponse.RESPONSE_HEADER_PARTIAL_RESULTS_KEY, Boolean.TRUE); rb.rsp.getResponseHeader().add(SolrQueryResponse.RESPONSE_HEADER_PARTIAL_RESULTS_KEY, Boolean.TRUE);
} }

View File

@ -17,15 +17,22 @@
package org.apache.solr.handler.component; package org.apache.solr.handler.component;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List; import java.util.List;
import org.apache.solr.SolrTestCaseJ4; import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.embedded.JettySolrRunner; import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrClient; import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest; import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.QueryRequest; import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.response.QueryResponse; import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.cloud.MiniSolrCloudCluster; import org.apache.solr.cloud.MiniSolrCloudCluster;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.ShardParams;
import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.SolrCore; import org.apache.solr.core.SolrCore;
import org.junit.BeforeClass; import org.junit.BeforeClass;
@ -131,9 +138,131 @@ public class SearchHandlerTest extends SolrTestCaseJ4
QueryResponse rsp = req.process(cloudSolrClient, collectionName); QueryResponse rsp = req.process(cloudSolrClient, collectionName);
assertTrue(rsp.getResponseHeader().getBooleanArg("zkConnected")); assertTrue(rsp.getResponseHeader().getBooleanArg("zkConnected"));
Collection<Slice> slices = cloudSolrClient.getZkStateReader().getClusterState().getCollection(collectionName).getSlices();
Slice slice = getRandomEntry(slices);
Replica replica = getRandomEntry(slice.getReplicas());
JettySolrRunner jetty = miniCluster.getReplicaJetty(replica);
// Use the replica's core URL to avoid ZK communication
try (HttpSolrClient client = new HttpSolrClient.Builder(replica.getCoreUrl()).build()) {
jetty.getCoreContainer().getZkController().getZkClient().close();
rsp = req.process(client);
assertFalse(rsp.getResponseHeader().getBooleanArg("zkConnected"));
}
} }
finally { finally {
miniCluster.shutdown(); miniCluster.shutdown();
} }
} }
@Test
public void testRequireZkConnected() throws Exception{
MiniSolrCloudCluster miniCluster = new MiniSolrCloudCluster(5, createTempDir(), buildJettyConfig("/solr"));
final CloudSolrClient cloudSolrClient = miniCluster.getSolrClient();
try {
assertNotNull(miniCluster.getZkServer());
List<JettySolrRunner> jettys = miniCluster.getJettySolrRunners();
assertEquals(5, jettys.size());
for (JettySolrRunner jetty : jettys) {
assertTrue(jetty.isRunning());
}
// create collection
String collectionName = "testRequireZkConnectedCollection";
String configName = collectionName + "Config";
miniCluster.uploadConfigSet(SolrTestCaseJ4.TEST_PATH().resolve("collection1/conf"), configName);
CollectionAdminRequest.createCollection(collectionName, configName, 2, 2)
.process(miniCluster.getSolrClient());
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(ShardParams.SHARDS_TOLERANT, "requireZkConnected");
QueryRequest req = new QueryRequest(params);
QueryResponse rsp = req.process(cloudSolrClient, collectionName);
assertTrue(rsp.getResponseHeader().getBooleanArg("zkConnected"));
Collection<Slice> slices = cloudSolrClient.getZkStateReader().getClusterState().getCollection(collectionName).getSlices();
Slice disconnectedSlice = getRandomEntry(slices);
Replica disconnectedReplica = getRandomEntry(disconnectedSlice.getReplicas());
JettySolrRunner disconnectedJetty = miniCluster.getReplicaJetty(disconnectedReplica);
// Use the replica's core URL to avoid ZK communication
try (HttpSolrClient httpSolrClient = new HttpSolrClient.Builder(disconnectedReplica.getCoreUrl()).build()) {
ignoreException("ZooKeeper is not connected");
disconnectedJetty.getCoreContainer().getZkController().getZkClient().close();
req.process(httpSolrClient);
fail("An exception should be thrown when ZooKeeper is not connected and shards.tolerant=requireZkConnected");
} catch (Exception e) {
assertTrue(e.getMessage().contains("ZooKeeper is not connected"));
}
}
finally {
miniCluster.shutdown();
}
}
@Test
public void testRequireZkConnectedDistrib() throws Exception{
MiniSolrCloudCluster miniCluster = new MiniSolrCloudCluster(2, createTempDir(), buildJettyConfig("/solr"));
final CloudSolrClient cloudSolrClient = miniCluster.getSolrClient();
try {
assertNotNull(miniCluster.getZkServer());
List<JettySolrRunner> jettys = miniCluster.getJettySolrRunners();
assertEquals(2, jettys.size());
for (JettySolrRunner jetty : jettys) {
assertTrue(jetty.isRunning());
}
// create collection
String collectionName = "testRequireZkConnectedDistribCollection";
String configName = collectionName + "Config";
miniCluster.uploadConfigSet(SolrTestCaseJ4.TEST_PATH().resolve("collection1/conf"), configName);
CollectionAdminRequest.createCollection(collectionName, configName, 2, 1)
.process(miniCluster.getSolrClient());
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(ShardParams.SHARDS_TOLERANT, "requireZkConnected");
QueryRequest req = new QueryRequest(params);
QueryResponse rsp = req.process(cloudSolrClient, collectionName);
assertTrue(rsp.getResponseHeader().getBooleanArg("zkConnected"));
Collection<Slice> slices = cloudSolrClient.getZkStateReader().getClusterState().getCollection(collectionName).getSlices();
Slice disconnectedSlice = getRandomEntry(slices);
Replica disconnectedReplica = getRandomEntry(disconnectedSlice.getReplicas());
// Query a coordinating replica that is connected to ZooKeeper
Slice connectedSlice = getRandomEntry(slices);
while (connectedSlice.getName().equals(disconnectedSlice.getName())) {
connectedSlice = getRandomEntry(slices);
}
Replica connectedReplica = connectedSlice.getReplicas().iterator().next();
try (HttpSolrClient httpSolrClient = new HttpSolrClient.Builder(connectedReplica.getCoreUrl()).build()) {
ignoreException("ZooKeeper is not connected");
JettySolrRunner disconnectedJetty = miniCluster.getReplicaJetty(disconnectedReplica);
disconnectedJetty.getCoreContainer().getZkController().getZkClient().close();
req.process(httpSolrClient);
fail("An exception should be thrown when ZooKeeper is not connected and shards.tolerant=requireZkConnected");
} catch (Exception e) {
assertTrue(e.getMessage().contains("no servers hosting shard:"));
}
}
finally {
miniCluster.shutdown();
}
}
private static <T> T getRandomEntry(Collection<T> collection) {
if (null == collection || collection.isEmpty())
return null;
Iterator<T> iterator = collection.iterator();
T entry = iterator.next();
int index = 0, rand = random().nextInt(collection.size());
while (index++ < rand)
entry = iterator.next();
return entry;
}
} }

View File

@ -27,11 +27,11 @@ Even if some nodes in the cluster are offline or unreachable, a Solr node will b
=== zkConnected === zkConnected
A Solr node will return the results of a search request as long as it can communicate with at least one replica of every shard that it knows about, even if it can _not_ communicate with ZooKeeper at the time it receives the request. This is normally the preferred behavior from a fault tolerance standpoint, but may result in stale or incorrect results if there have been major changes to the collection structure that the node has not been informed of via ZooKeeper (i.e., shards may have been added or removed, or split into sub-shards) A Solr node will return the results of a search request as long as it can communicate with at least one replica of every shard that it knows about, even if it can _not_ communicate with ZooKeeper at the time it receives the request. This is normally the preferred behavior from a fault tolerance standpoint, but may result in stale or incorrect results if there have been major changes to the collection structure that the node has not been informed of via ZooKeeper (i.e., shards may have been added or removed, or split into sub-shards).
A `zkConnected` header is included in every search response indicating if the node that processed the request was connected with ZooKeeper at the time: A `zkConnected` header is included in every search response indicating if the node that processed the request was connected with ZooKeeper at the time:
.Solr Response with partialResults .Solr Response with zkConnected
[source,json] [source,json]
---- ----
{ {
@ -51,12 +51,16 @@ A `zkConnected` header is included in every search response indicating if the no
} }
---- ----
To prevent stale or incorrect results in the event that the request-serving node can't communicate with ZooKeeper, set the <<shards-tolerant,`shards.tolerant`>> parameter to `requireZkConnected`. This will cause requests to fail rather than setting a `zkConnected` header to `false`.
=== shards.tolerant === shards.tolerant
In the event that one or more shards queried are completely unavailable, then Solr's default behavior is to fail the request. However, there are many use-cases where partial results are acceptable and so Solr provides a boolean `shards.tolerant` parameter (default `false`). In the event that one or more shards queried are completely unavailable, then Solr's default behavior is to fail the request. However, there are many use-cases where partial results are acceptable and so Solr provides a boolean `shards.tolerant` parameter (default `false`). In addition to `true` and `false`, `shards.tolerant` may also be set to `requireZkConnected` - see below.
If `shards.tolerant=true` then partial results may be returned. If the returned response does not contain results from all the appropriate shards then the response header contains a special flag called `partialResults`. If `shards.tolerant=true` then partial results may be returned. If the returned response does not contain results from all the appropriate shards then the response header contains a special flag called `partialResults`.
If `shards.tolerant=requireZkConnected` and the node serving the search request cannot communicate with ZooKeeper, the request will fail, rather than returning potentially stale or incorrect results. This will also cause requests to fail when one or more queried shards are completely unavailable, just like when `shards.tolerant=false`.
The client can specify '<<distributed-search-with-index-sharding.adoc#distributed-search-with-index-sharding,`shards.info`>>' along with the `shards.tolerant` parameter to retrieve more fine-grained details. The client can specify '<<distributed-search-with-index-sharding.adoc#distributed-search-with-index-sharding,`shards.info`>>' along with the `shards.tolerant` parameter to retrieve more fine-grained details.
Example response with `partialResults` flag set to 'true': Example response with `partialResults` flag set to 'true':

View File

@ -16,6 +16,8 @@
*/ */
package org.apache.solr.common.params; package org.apache.solr.common.params;
import org.apache.solr.common.util.StrUtils;
/** /**
* Parameters used for distributed search. * Parameters used for distributed search.
* *
@ -46,7 +48,7 @@ public interface ShardParams {
/** Request detailed match info for each shard (true/false) */ /** Request detailed match info for each shard (true/false) */
String SHARDS_INFO = "shards.info"; String SHARDS_INFO = "shards.info";
/** Should things fail if there is an error? (true/false) */ /** Should things fail if there is an error? (true/false/{@value #REQUIRE_ZK_CONNECTED}) */
String SHARDS_TOLERANT = "shards.tolerant"; String SHARDS_TOLERANT = "shards.tolerant";
/** query purpose for shard requests */ /** query purpose for shard requests */
@ -68,4 +70,28 @@ public interface ShardParams {
/** Force a single-pass distributed query? (true/false) */ /** Force a single-pass distributed query? (true/false) */
String DISTRIB_SINGLE_PASS = "distrib.singlePass"; String DISTRIB_SINGLE_PASS = "distrib.singlePass";
/**
* Throw an error from search requests when the {@value #SHARDS_TOLERANT} param
* has this value and ZooKeeper is not connected.
*
* @see #getShardsTolerantAsBool(SolrParams)
*/
String REQUIRE_ZK_CONNECTED = "requireZkConnected";
/**
* Parse the {@value #SHARDS_TOLERANT} param from <code>params</code> as a boolean;
* accepts {@value #REQUIRE_ZK_CONNECTED} as a valid value indicating <code>false</code>.
*
* By default, returns <code>false</code> when {@value #SHARDS_TOLERANT} is not set
* in <code>params</code>.
*/
static boolean getShardsTolerantAsBool(SolrParams params) {
String shardsTolerantValue = params.get(SHARDS_TOLERANT);
if (null == shardsTolerantValue || shardsTolerantValue.equals(REQUIRE_ZK_CONNECTED)) {
return false;
} else {
return StrUtils.parseBool(shardsTolerantValue); // throw an exception if non-boolean
}
}
} }

View File

@ -17,6 +17,7 @@
package org.apache.solr.common.params; package org.apache.solr.common.params;
import org.apache.lucene.util.LuceneTestCase; import org.apache.lucene.util.LuceneTestCase;
import org.apache.solr.common.SolrException;
/** /**
* This class tests backwards compatibility of {@link ShardParams} parameter constants. * This class tests backwards compatibility of {@link ShardParams} parameter constants.
@ -40,10 +41,39 @@ public class ShardParamsTest extends LuceneTestCase
public void testShardsInfo() { assertEquals(ShardParams.SHARDS_INFO, "shards.info"); } public void testShardsInfo() { assertEquals(ShardParams.SHARDS_INFO, "shards.info"); }
public void testShardsTolerant() { assertEquals(ShardParams.SHARDS_TOLERANT, "shards.tolerant"); } public void testShardsTolerant() { assertEquals(ShardParams.SHARDS_TOLERANT, "shards.tolerant"); }
public void testRequireZkConnected() { assertEquals(ShardParams.REQUIRE_ZK_CONNECTED, "requireZkConnected"); }
public void testShardsPurpose() { assertEquals(ShardParams.SHARDS_PURPOSE, "shards.purpose"); } public void testShardsPurpose() { assertEquals(ShardParams.SHARDS_PURPOSE, "shards.purpose"); }
public void testRoute() { assertEquals(ShardParams._ROUTE_, "_route_"); } public void testRoute() { assertEquals(ShardParams._ROUTE_, "_route_"); }
public void testDistribSinglePass() { assertEquals(ShardParams.DISTRIB_SINGLE_PASS, "distrib.singlePass"); } public void testDistribSinglePass() { assertEquals(ShardParams.DISTRIB_SINGLE_PASS, "distrib.singlePass"); }
public void testGetShardsTolerantAsBool() {
ModifiableSolrParams params = new ModifiableSolrParams();
// shards.tolerant param is not set; default should be false
assertFalse(ShardParams.getShardsTolerantAsBool(params));
// shards.tolerant boolean true param should return true
for (String trueValue : new String[] { "true", "yes", "on"}) {
params.set(ShardParams.SHARDS_TOLERANT, trueValue);
assertTrue(ShardParams.getShardsTolerantAsBool(params));
}
// shards.tolerant boolean false param should return false
for (String falseValue : new String[] { "false", "no", "off"}) {
params.set(ShardParams.SHARDS_TOLERANT, falseValue);
assertFalse(ShardParams.getShardsTolerantAsBool(params));
}
// shards.tolerant=requireZkConnected should return false
params.set(ShardParams.SHARDS_TOLERANT, ShardParams.REQUIRE_ZK_CONNECTED);
assertFalse(ShardParams.getShardsTolerantAsBool(params));
// values that aren't "requireZkConnected" or boolean should throw an exception
params.set(ShardParams.SHARDS_TOLERANT, "bogusValue");
Exception exception = expectThrows(SolrException.class, () -> ShardParams.getShardsTolerantAsBool(params));
assertTrue(exception.getMessage().startsWith("invalid boolean value: "));
}
} }