mirror of https://github.com/apache/lucene.git
SOLR-8215: Only active replicas should handle incoming requests against a collection
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1712601 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
94e23b458b
commit
b5b4292afc
|
@ -322,6 +322,8 @@ Bug Fixes
|
|||
(Mark Miller, yonik)
|
||||
|
||||
|
||||
* SOLR-8215: Only active replicas should handle incoming requests against a collection (Varun Thacker)
|
||||
|
||||
Optimizations
|
||||
----------------------
|
||||
|
||||
|
|
|
@ -765,26 +765,31 @@ public class HttpSolrCall {
|
|||
return result;
|
||||
}
|
||||
|
||||
private SolrCore getCoreByCollection(String corename) {
|
||||
private SolrCore getCoreByCollection(String collection) {
|
||||
ZkStateReader zkStateReader = cores.getZkController().getZkStateReader();
|
||||
|
||||
ClusterState clusterState = zkStateReader.getClusterState();
|
||||
Map<String, Slice> slices = clusterState.getActiveSlicesMap(corename);
|
||||
Map<String, Slice> slices = clusterState.getActiveSlicesMap(collection);
|
||||
if (slices == null) {
|
||||
return null;
|
||||
}
|
||||
Set<String> liveNodes = clusterState.getLiveNodes();
|
||||
// look for a core on this node
|
||||
Set<Map.Entry<String, Slice>> entries = slices.entrySet();
|
||||
SolrCore core = null;
|
||||
done:
|
||||
|
||||
//Hitting the leaders is useful when it's an update request.
|
||||
//For queries it doesn't matter and hence we don't distinguish here.
|
||||
for (Map.Entry<String, Slice> entry : entries) {
|
||||
// first see if we have the leader
|
||||
ZkNodeProps leaderProps = clusterState.getLeader(corename, entry.getKey());
|
||||
if (leaderProps != null) {
|
||||
core = checkProps(leaderProps);
|
||||
}
|
||||
if (core != null) {
|
||||
break done;
|
||||
Replica leaderProps = clusterState.getLeader(collection, entry.getKey());
|
||||
if (liveNodes.contains(leaderProps.getNodeName()) && leaderProps.getState() == Replica.State.ACTIVE) {
|
||||
if (leaderProps != null) {
|
||||
core = checkProps(leaderProps);
|
||||
}
|
||||
if (core != null) {
|
||||
return core;
|
||||
}
|
||||
}
|
||||
|
||||
// check everyone then
|
||||
|
@ -792,13 +797,15 @@ public class HttpSolrCall {
|
|||
Set<Map.Entry<String, Replica>> shardEntries = shards.entrySet();
|
||||
for (Map.Entry<String, Replica> shardEntry : shardEntries) {
|
||||
Replica zkProps = shardEntry.getValue();
|
||||
core = checkProps(zkProps);
|
||||
if (core != null) {
|
||||
break done;
|
||||
if (liveNodes.contains(zkProps.getNodeName()) && zkProps.getState() == Replica.State.ACTIVE) {
|
||||
core = checkProps(zkProps);
|
||||
if (core != null) {
|
||||
return core;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return core;
|
||||
return null;
|
||||
}
|
||||
|
||||
private SolrCore checkProps(ZkNodeProps zkProps) {
|
||||
|
|
|
@ -65,6 +65,8 @@ import org.junit.Ignore;
|
|||
import org.junit.Test;
|
||||
import org.xml.sax.SAXException;
|
||||
|
||||
import static org.apache.solr.cloud.AbstractDistribZkTestBase.verifyReplicaStatus;
|
||||
|
||||
@Slow
|
||||
public class OverseerTest extends SolrTestCaseJ4 {
|
||||
|
||||
|
@ -325,7 +327,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
|
|||
|
||||
// make sure the Overseer is still processing items
|
||||
for (int i = 0; i < numShards; i++) {
|
||||
assertNotNull("shard got no id?", zkController.publishState("collection2", "core" + (i+1), "node" + (i+1), Replica.State.ACTIVE, 3));
|
||||
assertNotNull("shard got no id?", zkController.publishState("collection2", "core" + (i + 1), "node" + (i + 1), Replica.State.ACTIVE, 3));
|
||||
}
|
||||
|
||||
assertEquals(1, reader.getClusterState().getSlice("collection2", "shard1").getReplicasMap().size());
|
||||
|
@ -573,7 +575,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
|
|||
|
||||
q.offer(Utils.toJSON(m));
|
||||
|
||||
verifyStatus(reader, "collection1", "shard1", "core_node1", Replica.State.ACTIVE);
|
||||
verifyReplicaStatus(reader, "collection1", "shard1", "core_node1", Replica.State.ACTIVE);
|
||||
|
||||
} finally {
|
||||
|
||||
|
@ -584,22 +586,6 @@ public class OverseerTest extends SolrTestCaseJ4 {
|
|||
server.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
private void verifyStatus(ZkStateReader reader, String collection, String shard, String coreNodeName, Replica.State expectedState) throws InterruptedException {
|
||||
int maxIterations = 100;
|
||||
Replica.State coreState = null;
|
||||
while(maxIterations-->0) {
|
||||
Slice slice = reader.getClusterState().getSlice(collection, shard);
|
||||
if(slice!=null) {
|
||||
coreState = slice.getReplicasMap().get(coreNodeName).getState();
|
||||
if(coreState == expectedState) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
Thread.sleep(50);
|
||||
}
|
||||
fail("Illegal state, was: " + coreState + " expected:" + expectedState + " clusterState:" + reader.getClusterState());
|
||||
}
|
||||
|
||||
private void verifyShardLeader(ZkStateReader reader, String collection, String shard, String expectedCore) throws InterruptedException, KeeperException {
|
||||
int maxIterations = 200;
|
||||
|
@ -649,15 +635,15 @@ public class OverseerTest extends SolrTestCaseJ4 {
|
|||
mockController.publishState(collection, "core1", "core_node1", Replica.State.RECOVERING, 1);
|
||||
|
||||
waitForCollections(reader, collection);
|
||||
verifyStatus(reader, collection, "shard1", "core_node1", Replica.State.RECOVERING);
|
||||
verifyReplicaStatus(reader, collection, "shard1", "core_node1", Replica.State.RECOVERING);
|
||||
|
||||
int version = getClusterStateVersion(zkClient);
|
||||
|
||||
mockController.publishState(collection, "core1", "core_node1", Replica.State.ACTIVE, 1);
|
||||
|
||||
while (version == getClusterStateVersion(zkClient));
|
||||
|
||||
verifyStatus(reader, collection, "shard1", "core_node1", Replica.State.ACTIVE);
|
||||
|
||||
verifyReplicaStatus(reader, collection, "shard1", "core_node1", Replica.State.ACTIVE);
|
||||
version = getClusterStateVersion(zkClient);
|
||||
overseerClient.close();
|
||||
Thread.sleep(1000); // wait for overseer to get killed
|
||||
|
@ -668,8 +654,8 @@ public class OverseerTest extends SolrTestCaseJ4 {
|
|||
overseerClient = electNewOverseer(server.getZkAddress());
|
||||
|
||||
while (version == getClusterStateVersion(zkClient));
|
||||
|
||||
verifyStatus(reader, collection, "shard1", "core_node1", Replica.State.RECOVERING);
|
||||
|
||||
verifyReplicaStatus(reader, collection, "shard1", "core_node1", Replica.State.RECOVERING);
|
||||
|
||||
assertEquals("Live nodes count does not match", 1, reader
|
||||
.getClusterState().getLiveNodes().size());
|
||||
|
@ -883,8 +869,8 @@ public class OverseerTest extends SolrTestCaseJ4 {
|
|||
mockController.publishState(collection, "core1", "core_node1", Replica.State.RECOVERING, 1);
|
||||
|
||||
waitForCollections(reader, "collection1");
|
||||
|
||||
verifyStatus(reader, collection, "shard1", "core_node1", Replica.State.RECOVERING);
|
||||
|
||||
verifyReplicaStatus(reader, collection, "shard1", "core_node1", Replica.State.RECOVERING);
|
||||
|
||||
mockController.close();
|
||||
|
||||
|
@ -1201,7 +1187,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
|
|||
q.offer(Utils.toJSON(m));
|
||||
|
||||
waitForCollections(reader, "c1");
|
||||
verifyStatus(reader, "c1", "shard1", "core_node1", Replica.State.DOWN);
|
||||
verifyReplicaStatus(reader, "c1", "shard1", "core_node1", Replica.State.DOWN);
|
||||
|
||||
m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(),
|
||||
ZkStateReader.BASE_URL_PROP, "http://127.0.0.1/solr",
|
||||
|
@ -1255,10 +1241,10 @@ public class OverseerTest extends SolrTestCaseJ4 {
|
|||
q.offer(Utils.toJSON(m));
|
||||
|
||||
waitForCollections(reader, "test");
|
||||
verifyStatus(reader, "test", "x", "core_node1", Replica.State.DOWN);
|
||||
verifyReplicaStatus(reader, "test", "x", "core_node1", Replica.State.DOWN);
|
||||
|
||||
waitForCollections(reader, "c1");
|
||||
verifyStatus(reader, "c1", "shard1", "core_node1", Replica.State.ACTIVE);
|
||||
verifyReplicaStatus(reader, "c1", "shard1", "core_node1", Replica.State.ACTIVE);
|
||||
|
||||
} finally {
|
||||
close(zkClient);
|
||||
|
|
|
@ -29,36 +29,45 @@ import org.apache.solr.client.solrj.SolrQuery;
|
|||
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
||||
import org.apache.solr.client.solrj.impl.HttpSolrClient;
|
||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||
import org.apache.solr.client.solrj.request.CoreAdminRequest;
|
||||
import org.apache.solr.cloud.overseer.OverseerAction;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.Slice;
|
||||
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.util.Utils;
|
||||
import org.apache.solr.core.CoreContainer;
|
||||
import org.apache.solr.core.SolrCore;
|
||||
import org.apache.solr.handler.component.ShardHandlerFactory;
|
||||
import org.apache.solr.handler.component.TrackingShardHandlerFactory;
|
||||
import org.apache.solr.request.SolrRequestHandler;
|
||||
import org.apache.solr.servlet.SolrDispatchFilter;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* Asserts that requests aren't always sent to the same poor node. See SOLR-7493
|
||||
*/
|
||||
|
||||
@SolrTestCaseJ4.SuppressSSL
|
||||
public class TestRandomRequestDistribution extends AbstractFullDistribZkTestBase {
|
||||
|
||||
List<String> nodeNames = new ArrayList<>(3);
|
||||
|
||||
@Test
|
||||
@BaseDistributedSearchTestCase.ShardsFixed(num = 3)
|
||||
public void testRequestTracking() throws Exception {
|
||||
public void test() throws Exception {
|
||||
waitForThingsToLevelOut(30);
|
||||
|
||||
List<String> nodeNames = new ArrayList<>(3);
|
||||
for (CloudJettyRunner cloudJetty : cloudJettys) {
|
||||
nodeNames.add(cloudJetty.nodeName);
|
||||
}
|
||||
assertEquals(3, nodeNames.size());
|
||||
|
||||
testRequestTracking();
|
||||
testQueryAgainstDownReplica();
|
||||
}
|
||||
|
||||
/**
|
||||
* Asserts that requests aren't always sent to the same poor node. See SOLR-7493
|
||||
*/
|
||||
private void testRequestTracking() throws Exception {
|
||||
|
||||
new CollectionAdminRequest.Create()
|
||||
.setCollectionName("a1x2")
|
||||
.setNumShards(1)
|
||||
|
@ -109,4 +118,77 @@ public class TestRandomRequestDistribution extends AbstractFullDistribZkTestBase
|
|||
assertTrue("Shard " + entry.getKey() + " received all 10 requests", entry.getValue() != 10);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Asserts that requests against a collection are only served by a 'active' local replica
|
||||
*/
|
||||
private void testQueryAgainstDownReplica() throws Exception {
|
||||
|
||||
log.info("Creating collection 'football' with 1 shard and 2 replicas");
|
||||
new CollectionAdminRequest.Create()
|
||||
.setCollectionName("football")
|
||||
.setNumShards(1)
|
||||
.setReplicationFactor(2)
|
||||
.setCreateNodeSet(nodeNames.get(0) + ',' + nodeNames.get(1))
|
||||
.process(cloudClient);
|
||||
|
||||
waitForRecoveriesToFinish("football", true);
|
||||
|
||||
cloudClient.getZkStateReader().updateClusterState();
|
||||
|
||||
Replica leader = null;
|
||||
Replica notLeader = null;
|
||||
|
||||
Collection<Replica> replicas = cloudClient.getZkStateReader().getClusterState().getSlice("football", "shard1").getReplicas();
|
||||
for (Replica replica : replicas) {
|
||||
if (replica.getStr(ZkStateReader.LEADER_PROP) != null) {
|
||||
leader = replica;
|
||||
} else {
|
||||
notLeader = replica;
|
||||
}
|
||||
}
|
||||
|
||||
//Simulate a replica being in down state.
|
||||
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(),
|
||||
ZkStateReader.BASE_URL_PROP, notLeader.getStr(ZkStateReader.BASE_URL_PROP),
|
||||
ZkStateReader.NODE_NAME_PROP, notLeader.getStr(ZkStateReader.NODE_NAME_PROP),
|
||||
ZkStateReader.COLLECTION_PROP, "football",
|
||||
ZkStateReader.SHARD_ID_PROP, "shard1",
|
||||
ZkStateReader.CORE_NAME_PROP, notLeader.getStr(ZkStateReader.CORE_NAME_PROP),
|
||||
ZkStateReader.ROLES_PROP, "",
|
||||
ZkStateReader.STATE_PROP, Replica.State.DOWN.toString());
|
||||
|
||||
log.info("Forcing {} to go into 'down' state", notLeader.getStr(ZkStateReader.CORE_NAME_PROP));
|
||||
DistributedQueue q = Overseer.getInQueue(cloudClient.getZkStateReader().getZkClient());
|
||||
q.offer(Utils.toJSON(m));
|
||||
|
||||
verifyReplicaStatus(cloudClient.getZkStateReader(), "football", "shard1", notLeader.getName(), Replica.State.DOWN);
|
||||
|
||||
//Query against the node which hosts the down replica
|
||||
|
||||
String baseUrl = notLeader.getStr(ZkStateReader.BASE_URL_PROP);
|
||||
if (!baseUrl.endsWith("/")) baseUrl += "/";
|
||||
String path = baseUrl + "football";
|
||||
log.info("Firing query against path=" + path);
|
||||
HttpSolrClient client = new HttpSolrClient(path);
|
||||
client.setSoTimeout(5000);
|
||||
client.setConnectionTimeout(2000);
|
||||
|
||||
client.query(new SolrQuery("*:*"));
|
||||
client.close();
|
||||
|
||||
//Test to see if the query got forwarded to the active replica or not.
|
||||
for (JettySolrRunner jetty : jettys) {
|
||||
CoreContainer container = ((SolrDispatchFilter) jetty.getDispatchFilter().getFilter()).getCores();
|
||||
for (SolrCore core : container.getCores()) {
|
||||
if (core.getName().equals(leader.getStr(ZkStateReader.CORE_NAME_PROP))) {
|
||||
SolrRequestHandler select = core.getRequestHandler("");
|
||||
long c = (long) select.getStatistics().get("requests");
|
||||
assertEquals(core.getName() + " should have got 1 request", 1, c);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -214,6 +214,22 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes
|
|||
|
||||
log.info("Collection has disappeared - collection: " + collection);
|
||||
}
|
||||
|
||||
public static void verifyReplicaStatus(ZkStateReader reader, String collection, String shard, String coreNodeName, Replica.State expectedState) throws InterruptedException {
|
||||
int maxIterations = 100;
|
||||
Replica.State coreState = null;
|
||||
while(maxIterations-->0) {
|
||||
Slice slice = reader.getClusterState().getSlice(collection, shard);
|
||||
if(slice!=null) {
|
||||
coreState = slice.getReplicasMap().get(coreNodeName).getState();
|
||||
if(coreState == expectedState) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
Thread.sleep(50);
|
||||
}
|
||||
fail("Illegal state, was: " + coreState + " expected:" + expectedState + " clusterState:" + reader.getClusterState());
|
||||
}
|
||||
|
||||
protected void assertAllActive(String collection,ZkStateReader zkStateReader)
|
||||
throws KeeperException, InterruptedException {
|
||||
|
|
Loading…
Reference in New Issue