mirror of https://github.com/apache/lucene.git
SOLR-13884: detect multiple replicas on single node
This commit is contained in:
parent
73c535261c
commit
db65c82c39
|
@ -16,48 +16,31 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.solr.cloud.api.collections;
|
package org.apache.solr.cloud.api.collections;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.lang.invoke.MethodHandles;
|
import java.lang.invoke.MethodHandles;
|
||||||
import java.nio.file.Path;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
|
||||||
|
|
||||||
import org.apache.lucene.util.LuceneTestCase.Nightly;
|
|
||||||
import org.apache.solr.SolrTestCaseJ4;
|
|
||||||
import org.apache.solr.client.solrj.SolrClient;
|
|
||||||
import org.apache.solr.client.solrj.SolrQuery;
|
|
||||||
import org.apache.solr.client.solrj.SolrRequest;
|
import org.apache.solr.client.solrj.SolrRequest;
|
||||||
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
||||||
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||||
import org.apache.solr.client.solrj.request.UpdateRequest;
|
|
||||||
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
|
|
||||||
import org.apache.solr.client.solrj.response.UpdateResponse;
|
|
||||||
import org.apache.solr.cloud.CloudTestUtils;
|
import org.apache.solr.cloud.CloudTestUtils;
|
||||||
import org.apache.solr.cloud.MiniSolrCloudCluster;
|
|
||||||
import org.apache.solr.cloud.SolrCloudTestCase;
|
import org.apache.solr.cloud.SolrCloudTestCase;
|
||||||
import org.apache.solr.common.SolrDocument;
|
|
||||||
import org.apache.solr.common.SolrDocumentList;
|
|
||||||
import org.apache.solr.common.cloud.ClusterState;
|
import org.apache.solr.common.cloud.ClusterState;
|
||||||
import org.apache.solr.common.cloud.DocCollection;
|
import org.apache.solr.common.cloud.DocCollection;
|
||||||
import org.apache.solr.common.cloud.Replica;
|
import org.apache.solr.common.cloud.Replica;
|
||||||
import org.apache.solr.common.util.IOUtils;
|
import org.apache.solr.common.cloud.Slice;
|
||||||
import org.apache.solr.common.util.TimeSource;
|
|
||||||
import org.apache.solr.util.TimeOut;
|
|
||||||
import org.apache.zookeeper.KeeperException;
|
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
public class ConcurrentCreateCollectionTest extends SolrCloudTestCase {
|
public class ConcurrentCreateCollectionTest extends SolrCloudTestCase {
|
||||||
|
|
||||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||||
|
@ -86,22 +69,94 @@ public class ConcurrentCreateCollectionTest extends SolrCloudTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
 * Builds, but does not send, a collection-create request for the collection {@code cname}.
 *
 * The request is created with {@code "conf"} as the second argument to
 * {@code createCollection} (presumably the configset name -- confirm against the SolrJ API),
 * the given shard and replica counts, a max-shards-per-node of 100, wait-for-final-state
 * enabled and auto-add-replicas enabled. No policy is set here.
 *
 * @param cname       name of the collection to create
 * @param numShards   value passed as the shard count
 * @param numReplicas value passed as the replica count
 * @return the configured request; the caller is responsible for processing it
 * @throws Exception declared by the signature; nothing in the visible body obviously throws a
 *                   checked exception -- NOTE(review): confirm whether this can be narrowed
 */
private CollectionAdminRequest.Create createCollectionRequest(String cname, int numShards, int numReplicas) throws Exception {
|
||||||
|
CollectionAdminRequest.Create creq = CollectionAdminRequest
|
||||||
|
// nocommit .createCollection(cname, "conf", NODES - 1, NODES - 1)
// NOTE(review): the "nocommit" marker above flags an alternative sizing (NODES - 1 shards and
// replicas) that was left disabled; it should be resolved or removed before this is final.
|
||||||
|
.createCollection(cname, "conf", numShards, numReplicas)
|
||||||
|
.setMaxShardsPerNode(100);
|
||||||
|
creq.setWaitForFinalState(true);
|
||||||
|
creq.setAutoAddReplicas(true);
|
||||||
|
return creq;
|
||||||
|
}
|
||||||
|
|
||||||
public void testConcurrentCreatePlacement() throws Exception {
|
public void testConcurrentCreatePlacement() throws Exception {
|
||||||
final int nThreads = 20;
|
final int nThreads = 2;
|
||||||
final int createsPerThread = 1;
|
final int createsPerThread = 1;
|
||||||
final int repFactor = 1;
|
final int nShards = 1;
|
||||||
final boolean useClusterPolicy = true;
|
final int repFactor = 2;
|
||||||
final boolean useCollectionPolicy = false;
|
final boolean useClusterPolicy = false;
|
||||||
|
final boolean useCollectionPolicy = true;
|
||||||
|
final boolean startUnbalanced = true; // can help make a smaller test that can still reproduce an issue.
|
||||||
|
final int unbalancedSize = 1; // the number of replicas to create first
|
||||||
|
final boolean stopNode = false; // only applicable when startUnbalanced==true... stops a node during first collection creation, then restarts
|
||||||
|
|
||||||
final CloudSolrClient client = cluster.getSolrClient();
|
final CloudSolrClient client = cluster.getSolrClient();
|
||||||
|
|
||||||
|
|
||||||
|
if (startUnbalanced) {
|
||||||
|
/*** This produces a failure (multiple replicas of single shard on same node) when run with NODES=4 and
|
||||||
|
final int nThreads = 2;
|
||||||
|
final int createsPerThread = 1;
|
||||||
|
final int nShards = 2;
|
||||||
|
final int repFactor = 2;
|
||||||
|
final boolean useClusterPolicy = false;
|
||||||
|
final boolean useCollectionPolicy = true;
|
||||||
|
final boolean startUnbalanced = true;
|
||||||
|
// NOTE: useClusterPolicy=true seems to fix it! So does putting both creates in a single thread!
|
||||||
|
// NOTE: even creating a single replica to start with causes failure later on.
|
||||||
|
|
||||||
|
Also reproduced with smaller cluster: NODES=2 and
|
||||||
|
final int nThreads = 2;
|
||||||
|
final int createsPerThread = 1;
|
||||||
|
final int nShards = 1;
|
||||||
|
final int repFactor = 2;
|
||||||
|
final boolean useClusterPolicy = false;
|
||||||
|
final boolean useCollectionPolicy = true;
|
||||||
|
final boolean startUnbalanced = true;
|
||||||
|
|
||||||
|
Also, with NODES=3:
|
||||||
|
final int nThreads = 2;
|
||||||
|
final int createsPerThread = 1;
|
||||||
|
final int nShards = 1;
|
||||||
|
final int repFactor = 2;
|
||||||
|
final boolean useClusterPolicy = false;
|
||||||
|
final boolean useCollectionPolicy = true;
|
||||||
|
final boolean startUnbalanced = false;
|
||||||
|
|
||||||
|
// Also succeeded in replicating a bug where all 5 replicas were on a single node: NODES=5, nThreads=5, repFactor=5,
|
||||||
|
// unbalancedSize = 16 (4 replicas on each of the up nodes), stopNode=true
|
||||||
|
***/
|
||||||
|
|
||||||
|
|
||||||
|
JettySolrRunner downJetty = cluster.getJettySolrRunners().get(0);
|
||||||
|
if (stopNode) {
|
||||||
|
cluster.stopJettySolrRunner(downJetty);
|
||||||
|
}
|
||||||
|
|
||||||
|
String cname = "STARTCOLLECTION";
|
||||||
|
CollectionAdminRequest.Create creq = CollectionAdminRequest
|
||||||
|
// nocommit .createCollection(cname, "conf", NODES - 1, NODES - 1)
|
||||||
|
.createCollection(cname, "conf", unbalancedSize, 1)
|
||||||
|
.setMaxShardsPerNode(100);
|
||||||
|
creq.setWaitForFinalState(true);
|
||||||
|
// creq.setAutoAddReplicas(true);
|
||||||
|
if (useCollectionPolicy) { creq.setPolicy("policy1"); }
|
||||||
|
creq.process(client);
|
||||||
|
|
||||||
|
if (stopNode) {
|
||||||
|
// this will start it with a new port.... does it matter?
|
||||||
|
cluster.startJettySolrRunner(downJetty);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
if (useClusterPolicy) {
|
if (useClusterPolicy) {
|
||||||
String setClusterPolicyCommand = "{" +
|
String setClusterPolicyCommand = "{" +
|
||||||
" 'set-cluster-policy': [" +
|
" 'set-cluster-policy': [" +
|
||||||
// " {'cores':'<100', 'node':'#ANY'}," +
|
// " {'cores':'<100', 'node':'#ANY'}," +
|
||||||
" {'replica':'<2', 'shard': '#EACH', 'node': '#ANY'}," +
|
" {'replica':'<2', 'shard': '#EACH', 'node': '#ANY'}," +
|
||||||
|
// " {'replica':'<2', 'node': '#ANY'}," +
|
||||||
" ]" +
|
" ]" +
|
||||||
"}";
|
"}";
|
||||||
|
|
||||||
|
@ -110,12 +165,36 @@ public class ConcurrentCreateCollectionTest extends SolrCloudTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (useCollectionPolicy) {
|
if (useCollectionPolicy) {
|
||||||
// NOTE: the meer act of setting this named policy prevents LegacyAssignStrategy from being used, even if the policy is
|
// NOTE: the mere act of setting this named policy prevents LegacyAssignStrategy from being used, even if the policy is
|
||||||
// not used during collection creation.
|
// not used during collection creation.
|
||||||
String commands = "{set-policy :{policy1 : [{replica:'<2' , node:'#ANY'}]}}";
|
String commands = "{set-policy : {" +
|
||||||
|
" policy1 : [{replica:'<2' , node:'#ANY'}]" +
|
||||||
|
",policy2 : [{replica:'<2' , shard:'#EACH', node:'#ANY'}]" +
|
||||||
|
"}}";
|
||||||
client.request(CloudTestUtils.AutoScalingRequest.create(SolrRequest.METHOD.POST, commands));
|
client.request(CloudTestUtils.AutoScalingRequest.create(SolrRequest.METHOD.POST, commands));
|
||||||
|
|
||||||
|
/*** take defaults for cluster preferences
|
||||||
|
String cmd = "{" +
|
||||||
|
" 'set-cluster-preferences': [" +
|
||||||
|
// " {'cores':'<100', 'node':'#ANY'}," +
|
||||||
|
" {minimize:cores}" +
|
||||||
|
" ]" +
|
||||||
|
"}";
|
||||||
|
|
||||||
|
SolrRequest req = CloudTestUtils.AutoScalingRequest.create(SolrRequest.METHOD.POST, cmd);
|
||||||
|
client.request(req);
|
||||||
|
***/
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/***
|
||||||
|
SolrRequest req = CloudTestUtils.AutoScalingRequest.create(SolrRequest.METHOD.GET, null);
|
||||||
|
SolrResponse response = req.process(client);
|
||||||
|
log.info("######### AUTOSCALE " + response);
|
||||||
|
***/
|
||||||
|
|
||||||
|
|
||||||
|
byte[] data = client.getZkStateReader().getZkClient().getData("/autoscaling.json", null, null, true);
|
||||||
|
log.info("AUTOSCALE DATA: " + new String(data, "UTF-8"));
|
||||||
|
|
||||||
final AtomicInteger collectionNum = new AtomicInteger();
|
final AtomicInteger collectionNum = new AtomicInteger();
|
||||||
Thread[] indexThreads = new Thread[nThreads];
|
Thread[] indexThreads = new Thread[nThreads];
|
||||||
|
@ -125,14 +204,17 @@ public class ConcurrentCreateCollectionTest extends SolrCloudTestCase {
|
||||||
try {
|
try {
|
||||||
for (int j=0; j<createsPerThread; j++) {
|
for (int j=0; j<createsPerThread; j++) {
|
||||||
int num = collectionNum.incrementAndGet();
|
int num = collectionNum.incrementAndGet();
|
||||||
|
// Thread.sleep(num*1000); // nocommit
|
||||||
String collectionName = "collection" + num;
|
String collectionName = "collection" + num;
|
||||||
CollectionAdminRequest.Create createReq = CollectionAdminRequest
|
CollectionAdminRequest.Create createReq = CollectionAdminRequest
|
||||||
.createCollection(collectionName, "conf", 1, repFactor)
|
.createCollection(collectionName, "conf", nShards, repFactor)
|
||||||
.setMaxShardsPerNode(1);
|
// .setMaxShardsPerNode(1) // should be default
|
||||||
|
;
|
||||||
createReq.setWaitForFinalState(false);
|
createReq.setWaitForFinalState(false);
|
||||||
if (useCollectionPolicy) {
|
if (useCollectionPolicy) {
|
||||||
createReq.setPolicy("policy1");
|
createReq.setPolicy("policy1");
|
||||||
}
|
}
|
||||||
|
createReq.setAutoAddReplicas(true);
|
||||||
|
|
||||||
createReq.process(client);
|
createReq.process(client);
|
||||||
// cluster.waitForActiveCollection(collectionName, 1, repFactor);
|
// cluster.waitForActiveCollection(collectionName, 1, repFactor);
|
||||||
|
@ -152,8 +234,9 @@ public class ConcurrentCreateCollectionTest extends SolrCloudTestCase {
|
||||||
thread.join();
|
thread.join();
|
||||||
}
|
}
|
||||||
|
|
||||||
int expectedTotalReplicas = nThreads * createsPerThread * repFactor;
|
int expectedTotalReplicas = unbalancedSize + nThreads * createsPerThread * nShards * repFactor;
|
||||||
int expectedPerNode = expectedTotalReplicas / NODES;
|
int expectedPerNode = expectedTotalReplicas / NODES;
|
||||||
|
boolean expectBalanced = (expectedPerNode * NODES == expectedTotalReplicas);
|
||||||
|
|
||||||
Map<String,List<Replica>> replicaMap = new HashMap<>();
|
Map<String,List<Replica>> replicaMap = new HashMap<>();
|
||||||
ClusterState cstate = client.getZkStateReader().getClusterState();
|
ClusterState cstate = client.getZkStateReader().getClusterState();
|
||||||
|
@ -173,11 +256,28 @@ public class ConcurrentCreateCollectionTest extends SolrCloudTestCase {
|
||||||
boolean failed = false;
|
boolean failed = false;
|
||||||
for (List<Replica> replicas : replicaMap.values()) {
|
for (List<Replica> replicas : replicaMap.values()) {
|
||||||
if (replicas.size() != expectedPerNode ) {
|
if (replicas.size() != expectedPerNode ) {
|
||||||
failed = true;
|
if (expectBalanced) {
|
||||||
|
failed = true;
|
||||||
|
}
|
||||||
log.error("UNBALANCED CLUSTER: expected replicas per node " + expectedPerNode + " but got " + replicas.size());
|
log.error("UNBALANCED CLUSTER: expected replicas per node " + expectedPerNode + " but got " + replicas.size());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// check if there were multiple replicas of the same shard placed on the same node
|
||||||
|
for (DocCollection collection : cstate.getCollectionsMap().values()) {
|
||||||
|
for (Slice slice : collection.getSlices()) {
|
||||||
|
Map<String, Replica> nodeToReplica = new HashMap<>();
|
||||||
|
for (Replica replica : slice.getReplicas()) {
|
||||||
|
Replica prev = nodeToReplica.put(replica.getBaseUrl(), replica);
|
||||||
|
if (prev != null) {
|
||||||
|
failed = true;
|
||||||
|
// NOTE: with a replication factor > 2, this will print multiple times per bad slice.
|
||||||
|
log.error("MULTIPLE REPLICAS OF SINGLE SHARD ON SAME NODE: r1=" + prev + " r2=" + replica);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (failed) {
|
if (failed) {
|
||||||
log.error("Cluster state " + cstate.getCollectionsMap());
|
log.error("Cluster state " + cstate.getCollectionsMap());
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue