mirror of https://github.com/apache/lucene.git
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/lucene-solr
This commit is contained in:
commit
017f59bae5
|
@ -90,6 +90,9 @@ New Features
|
|||
|
||||
* SOLR-12181: Add index size autoscaling trigger, based on document count or size in bytes. (ab)
|
||||
|
||||
* SOLR-11982: Add possibility to define replica order with the shards.preference parameter to e.g. prefer PULL replicas
|
||||
for distributed queries. (Ere Maijala, Tomás Fernández Löbbe)
|
||||
|
||||
Bug Fixes
|
||||
----------------------
|
||||
|
||||
|
@ -137,6 +140,10 @@ Bug Fixes
|
|||
* SOLR-12201: TestReplicationHandler.doTestIndexFetchOnMasterRestart(): handle unexpected replication failures
|
||||
(Steve Rowe)
|
||||
|
||||
* SOLR-12190: Need to properly escape output in GraphMLResponseWriter. (yonik)
|
||||
|
||||
* SOLR-12214: Leader may skip publish itself as ACTIVE when its last published state is DOWN (Cao Manh Dat)
|
||||
|
||||
Optimizations
|
||||
----------------------
|
||||
|
||||
|
|
|
@ -580,7 +580,8 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
|
|||
zkStateReader.forceUpdateCollection(collection);
|
||||
ClusterState clusterState = zkStateReader.getClusterState();
|
||||
Replica rep = getReplica(clusterState, collection, leaderProps.getStr(ZkStateReader.CORE_NODE_NAME_PROP));
|
||||
if (rep != null && rep.getState() != Replica.State.ACTIVE) {
|
||||
if (rep == null) return;
|
||||
if (rep.getState() != Replica.State.ACTIVE || core.getCoreDescriptor().getCloudDescriptor().getLastPublished() != Replica.State.ACTIVE) {
|
||||
log.debug("We have become the leader after core registration but are not in an ACTIVE state - publishing ACTIVE");
|
||||
zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
|
||||
}
|
||||
|
|
|
@ -749,7 +749,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
|
|||
while (true) {
|
||||
CloudDescriptor cloudDesc = coreDesc.getCloudDescriptor();
|
||||
DocCollection docCollection = zkStateReader.getClusterState().getCollection(cloudDesc.getCollectionName());
|
||||
if (mayPutReplicaAsDown && numTried == 1 &&
|
||||
if (!isClosed() && mayPutReplicaAsDown && numTried == 1 &&
|
||||
docCollection.getReplica(coreDesc.getCloudDescriptor().getCoreNodeName()).getState() == Replica.State.ACTIVE) {
|
||||
// this operation may take a long time, by putting replica into DOWN state, client won't query this replica
|
||||
zkController.publish(coreDesc, Replica.State.DOWN);
|
||||
|
|
|
@ -29,12 +29,12 @@ import org.apache.solr.common.SolrException;
|
|||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.params.CommonParams;
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.apache.solr.common.params.ShardParams;
|
||||
import org.apache.solr.common.params.SolrParams;
|
||||
import org.apache.solr.common.util.ExecutorUtil;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.common.util.StrUtils;
|
||||
import org.apache.solr.common.util.URLUtil;
|
||||
import org.apache.solr.core.CoreDescriptor;
|
||||
import org.apache.solr.core.PluginInfo;
|
||||
import org.apache.solr.core.SolrInfoBean;
|
||||
import org.apache.solr.metrics.SolrMetricManager;
|
||||
|
@ -51,6 +51,7 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
|
@ -303,30 +304,61 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements org.
|
|||
/**
|
||||
* A distributed request is made via {@link LBHttpSolrClient} to the first live server in the URL list.
|
||||
* This means it is just as likely to choose current host as any of the other hosts.
|
||||
* This function makes sure that the cores of current host are always put first in the URL list.
|
||||
* If all nodes prefer local-cores then a bad/heavily-loaded node will receive less requests from healthy nodes.
|
||||
* This will help prevent a distributed deadlock or timeouts in all the healthy nodes due to one bad node.
|
||||
* This function makes sure that the cores are sorted according to the given list of preferences.
|
||||
* E.g. If all nodes prefer local cores then a bad/heavily-loaded node will receive less requests from
|
||||
* healthy nodes. This will help prevent a distributed deadlock or timeouts in all the healthy nodes due
|
||||
* to one bad node.
|
||||
*/
|
||||
private static class IsOnPreferredHostComparator implements Comparator<Object> {
|
||||
final private String preferredHostAddress;
|
||||
public IsOnPreferredHostComparator(String preferredHostAddress) {
|
||||
this.preferredHostAddress = preferredHostAddress;
|
||||
static class NodePreferenceRulesComparator implements Comparator<Object> {
|
||||
private static class PreferenceRule {
|
||||
public final String name;
|
||||
public final String value;
|
||||
|
||||
public PreferenceRule(String name, String value) {
|
||||
this.name = name;
|
||||
this.value = value;
|
||||
}
|
||||
}
|
||||
|
||||
private final SolrQueryRequest request;
|
||||
private List<PreferenceRule> preferenceRules;
|
||||
private String localHostAddress = null;
|
||||
|
||||
public NodePreferenceRulesComparator(final List<String> sortRules, final SolrQueryRequest request) {
|
||||
this.request = request;
|
||||
this.preferenceRules = new ArrayList<PreferenceRule>(sortRules.size());
|
||||
sortRules.forEach(rule -> {
|
||||
String[] parts = rule.split(":", 2);
|
||||
if (parts.length != 2) {
|
||||
throw new IllegalArgumentException("Invalid " + ShardParams.SHARDS_PREFERENCE + " rule: " + rule);
|
||||
}
|
||||
this.preferenceRules.add(new PreferenceRule(parts[0], parts[1]));
|
||||
});
|
||||
}
|
||||
@Override
|
||||
public int compare(Object left, Object right) {
|
||||
final boolean lhs = hasPrefix(objectToString(left));
|
||||
final boolean rhs = hasPrefix(objectToString(right));
|
||||
if (lhs != rhs) {
|
||||
if (lhs) {
|
||||
return -1;
|
||||
} else {
|
||||
return +1;
|
||||
for (PreferenceRule preferenceRule: this.preferenceRules) {
|
||||
final boolean lhs;
|
||||
final boolean rhs;
|
||||
switch (preferenceRule.name) {
|
||||
case ShardParams.SHARDS_PREFERENCE_REPLICA_TYPE:
|
||||
lhs = hasReplicaType(left, preferenceRule.value);
|
||||
rhs = hasReplicaType(right, preferenceRule.value);
|
||||
break;
|
||||
case ShardParams.SHARDS_PREFERENCE_REPLICA_LOCATION:
|
||||
lhs = hasCoreUrlPrefix(left, preferenceRule.value);
|
||||
rhs = hasCoreUrlPrefix(right, preferenceRule.value);
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("Invalid " + ShardParams.SHARDS_PREFERENCE + " type: " + preferenceRule.name);
|
||||
}
|
||||
if (lhs != rhs) {
|
||||
return lhs ? -1 : +1;
|
||||
}
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
private String objectToString(Object o) {
|
||||
private boolean hasCoreUrlPrefix(Object o, String prefix) {
|
||||
final String s;
|
||||
if (o instanceof String) {
|
||||
s = (String)o;
|
||||
|
@ -334,44 +366,80 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements org.
|
|||
else if (o instanceof Replica) {
|
||||
s = ((Replica)o).getCoreUrl();
|
||||
} else {
|
||||
s = null;
|
||||
return false;
|
||||
}
|
||||
return s;
|
||||
if (prefix.equals(ShardParams.REPLICA_LOCAL)) {
|
||||
if (null == localHostAddress) {
|
||||
final ZkController zkController = this.request.getCore().getCoreContainer().getZkController();
|
||||
localHostAddress = zkController != null ? zkController.getBaseUrl() : "";
|
||||
if (localHostAddress.isEmpty()) {
|
||||
log.warn("Couldn't determine current host address for sorting of local replicas");
|
||||
}
|
||||
}
|
||||
if (!localHostAddress.isEmpty()) {
|
||||
if (s.startsWith(localHostAddress)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if (s.startsWith(prefix)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
private boolean hasPrefix(String s) {
|
||||
return s != null && s.startsWith(preferredHostAddress);
|
||||
private static boolean hasReplicaType(Object o, String preferred) {
|
||||
if (!(o instanceof Replica)) {
|
||||
return false;
|
||||
}
|
||||
final String s = ((Replica)o).getType().toString();
|
||||
return s.equals(preferred);
|
||||
}
|
||||
}
|
||||
protected ReplicaListTransformer getReplicaListTransformer(final SolrQueryRequest req)
|
||||
{
|
||||
final SolrParams params = req.getParams();
|
||||
|
||||
if (params.getBool(CommonParams.PREFER_LOCAL_SHARDS, false)) {
|
||||
final CoreDescriptor coreDescriptor = req.getCore().getCoreDescriptor();
|
||||
final ZkController zkController = req.getCore().getCoreContainer().getZkController();
|
||||
final String preferredHostAddress = (zkController != null) ? zkController.getBaseUrl() : null;
|
||||
if (preferredHostAddress == null) {
|
||||
log.warn("Couldn't determine current host address to prefer local shards");
|
||||
} else {
|
||||
return new ShufflingReplicaListTransformer(r) {
|
||||
@Override
|
||||
public void transform(List<?> choices)
|
||||
{
|
||||
if (choices.size() > 1) {
|
||||
super.transform(choices);
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Trying to prefer local shard on {} among the choices: {}",
|
||||
preferredHostAddress, Arrays.toString(choices.toArray()));
|
||||
}
|
||||
choices.sort(new IsOnPreferredHostComparator(preferredHostAddress));
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Applied local shard preference for choices: {}",
|
||||
Arrays.toString(choices.toArray()));
|
||||
}
|
||||
protected ReplicaListTransformer getReplicaListTransformer(final SolrQueryRequest req) {
|
||||
final SolrParams params = req.getParams();
|
||||
@SuppressWarnings("deprecation")
|
||||
final boolean preferLocalShards = params.getBool(CommonParams.PREFER_LOCAL_SHARDS, false);
|
||||
final String shardsPreferenceSpec = params.get(ShardParams.SHARDS_PREFERENCE, "");
|
||||
|
||||
if (preferLocalShards || !shardsPreferenceSpec.isEmpty()) {
|
||||
if (preferLocalShards && !shardsPreferenceSpec.isEmpty()) {
|
||||
throw new SolrException(
|
||||
SolrException.ErrorCode.BAD_REQUEST,
|
||||
"preferLocalShards is deprecated and must not be used with shards.preference"
|
||||
);
|
||||
}
|
||||
List<String> preferenceRules = StrUtils.splitSmart(shardsPreferenceSpec, ',');
|
||||
if (preferLocalShards) {
|
||||
preferenceRules.add(ShardParams.SHARDS_PREFERENCE_REPLICA_LOCATION + ":" + ShardParams.REPLICA_LOCAL);
|
||||
}
|
||||
|
||||
return new ShufflingReplicaListTransformer(r) {
|
||||
@Override
|
||||
public void transform(List<?> choices)
|
||||
{
|
||||
if (choices.size() > 1) {
|
||||
super.transform(choices);
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Applying the following sorting preferences to replicas: {}",
|
||||
Arrays.toString(preferenceRules.toArray()));
|
||||
}
|
||||
try {
|
||||
choices.sort(new NodePreferenceRulesComparator(preferenceRules, req));
|
||||
} catch (IllegalArgumentException iae) {
|
||||
throw new SolrException(
|
||||
SolrException.ErrorCode.BAD_REQUEST,
|
||||
iae.getMessage()
|
||||
);
|
||||
}
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Applied sorting preferences to replica list: {}",
|
||||
Arrays.toString(choices.toArray()));
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
return shufflingReplicaListTransformer;
|
||||
|
@ -409,4 +477,5 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements org.
|
|||
manager.registry(registry),
|
||||
SolrMetricManager.mkName("httpShardExecutor", expandedScope, "threadPool"));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -97,7 +97,7 @@ public class GraphMLResponseWriter implements QueryResponseWriter {
|
|||
id = tuple.getString("collection") + "." + id;
|
||||
}
|
||||
|
||||
writer.write("<node id=\""+replace(id)+"\"");
|
||||
printWriter.write("<node id=\""+ xmlEscape(id)+"\"");
|
||||
|
||||
List<String> outfields = new ArrayList();
|
||||
Iterator<String> keys = tuple.fields.keySet().iterator();
|
||||
|
@ -115,7 +115,7 @@ public class GraphMLResponseWriter implements QueryResponseWriter {
|
|||
for (String nodeAttribute : outfields) {
|
||||
Object o = tuple.get(nodeAttribute);
|
||||
if (o != null) {
|
||||
printWriter.println("<data key=\""+nodeAttribute+"\">" + o.toString() + "</data>");
|
||||
printWriter.println("<data key=\"" + xmlEscape(nodeAttribute) + "\">" + xmlEscape(o.toString()) + "</data>");
|
||||
}
|
||||
}
|
||||
printWriter.println("</node>");
|
||||
|
@ -128,20 +128,20 @@ public class GraphMLResponseWriter implements QueryResponseWriter {
|
|||
if(ancestors != null) {
|
||||
for (String ancestor : ancestors) {
|
||||
++edgeCount;
|
||||
writer.write("<edge id=\"" + edgeCount + "\" ");
|
||||
writer.write(" source=\"" + replace(ancestor) + "\" ");
|
||||
printWriter.println(" target=\"" + replace(id) + "\"/>");
|
||||
printWriter.write("<edge id=\"" + edgeCount + "\" ");
|
||||
printWriter.write(" source=\"" + xmlEscape(ancestor) + "\" ");
|
||||
printWriter.println(" target=\"" + xmlEscape(id) + "\"/>");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
writer.write("</graph></graphml>");
|
||||
printWriter.write("</graph></graphml>");
|
||||
} finally {
|
||||
stream.close();
|
||||
}
|
||||
}
|
||||
|
||||
private String replace(String s) {
|
||||
private String xmlEscape(String s) {
|
||||
if(s.indexOf(">") > -1) {
|
||||
s = s.replace(">", ">");
|
||||
}
|
||||
|
|
|
@ -90,7 +90,6 @@ public class LeaderVoteWaitTimeoutTest extends SolrCloudTestCase {
|
|||
}
|
||||
|
||||
@Test
|
||||
@BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 17-Mar-2018
|
||||
public void basicTest() throws Exception {
|
||||
final String collectionName = "basicTest";
|
||||
CollectionAdminRequest.createCollection(collectionName, 1, 1)
|
||||
|
@ -127,7 +126,6 @@ public class LeaderVoteWaitTimeoutTest extends SolrCloudTestCase {
|
|||
}
|
||||
|
||||
@Test
|
||||
@BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 17-Mar-2018
|
||||
public void testMostInSyncReplicasCanWinElection() throws Exception {
|
||||
final String collectionName = "collection1";
|
||||
CollectionAdminRequest.createCollection(collectionName, 1, 3)
|
||||
|
@ -187,15 +185,18 @@ public class LeaderVoteWaitTimeoutTest extends SolrCloudTestCase {
|
|||
proxies.get(cluster.getJettySolrRunner(2)).reopen();
|
||||
cluster.getJettySolrRunner(0).stop();
|
||||
|
||||
// even replica2 joined election at the end of the queue, but it is the one with highest term
|
||||
waitForState("Timeout waiting for new leader", collectionName, new CollectionStatePredicate() {
|
||||
@Override
|
||||
public boolean matches(Set<String> liveNodes, DocCollection collectionState) {
|
||||
try {
|
||||
// even replica2 joined election at the end of the queue, but it is the one with highest term
|
||||
waitForState("Timeout waiting for new leader", collectionName, (liveNodes, collectionState) -> {
|
||||
Replica newLeader = collectionState.getSlice("shard1").getLeader();
|
||||
return newLeader.getName().equals(replica2.getName());
|
||||
}
|
||||
});
|
||||
|
||||
});
|
||||
} catch (Exception e) {
|
||||
List<String> children = zkClient().getChildren("/collections/"+collectionName+"/leader_elect/shard1/election",
|
||||
null, true);
|
||||
LOG.info("{} election nodes:{}", collectionName, children);
|
||||
throw e;
|
||||
}
|
||||
cluster.getJettySolrRunner(0).start();
|
||||
proxies.get(cluster.getJettySolrRunner(0)).reopen();
|
||||
|
||||
|
|
|
@ -24,6 +24,10 @@ import java.util.List;
|
|||
import org.apache.solr.SolrTestCaseJ4;
|
||||
import org.apache.solr.client.solrj.request.QueryRequest;
|
||||
import org.apache.solr.client.solrj.impl.LBHttpSolrClient;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.params.ShardParams;
|
||||
import org.apache.solr.common.util.StrUtils;
|
||||
import org.apache.solr.core.CoreContainer;
|
||||
import org.apache.solr.handler.component.HttpShardHandlerFactory;
|
||||
import org.apache.solr.handler.component.ShardHandlerFactory;
|
||||
|
@ -99,4 +103,119 @@ public class TestHttpShardHandlerFactory extends SolrTestCaseJ4 {
|
|||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testNodePreferenceRulesComparator() throws Exception {
|
||||
List<Replica> replicas = new ArrayList<Replica>();
|
||||
replicas.add(
|
||||
new Replica(
|
||||
"node1",
|
||||
map(
|
||||
ZkStateReader.BASE_URL_PROP, "http://host1:8983/solr",
|
||||
ZkStateReader.NODE_NAME_PROP, "node1",
|
||||
ZkStateReader.CORE_NAME_PROP, "collection1",
|
||||
ZkStateReader.REPLICA_TYPE, "NRT"
|
||||
)
|
||||
)
|
||||
);
|
||||
replicas.add(
|
||||
new Replica(
|
||||
"node2",
|
||||
map(
|
||||
ZkStateReader.BASE_URL_PROP, "http://host2:8983/solr",
|
||||
ZkStateReader.NODE_NAME_PROP, "node2",
|
||||
ZkStateReader.CORE_NAME_PROP, "collection1",
|
||||
ZkStateReader.REPLICA_TYPE, "TLOG"
|
||||
)
|
||||
)
|
||||
);
|
||||
replicas.add(
|
||||
new Replica(
|
||||
"node3",
|
||||
map(
|
||||
ZkStateReader.BASE_URL_PROP, "http://host2_2:8983/solr",
|
||||
ZkStateReader.NODE_NAME_PROP, "node3",
|
||||
ZkStateReader.CORE_NAME_PROP, "collection1",
|
||||
ZkStateReader.REPLICA_TYPE, "PULL"
|
||||
)
|
||||
)
|
||||
);
|
||||
|
||||
// Simple replica type rule
|
||||
List<String> rules = StrUtils.splitSmart(
|
||||
ShardParams.SHARDS_PREFERENCE_REPLICA_TYPE + ":NRT," +
|
||||
ShardParams.SHARDS_PREFERENCE_REPLICA_TYPE + ":TLOG",
|
||||
','
|
||||
);
|
||||
HttpShardHandlerFactory.NodePreferenceRulesComparator comparator =
|
||||
new HttpShardHandlerFactory.NodePreferenceRulesComparator(rules, null);
|
||||
replicas.sort(comparator);
|
||||
assertEquals("node1", replicas.get(0).getNodeName());
|
||||
assertEquals("node2", replicas.get(1).getNodeName());
|
||||
|
||||
// Another simple replica type rule
|
||||
rules = StrUtils.splitSmart(
|
||||
ShardParams.SHARDS_PREFERENCE_REPLICA_TYPE + ":TLOG," +
|
||||
ShardParams.SHARDS_PREFERENCE_REPLICA_TYPE + ":NRT",
|
||||
','
|
||||
);
|
||||
comparator = new HttpShardHandlerFactory.NodePreferenceRulesComparator(rules, null);
|
||||
replicas.sort(comparator);
|
||||
assertEquals("node2", replicas.get(0).getNodeName());
|
||||
assertEquals("node1", replicas.get(1).getNodeName());
|
||||
|
||||
// replicaLocation rule
|
||||
rules = StrUtils.splitSmart(ShardParams.SHARDS_PREFERENCE_REPLICA_LOCATION + ":http://host2:8983", ',');
|
||||
comparator = new HttpShardHandlerFactory.NodePreferenceRulesComparator(rules, null);
|
||||
replicas.sort(comparator);
|
||||
assertEquals("node2", replicas.get(0).getNodeName());
|
||||
assertEquals("node1", replicas.get(1).getNodeName());
|
||||
|
||||
// Add a replica so that sorting by replicaType:TLOG can cause a tie
|
||||
replicas.add(
|
||||
new Replica(
|
||||
"node4",
|
||||
map(
|
||||
ZkStateReader.BASE_URL_PROP, "http://host2_2:8983/solr",
|
||||
ZkStateReader.NODE_NAME_PROP, "node4",
|
||||
ZkStateReader.CORE_NAME_PROP, "collection1",
|
||||
ZkStateReader.REPLICA_TYPE, "TLOG"
|
||||
)
|
||||
)
|
||||
);
|
||||
|
||||
// replicaType and replicaLocation combined rule
|
||||
rules = StrUtils.splitSmart(
|
||||
ShardParams.SHARDS_PREFERENCE_REPLICA_TYPE + ":NRT," +
|
||||
ShardParams.SHARDS_PREFERENCE_REPLICA_TYPE + ":TLOG," +
|
||||
ShardParams.SHARDS_PREFERENCE_REPLICA_LOCATION + ":http://host2_2",
|
||||
','
|
||||
);
|
||||
comparator = new HttpShardHandlerFactory.NodePreferenceRulesComparator(rules, null);
|
||||
replicas.sort(comparator);
|
||||
assertEquals("node1", replicas.get(0).getNodeName());
|
||||
assertEquals("node4", replicas.get(1).getNodeName());
|
||||
assertEquals("node2", replicas.get(2).getNodeName());
|
||||
assertEquals("node3", replicas.get(3).getNodeName());
|
||||
|
||||
// Bad rule
|
||||
rules = StrUtils.splitSmart(ShardParams.SHARDS_PREFERENCE_REPLICA_TYPE, ',');
|
||||
try {
|
||||
comparator = new HttpShardHandlerFactory.NodePreferenceRulesComparator(rules, null);
|
||||
replicas.sort(comparator);
|
||||
fail();
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertEquals("Invalid shards.preference rule: " + ShardParams.SHARDS_PREFERENCE_REPLICA_TYPE, e.getMessage());
|
||||
}
|
||||
|
||||
// Unknown rule
|
||||
rules = StrUtils.splitSmart("badRule:test", ',');
|
||||
try {
|
||||
comparator = new HttpShardHandlerFactory.NodePreferenceRulesComparator(rules, null);
|
||||
replicas.sort(comparator);
|
||||
fail();
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertEquals("Invalid shards.preference type: badRule", e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -138,6 +138,46 @@ For example, a deadlock might occur in the case of two shards, each with just a
|
|||
|
||||
== preferLocalShards Parameter
|
||||
|
||||
Deprecated, use `shards.preference=replica.location:local` instead (see below).
|
||||
|
||||
Solr allows you to pass an optional boolean parameter named `preferLocalShards` to indicate that a distributed query should prefer local replicas of a shard when available. In other words, if a query includes `preferLocalShards=true`, then the query controller will look for local replicas to service the query instead of selecting replicas at random from across the cluster. This is useful when a query requests many fields or large fields to be returned per document because it avoids moving large amounts of data over the network when it is available locally. In addition, this feature can be useful for minimizing the impact of a problematic replica with degraded performance, as it reduces the likelihood that the degraded replica will be hit by other healthy replicas.
|
||||
|
||||
Lastly, it follows that the value of this feature diminishes as the number of shards in a collection increases because the query controller will have to direct the query to non-local replicas for most of the shards. In other words, this feature is mostly useful for optimizing queries directed towards collections with a small number of shards and many replicas. Also, this option should only be used if you are load balancing requests across all nodes that host replicas for the collection you are querying, as Solr's CloudSolrClient will do. If not load-balancing, this feature can introduce a hotspot in the cluster since queries won't be evenly distributed across the cluster.
|
||||
|
||||
== shards.preference Parameter
|
||||
|
||||
Solr allows you to pass an optional string parameter named `shards.preference` to indicate that a distributed query should sort the available replicas in the given order of precedence within each shard. The syntax is: `shards.preference=property:value`. The order of the properties and the values are significant meaning that the first one is the primary sort, second one is secondary etc.
|
||||
|
||||
IMPORTANT: `shards.preference` only works for distributed queries, i.e. queries targeting multiple shards. Not implemented yet for single shard scenarios
|
||||
|
||||
The properties that can be specified are as follows:
|
||||
|
||||
`replica.type`::
|
||||
One or more replica types that are preferred. Any combination of PULL, TLOG and NRT is allowed.
|
||||
|
||||
`replica.location`::
|
||||
One or more replica locations that are preferred. A location starts with `http://hostname:port`. Matching is done for the given string as a prefix, so it's possible to e.g. leave out the port. `local` may be used as special value to denote any local replica running on the same Solr instance as the one handling the query. This is useful when a query requests many fields or large fields to be returned per document because it avoids moving large amounts of data over the network when it is available locally. In addition, this feature can be useful for minimizing the impact of a problematic replica with degraded performance, as it reduces the likelihood that the degraded replica will be hit by other healthy replicas.
|
||||
|
||||
The value of `replica.location:local` diminishes as the number of shards (that have no locally-available replicas) in a collection increases because the query controller will have to direct the query to non-local replicas for most of the shards. In other words, this feature is mostly useful for optimizing queries directed towards collections with a small number of shards and many replicas. Also, this option should only be used if you are load balancing requests across all nodes that host replicas for the collection you are querying, as Solr's CloudSolrClient will do. If not load-balancing, this feature can introduce a hotspot in the cluster since queries won't be evenly distributed across the cluster.
|
||||
|
||||
Examples:
|
||||
|
||||
* Prefer PULL replicas:
|
||||
`shards.preference=replica.type:PULL`
|
||||
|
||||
* Prefer PULL replicas, or TLOG replicas if PULL replicas not available:
|
||||
`shards.preference=replica.type:PULL,replica.type:TLOG`
|
||||
|
||||
* Prefer any local replicas:
|
||||
`shards.preference=replica.location:local`
|
||||
|
||||
* Prefer any replicas on a host called "server1" with "server2" as the secondary option:
|
||||
`shards.preference=replica.location:http://server1,replica.location:http://server2`
|
||||
|
||||
* Prefer PULL replicas if available, otherwise TLOG replicas, and local ones among those:
|
||||
`shards.preference=replica.type:PULL,replica.type:TLOG,replica.location:local`
|
||||
|
||||
* Prefer local replicas, and among them PULL replicas when available TLOG otherwise:
|
||||
`shards.preference=replica.location:local,replica.type:PULL,replica.type:TLOG`
|
||||
|
||||
Note that if you provide the settings in a query string, they need to be properly URL-encoded.
|
||||
|
|
|
@ -86,6 +86,10 @@ If the PULL replica cannot connect to ZooKeeper, it would be removed from the cl
|
|||
|
||||
If the PULL replica dies or is unreachable for any other reason, it won't be query-able. When it rejoins the cluster, it would replicate from the leader and when that is complete, it would be ready to serve queries again.
|
||||
|
||||
=== Queries with Preferred Replica Types
|
||||
|
||||
By default all replicas serve queries. See the section <<distributed-requests.adoc#shards-preference-parameter,shards.preference Parameter>> for details on how to indicate preferred replica types for queries.
|
||||
|
||||
== Document Routing
|
||||
|
||||
Solr offers the ability to specify the router implementation used by a collection by specifying the `router.name` parameter when <<collections-api.adoc#create,creating your collection>>.
|
||||
|
|
|
@ -269,6 +269,7 @@ public interface CommonParams {
|
|||
|
||||
/**
|
||||
* When querying a node, prefer local node's cores for distributed queries.
|
||||
* @deprecated Use {@code ShardParams.SHARDS_PREFERENCE}
|
||||
*/
|
||||
String PREFER_LOCAL_SHARDS = "preferLocalShards";
|
||||
|
||||
|
|
|
@ -52,6 +52,18 @@ public interface ShardParams {
|
|||
/** query purpose for shard requests */
|
||||
String SHARDS_PURPOSE = "shards.purpose";
|
||||
|
||||
/** Shards sorting rules */
|
||||
String SHARDS_PREFERENCE = "shards.preference";
|
||||
|
||||
/** Replica type sort rule */
|
||||
String SHARDS_PREFERENCE_REPLICA_TYPE = "replica.type";
|
||||
|
||||
/** Replica location sort rule */
|
||||
String SHARDS_PREFERENCE_REPLICA_LOCATION = "replica.location";
|
||||
|
||||
/** Value denoting local replicas */
|
||||
String REPLICA_LOCAL = "local";
|
||||
|
||||
String _ROUTE_ = "_route_";
|
||||
|
||||
/** Force a single-pass distributed query? (true/false) */
|
||||
|
|
|
@ -416,18 +416,24 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
|
|||
.commit(getRandomClient(), collectionName);
|
||||
|
||||
// Run the actual test for 'preferLocalShards'
|
||||
queryWithPreferLocalShards(getRandomClient(), true, collectionName);
|
||||
queryWithShardsPreferenceRules(getRandomClient(), false, collectionName);
|
||||
queryWithShardsPreferenceRules(getRandomClient(), true, collectionName);
|
||||
}
|
||||
|
||||
private void queryWithPreferLocalShards(CloudSolrClient cloudClient,
|
||||
boolean preferLocalShards,
|
||||
@SuppressWarnings("deprecation")
|
||||
private void queryWithShardsPreferenceRules(CloudSolrClient cloudClient,
|
||||
boolean useShardsPreference,
|
||||
String collectionName)
|
||||
throws Exception
|
||||
{
|
||||
SolrQuery qRequest = new SolrQuery("*:*");
|
||||
|
||||
ModifiableSolrParams qParams = new ModifiableSolrParams();
|
||||
qParams.add(CommonParams.PREFER_LOCAL_SHARDS, Boolean.toString(preferLocalShards));
|
||||
if (useShardsPreference) {
|
||||
qParams.add(ShardParams.SHARDS_PREFERENCE, ShardParams.SHARDS_PREFERENCE_REPLICA_LOCATION + ":" + ShardParams.REPLICA_LOCAL);
|
||||
} else {
|
||||
qParams.add(CommonParams.PREFER_LOCAL_SHARDS, "true");
|
||||
}
|
||||
qParams.add(ShardParams.SHARDS_INFO, "true");
|
||||
qRequest.add(qParams);
|
||||
|
||||
|
@ -454,17 +460,15 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
|
|||
log.info("Shards giving the response: " + Arrays.toString(shardAddresses.toArray()));
|
||||
|
||||
// Make sure the distributed queries were directed to a single node only
|
||||
if (preferLocalShards) {
|
||||
Set<Integer> ports = new HashSet<Integer>();
|
||||
for (String shardAddr: shardAddresses) {
|
||||
URL url = new URL (shardAddr);
|
||||
ports.add(url.getPort());
|
||||
}
|
||||
|
||||
// This assertion would hold true as long as every shard has a core on each node
|
||||
assertTrue ("Response was not received from shards on a single node",
|
||||
shardAddresses.size() > 1 && ports.size()==1);
|
||||
Set<Integer> ports = new HashSet<Integer>();
|
||||
for (String shardAddr: shardAddresses) {
|
||||
URL url = new URL (shardAddr);
|
||||
ports.add(url.getPort());
|
||||
}
|
||||
|
||||
// This assertion would hold true as long as every shard has a core on each node
|
||||
assertTrue ("Response was not received from shards on a single node",
|
||||
shardAddresses.size() > 1 && ports.size()==1);
|
||||
}
|
||||
|
||||
private Long getNumRequests(String baseUrl, String collectionName) throws
|
||||
|
@ -844,4 +848,115 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests if the specification of 'preferReplicaTypes' in the query-params
|
||||
* limits the distributed query to locally hosted shards only
|
||||
*/
|
||||
@Test
|
||||
public void preferReplicaTypesTest() throws Exception {
|
||||
|
||||
String collectionName = "replicaTypesTestColl";
|
||||
|
||||
int liveNodes = cluster.getJettySolrRunners().size();
|
||||
|
||||
// For these tests we need to have multiple replica types.
|
||||
// Hence the below configuration for our collection
|
||||
CollectionAdminRequest.createCollection(collectionName, "conf", liveNodes, 1, 1, Math.max(1, liveNodes - 2))
|
||||
.setMaxShardsPerNode(liveNodes)
|
||||
.processAndWait(cluster.getSolrClient(), TIMEOUT);
|
||||
AbstractDistribZkTestBase.waitForRecoveriesToFinish(collectionName, cluster.getSolrClient().getZkStateReader(), false, true, TIMEOUT);
|
||||
|
||||
// Add some new documents
|
||||
new UpdateRequest()
|
||||
.add(id, "0", "a_t", "hello1")
|
||||
.add(id, "2", "a_t", "hello2")
|
||||
.add(id, "3", "a_t", "hello2")
|
||||
.commit(getRandomClient(), collectionName);
|
||||
|
||||
// Run the actual tests for 'shards.preference=replica.type:*'
|
||||
queryWithPreferReplicaTypes(getRandomClient(), "PULL", false, collectionName);
|
||||
queryWithPreferReplicaTypes(getRandomClient(), "PULL|TLOG", false, collectionName);
|
||||
queryWithPreferReplicaTypes(getRandomClient(), "TLOG", false, collectionName);
|
||||
queryWithPreferReplicaTypes(getRandomClient(), "TLOG|PULL", false, collectionName);
|
||||
queryWithPreferReplicaTypes(getRandomClient(), "NRT", false, collectionName);
|
||||
queryWithPreferReplicaTypes(getRandomClient(), "NRT|PULL", false, collectionName);
|
||||
// Test to verify that preferLocalShards=true doesn't break this
|
||||
queryWithPreferReplicaTypes(getRandomClient(), "PULL", true, collectionName);
|
||||
queryWithPreferReplicaTypes(getRandomClient(), "PULL|TLOG", true, collectionName);
|
||||
queryWithPreferReplicaTypes(getRandomClient(), "TLOG", true, collectionName);
|
||||
queryWithPreferReplicaTypes(getRandomClient(), "TLOG|PULL", true, collectionName);
|
||||
queryWithPreferReplicaTypes(getRandomClient(), "NRT", false, collectionName);
|
||||
queryWithPreferReplicaTypes(getRandomClient(), "NRT|PULL", true, collectionName);
|
||||
}
|
||||
|
||||
private void queryWithPreferReplicaTypes(CloudSolrClient cloudClient,
|
||||
String preferReplicaTypes,
|
||||
boolean preferLocalShards,
|
||||
String collectionName)
|
||||
throws Exception
|
||||
{
|
||||
SolrQuery qRequest = new SolrQuery("*:*");
|
||||
ModifiableSolrParams qParams = new ModifiableSolrParams();
|
||||
|
||||
final List<String> preferredTypes = Arrays.asList(preferReplicaTypes.split("\\|"));
|
||||
StringBuilder rule = new StringBuilder();
|
||||
preferredTypes.forEach(type -> {
|
||||
if (rule.length() != 0) {
|
||||
rule.append(',');
|
||||
}
|
||||
rule.append(ShardParams.SHARDS_PREFERENCE_REPLICA_TYPE);
|
||||
rule.append(':');
|
||||
rule.append(type);
|
||||
});
|
||||
if (preferLocalShards) {
|
||||
if (rule.length() != 0) {
|
||||
rule.append(',');
|
||||
}
|
||||
rule.append(ShardParams.SHARDS_PREFERENCE_REPLICA_LOCATION);
|
||||
rule.append(":local");
|
||||
}
|
||||
qParams.add(ShardParams.SHARDS_PREFERENCE, rule.toString());
|
||||
qParams.add(ShardParams.SHARDS_INFO, "true");
|
||||
qRequest.add(qParams);
|
||||
|
||||
// CloudSolrClient sends the request to some node.
|
||||
// And since all the nodes are hosting cores from all shards, the
|
||||
// distributed query formed by this node will select cores from the
|
||||
// local shards only
|
||||
QueryResponse qResponse = cloudClient.query(collectionName, qRequest);
|
||||
|
||||
Object shardsInfo = qResponse.getResponse().get(ShardParams.SHARDS_INFO);
|
||||
assertNotNull("Unable to obtain "+ShardParams.SHARDS_INFO, shardsInfo);
|
||||
|
||||
Map<String, String> replicaTypeMap = new HashMap<String, String>();
|
||||
DocCollection collection = getCollectionState(collectionName);
|
||||
for (Slice slice : collection.getSlices()) {
|
||||
for (Replica replica : slice.getReplicas()) {
|
||||
String coreUrl = replica.getCoreUrl();
|
||||
// It seems replica reports its core URL with a trailing slash while shard
|
||||
// info returned from the query doesn't. Oh well.
|
||||
if (coreUrl.endsWith("/")) {
|
||||
coreUrl = coreUrl.substring(0, coreUrl.length() - 1);
|
||||
}
|
||||
replicaTypeMap.put(coreUrl, replica.getType().toString());
|
||||
}
|
||||
}
|
||||
|
||||
// Iterate over shards-info and check that replicas of correct type responded
|
||||
SimpleOrderedMap<?> shardsInfoMap = (SimpleOrderedMap<?>)shardsInfo;
|
||||
Iterator<Map.Entry<String, ?>> itr = shardsInfoMap.asMap(100).entrySet().iterator();
|
||||
List<String> shardAddresses = new ArrayList<String>();
|
||||
while (itr.hasNext()) {
|
||||
Map.Entry<String, ?> e = itr.next();
|
||||
assertTrue("Did not find map-type value in "+ShardParams.SHARDS_INFO, e.getValue() instanceof Map);
|
||||
String shardAddress = (String)((Map)e.getValue()).get("shardAddress");
|
||||
assertNotNull(ShardParams.SHARDS_INFO+" did not return 'shardAddress' parameter", shardAddress);
|
||||
assertTrue(replicaTypeMap.containsKey(shardAddress));
|
||||
assertTrue(preferredTypes.indexOf(replicaTypeMap.get(shardAddress)) == 0);
|
||||
shardAddresses.add(shardAddress);
|
||||
}
|
||||
assertTrue("No responses", shardAddresses.size() > 0);
|
||||
log.info("Shards giving the response: " + Arrays.toString(shardAddresses.toArray()));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue