mirror of https://github.com/apache/lucene.git
SOLR-9132: Cut over DeleteReplica tests
Also fixes some bugs in CollectionAdminRequest.DeleteReplica from SOLR-9319
This commit is contained in:
parent
45847eb537
commit
11a98a89fd
|
@ -16,144 +16,89 @@
|
|||
*/
|
||||
package org.apache.solr.cloud;
|
||||
|
||||
import static org.apache.solr.cloud.CollectionsAPIDistributedZkTest.*;
|
||||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.net.URL;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.solr.client.solrj.SolrClient;
|
||||
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
||||
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||
import org.apache.solr.client.solrj.request.QueryRequest;
|
||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||
import org.apache.solr.client.solrj.request.CoreAdminRequest;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.Slice;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.params.CoreAdminParams;
|
||||
import org.apache.solr.common.params.MapSolrParams;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.common.util.Utils;
|
||||
import org.apache.solr.util.TimeOut;
|
||||
import org.apache.solr.core.CoreContainer;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class DeleteInactiveReplicaTest extends AbstractFullDistribZkTestBase{
|
||||
public class DeleteInactiveReplicaTest extends SolrCloudTestCase {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
@BeforeClass
|
||||
public static void setupCluster() throws Exception {
|
||||
configureCluster(4)
|
||||
.addConfig("conf", configset("cloud-minimal"))
|
||||
.withProperty(ZkStateReader.LEGACY_CLOUD, "false")
|
||||
.configure();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void deleteInactiveReplicaTest() throws Exception {
|
||||
try (CloudSolrClient client = createCloudClient(null)) {
|
||||
|
||||
String collectionName = "delDeadColl";
|
||||
String collectionName = "delDeadColl";
|
||||
int replicationFactor = 2;
|
||||
int numShards = 2;
|
||||
int maxShardsPerNode = ((((numShards + 1) * replicationFactor) / cluster.getJettySolrRunners().size())) + 1;
|
||||
|
||||
setClusterProp(client, ZkStateReader.LEGACY_CLOUD, "false");
|
||||
CollectionAdminRequest.createCollection(collectionName, "conf", numShards, replicationFactor)
|
||||
.setMaxShardsPerNode(maxShardsPerNode)
|
||||
.process(cluster.getSolrClient());
|
||||
waitForState("Expected a cluster of 2 shards and 2 replicas", collectionName, (n, c) -> {
|
||||
return DocCollection.isFullyActive(n, c, numShards, replicationFactor);
|
||||
});
|
||||
|
||||
int replicationFactor = 2;
|
||||
int numShards = 2;
|
||||
int maxShardsPerNode = ((((numShards+1) * replicationFactor) / getCommonCloudSolrClient()
|
||||
.getZkStateReader().getClusterState().getLiveNodes().size())) + 1;
|
||||
DocCollection collectionState = getCollectionState(collectionName);
|
||||
|
||||
Map<String,List<Integer>> collectionInfos = new HashMap<>();
|
||||
createCollection(collectionInfos, collectionName, numShards, replicationFactor, maxShardsPerNode, client, null);
|
||||
Slice shard = getRandomShard(collectionState);
|
||||
Replica replica = getRandomReplica(shard);
|
||||
JettySolrRunner jetty = cluster.getReplicaJetty(replica);
|
||||
cluster.stopJettySolrRunner(jetty);
|
||||
|
||||
waitForRecoveriesToFinish(collectionName, false);
|
||||
waitForState("Expected replica " + replica.getName() + " on down node to be removed from cluster state", collectionName, (n, c) -> {
|
||||
Replica r = c.getReplica(replica.getCoreName());
|
||||
return r == null || r.getState() != Replica.State.ACTIVE;
|
||||
});
|
||||
|
||||
Thread.sleep(3000);
|
||||
log.info("Removing replica {}/{} ", shard.getName(), replica.getName());
|
||||
CollectionAdminRequest.deleteReplica(collectionName, shard.getName(), replica.getName())
|
||||
.process(cluster.getSolrClient());
|
||||
waitForState("Expected deleted replica " + replica.getName() + " to be removed from cluster state", collectionName, (n, c) -> {
|
||||
return c.getReplica(replica.getCoreName()) == null;
|
||||
});
|
||||
|
||||
boolean stopped = false;
|
||||
JettySolrRunner stoppedJetty = null;
|
||||
StringBuilder sb = new StringBuilder();
|
||||
Replica replica1 = null;
|
||||
Slice shard1 = null;
|
||||
TimeOut timeout = new TimeOut(3, TimeUnit.SECONDS);
|
||||
DocCollection testcoll = null;
|
||||
while (!stopped && ! timeout.hasTimedOut()) {
|
||||
testcoll = client.getZkStateReader().getClusterState().getCollection(collectionName);
|
||||
for (JettySolrRunner jetty : jettys)
|
||||
sb.append(jetty.getBaseUrl()).append(",");
|
||||
cluster.startJettySolrRunner(jetty);
|
||||
log.info("restarted jetty");
|
||||
|
||||
for (Slice slice : testcoll.getActiveSlices()) {
|
||||
for (Replica replica : slice.getReplicas())
|
||||
for (JettySolrRunner jetty : jettys) {
|
||||
URL baseUrl = null;
|
||||
try {
|
||||
baseUrl = jetty.getBaseUrl();
|
||||
} catch (Exception e) {
|
||||
continue;
|
||||
}
|
||||
if (baseUrl.toString().startsWith(
|
||||
replica.getStr(ZkStateReader.BASE_URL_PROP))) {
|
||||
stoppedJetty = jetty;
|
||||
ChaosMonkey.stop(jetty);
|
||||
replica1 = replica;
|
||||
shard1 = slice;
|
||||
stopped = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
Thread.sleep(100);
|
||||
}
|
||||
CoreContainer cc = jetty.getCoreContainer();
|
||||
CoreContainer.CoreLoadFailure loadFailure = cc.getCoreInitFailures().get(replica.getCoreName());
|
||||
assertNotNull("Deleted core was still loaded!", loadFailure);
|
||||
assertTrue("Unexpected load failure message: " + loadFailure.exception.getMessage(),
|
||||
loadFailure.exception.getMessage().contains("not present in cluster state"));
|
||||
|
||||
|
||||
if (!stopped) {
|
||||
fail("Could not find jetty to stop in collection " + testcoll
|
||||
+ " jettys: " + sb);
|
||||
}
|
||||
|
||||
timeout = new TimeOut(20, TimeUnit.SECONDS);
|
||||
boolean success = false;
|
||||
while (! timeout.hasTimedOut()) {
|
||||
testcoll = client.getZkStateReader()
|
||||
.getClusterState().getCollection(collectionName);
|
||||
if (testcoll.getSlice(shard1.getName()).getReplica(replica1.getName()).getState() != Replica.State.ACTIVE) {
|
||||
success = true;
|
||||
}
|
||||
if (success) break;
|
||||
Thread.sleep(100);
|
||||
}
|
||||
|
||||
log.info("removed_replicas {}/{} ", shard1.getName(), replica1.getName());
|
||||
DeleteReplicaTest.removeAndWaitForReplicaGone(collectionName, client, replica1,
|
||||
shard1.getName());
|
||||
ChaosMonkey.start(stoppedJetty);
|
||||
log.info("restarted jetty");
|
||||
|
||||
Map m = Utils.makeMap("qt", "/admin/cores", "action", "status");
|
||||
|
||||
try (SolrClient queryClient = getHttpSolrClient(replica1.getStr(ZkStateReader.BASE_URL_PROP))) {
|
||||
NamedList<Object> resp = queryClient.request(new QueryRequest(new MapSolrParams(m)));
|
||||
assertNull("The core is up and running again",
|
||||
((NamedList) resp.get("status")).get(replica1.getStr("core")));
|
||||
}
|
||||
|
||||
Exception exp = null;
|
||||
|
||||
try {
|
||||
|
||||
m = Utils.makeMap(
|
||||
"action", CoreAdminParams.CoreAdminAction.CREATE.toString(),
|
||||
ZkStateReader.COLLECTION_PROP, collectionName,
|
||||
ZkStateReader.SHARD_ID_PROP, "shard2",
|
||||
CoreAdminParams.NAME, "testcore");
|
||||
|
||||
QueryRequest request = new QueryRequest(new MapSolrParams(m));
|
||||
request.setPath("/admin/cores");
|
||||
NamedList<Object> rsp = client.request(request);
|
||||
} catch (Exception e) {
|
||||
exp = e;
|
||||
log.info("error_expected", e);
|
||||
}
|
||||
assertNotNull("Exception expected", exp);
|
||||
setClusterProp(client, ZkStateReader.LEGACY_CLOUD, null);
|
||||
// Check that we can't create a core with no coreNodeName
|
||||
try (SolrClient queryClient = getHttpSolrClient(jetty.getBaseUrl().toString())) {
|
||||
Exception e = expectThrows(Exception.class, () -> {
|
||||
CoreAdminRequest.Create createRequest = new CoreAdminRequest.Create();
|
||||
createRequest.setCoreName("testcore");
|
||||
createRequest.setCollection(collectionName);
|
||||
createRequest.setShardId("shard2");
|
||||
queryClient.request(createRequest);
|
||||
});
|
||||
assertTrue("Unexpected error message: " + e.getMessage(), e.getMessage().contains("coreNodeName missing"));
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -16,327 +16,108 @@
|
|||
*/
|
||||
package org.apache.solr.cloud;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
|
||||
import org.apache.solr.client.solrj.SolrRequest;
|
||||
import org.apache.solr.client.solrj.SolrServerException;
|
||||
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||
import org.apache.solr.client.solrj.impl.HttpSolrClient;
|
||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||
import org.apache.solr.client.solrj.request.CoreAdminRequest;
|
||||
import org.apache.solr.client.solrj.request.QueryRequest;
|
||||
import org.apache.solr.client.solrj.response.CoreAdminResponse;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.client.solrj.request.CoreStatus;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.Slice;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.params.MapSolrParams;
|
||||
import org.apache.solr.common.params.SolrParams;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.util.FileUtils;
|
||||
import org.apache.solr.util.TimeOut;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.NUM_SLICES;
|
||||
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.ONLY_IF_DOWN;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICA;
|
||||
import static org.apache.solr.common.util.Utils.makeMap;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.REQUESTSTATUS;
|
||||
import org.apache.solr.client.solrj.response.RequestStatusState;
|
||||
|
||||
|
||||
public class DeleteReplicaTest extends AbstractFullDistribZkTestBase {
|
||||
public class DeleteReplicaTest extends SolrCloudTestCase {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
protected String getSolrXml() {
|
||||
return "solr.xml";
|
||||
}
|
||||
|
||||
public DeleteReplicaTest() {
|
||||
sliceCount = 2;
|
||||
@BeforeClass
|
||||
public static void setupCluster() throws Exception {
|
||||
configureCluster(4)
|
||||
.addConfig("conf", configset("cloud-minimal"))
|
||||
.configure();
|
||||
}
|
||||
|
||||
@Test
|
||||
@ShardsFixed(num = 4)
|
||||
public void deleteLiveReplicaTest() throws Exception {
|
||||
String collectionName = "delLiveColl";
|
||||
try (CloudSolrClient client = createCloudClient(null)) {
|
||||
createCollection(collectionName, client);
|
||||
|
||||
waitForRecoveriesToFinish(collectionName, false);
|
||||
final String collectionName = "delLiveColl";
|
||||
|
||||
DocCollection testcoll = getCommonCloudSolrClient().getZkStateReader()
|
||||
.getClusterState().getCollection(collectionName);
|
||||
CollectionAdminRequest.createCollection(collectionName, "conf", 2, 2)
|
||||
.process(cluster.getSolrClient());
|
||||
|
||||
Slice shard1 = null;
|
||||
Replica replica1 = null;
|
||||
DocCollection state = getCollectionState(collectionName);
|
||||
Slice shard = getRandomShard(state);
|
||||
Replica replica = getRandomReplica(shard, (r) -> r.getState() == Replica.State.ACTIVE);
|
||||
|
||||
// Get an active replica
|
||||
for (Slice slice : testcoll.getSlices()) {
|
||||
if(replica1 != null)
|
||||
break;
|
||||
if (slice.getState() == Slice.State.ACTIVE) {
|
||||
shard1 = slice;
|
||||
for (Replica replica : shard1.getReplicas()) {
|
||||
if (replica.getState() == Replica.State.ACTIVE) {
|
||||
replica1 = replica;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
CoreStatus coreStatus = getCoreStatus(replica);
|
||||
Path dataDir = Paths.get(coreStatus.getDataDirectory());
|
||||
|
||||
if (replica1 == null) fail("no active replicas found");
|
||||
Exception e = expectThrows(Exception.class, () -> {
|
||||
CollectionAdminRequest.deleteReplica(collectionName, shard.getName(), replica.getName())
|
||||
.setOnlyIfDown(true)
|
||||
.process(cluster.getSolrClient());
|
||||
});
|
||||
assertTrue("Unexpected error message: " + e.getMessage(), e.getMessage().contains("state is 'active'"));
|
||||
assertTrue("Data directory for " + replica.getName() + " should not have been deleted", Files.exists(dataDir));
|
||||
|
||||
String dataDir = null;
|
||||
try (HttpSolrClient replica1Client = getHttpSolrClient(replica1.getStr("base_url"))) {
|
||||
CoreAdminResponse status = CoreAdminRequest.getStatus(replica1.getStr("core"), replica1Client);
|
||||
NamedList<Object> coreStatus = status.getCoreStatus(replica1.getStr("core"));
|
||||
dataDir = (String) coreStatus.get("dataDir");
|
||||
}
|
||||
try {
|
||||
// Should not be able to delete a replica that is up if onlyIfDown=true.
|
||||
tryToRemoveOnlyIfDown(collectionName, client, replica1, shard1.getName());
|
||||
fail("Should have thrown an exception here because the replica is NOT down");
|
||||
} catch (SolrException se) {
|
||||
assertEquals("Should see 400 here ", se.code(), 400);
|
||||
assertTrue("Expected DeleteReplica to fail because node state is 'active' but returned message was: " + se.getMessage(), se.getMessage().contains("with onlyIfDown='true', but state is 'active'"));
|
||||
// This bit is a little weak in that if we're screwing up and actually deleting the replica, we might get back
|
||||
// here _before_ the datadir is deleted. But I'd rather not introduce a delay here.
|
||||
assertTrue("dataDir for " + replica1.getName() + " should NOT have been deleted by deleteReplica API with onlyIfDown='true'",
|
||||
new File(dataDir).exists());
|
||||
}
|
||||
CollectionAdminRequest.deleteReplica(collectionName, shard.getName(), replica.getName())
|
||||
.process(cluster.getSolrClient());
|
||||
waitForState("Expected replica " + replica.getName() + " to have been removed", collectionName, (n, c) -> {
|
||||
Slice testShard = c.getSlice(shard.getName());
|
||||
return testShard.getReplica(replica.getName()) == null;
|
||||
});
|
||||
|
||||
removeAndWaitForReplicaGone(collectionName, client, replica1, shard1.getName());
|
||||
assertFalse("dataDir for " + replica1.getName() + " should have been deleted by deleteReplica API", new File(dataDir).exists());
|
||||
}
|
||||
}
|
||||
assertFalse("Data directory for " + replica.getName() + " should have been removed", Files.exists(dataDir));
|
||||
|
||||
|
||||
protected void tryToRemoveOnlyIfDown(String collectionName, CloudSolrClient client, Replica replica, String shard) throws IOException, SolrServerException {
|
||||
Map m = makeMap("collection", collectionName,
|
||||
"action", DELETEREPLICA.toLower(),
|
||||
"shard", shard,
|
||||
"replica", replica.getName(),
|
||||
ONLY_IF_DOWN, "true");
|
||||
SolrParams params = new MapSolrParams(m);
|
||||
SolrRequest request = new QueryRequest(params);
|
||||
request.setPath("/admin/collections");
|
||||
client.request(request);
|
||||
}
|
||||
|
||||
static void removeAndWaitForReplicaGone(String COLL_NAME,
|
||||
CloudSolrClient client, Replica replica, String shard)
|
||||
throws SolrServerException, IOException, InterruptedException {
|
||||
Map m = makeMap("collection", COLL_NAME, "action", DELETEREPLICA.toLower(), "shard",
|
||||
shard, "replica", replica.getName());
|
||||
SolrParams params = new MapSolrParams(m);
|
||||
SolrRequest request = new QueryRequest(params);
|
||||
request.setPath("/admin/collections");
|
||||
client.request(request);
|
||||
TimeOut timeout = new TimeOut(3, TimeUnit.SECONDS);
|
||||
boolean success = false;
|
||||
DocCollection testcoll = null;
|
||||
while (! timeout.hasTimedOut()) {
|
||||
testcoll = client.getZkStateReader()
|
||||
.getClusterState().getCollection(COLL_NAME);
|
||||
success = testcoll.getSlice(shard).getReplica(replica.getName()) == null;
|
||||
if (success) {
|
||||
log.info("replica cleaned up {}/{} core {}",
|
||||
shard + "/" + replica.getName(), replica.getStr("core"));
|
||||
log.info("current state {}", testcoll);
|
||||
break;
|
||||
}
|
||||
Thread.sleep(100);
|
||||
}
|
||||
assertTrue("Replica not cleaned up", success);
|
||||
}
|
||||
|
||||
|
||||
protected void tryRemoveReplicaByCountAndShard(String collectionName, CloudSolrClient client, int count, String shard) throws IOException, SolrServerException {
|
||||
Map m = makeMap("collection", collectionName,
|
||||
"action", DELETEREPLICA.toLower(),
|
||||
"shard", shard,
|
||||
"count", count);
|
||||
SolrParams params = new MapSolrParams(m);
|
||||
SolrRequest request = new QueryRequest(params);
|
||||
request.setPath("/admin/collections");
|
||||
client.request(request);
|
||||
}
|
||||
|
||||
|
||||
protected void tryRemoveReplicaByCountAsync(String collectionName, CloudSolrClient client, int count, String requestid) throws IOException, SolrServerException {
|
||||
Map m = makeMap("collection", collectionName,
|
||||
"action", DELETEREPLICA.toLower(),
|
||||
"count", count,
|
||||
"async", requestid);
|
||||
SolrParams params = new MapSolrParams(m);
|
||||
SolrRequest request = new QueryRequest(params);
|
||||
request.setPath("/admin/collections");
|
||||
client.request(request);
|
||||
}
|
||||
|
||||
|
||||
protected String trackRequestStatus(CloudSolrClient client, String requestId) throws IOException, SolrServerException {
|
||||
Map m = makeMap("action", REQUESTSTATUS.toLower(),
|
||||
"requestid", requestId);
|
||||
SolrParams params = new MapSolrParams(m);
|
||||
SolrRequest request = new QueryRequest(params);
|
||||
request.setPath("/admin/collections");
|
||||
NamedList<Object> resultsList = client.request(request);
|
||||
NamedList innerResponse = (NamedList) resultsList.get("status");
|
||||
return (String) innerResponse.get("state");
|
||||
}
|
||||
|
||||
|
||||
|
||||
protected void createCollection(String COLL_NAME, CloudSolrClient client) throws Exception {
|
||||
int replicationFactor = 2;
|
||||
int numShards = 2;
|
||||
int maxShardsPerNode = ((((numShards+1) * replicationFactor) / getCommonCloudSolrClient()
|
||||
.getZkStateReader().getClusterState().getLiveNodes().size())) + 1;
|
||||
|
||||
Map<String, Object> props = makeMap(
|
||||
ZkStateReader.REPLICATION_FACTOR, replicationFactor,
|
||||
MAX_SHARDS_PER_NODE, maxShardsPerNode,
|
||||
NUM_SLICES, numShards);
|
||||
Map<String,List<Integer>> collectionInfos = new HashMap<>();
|
||||
createCollection(collectionInfos, COLL_NAME, props, client);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ShardsFixed(num = 2)
|
||||
public void deleteReplicaAndVerifyDirectoryCleanup() throws IOException, SolrServerException, InterruptedException {
|
||||
createCollection("deletereplica_test", 1, 2, 4);
|
||||
public void deleteReplicaAndVerifyDirectoryCleanup() throws Exception {
|
||||
|
||||
Replica leader = cloudClient.getZkStateReader().getLeaderRetry("deletereplica_test", "shard1");
|
||||
String baseUrl = (String) leader.get("base_url");
|
||||
String core = (String) leader.get("core");
|
||||
String leaderCoreName = leader.getName();
|
||||
final String collectionName = "deletereplica_test";
|
||||
CollectionAdminRequest.createCollection(collectionName, "conf", 1, 2).process(cluster.getSolrClient());
|
||||
|
||||
String instanceDir;
|
||||
String dataDir;
|
||||
|
||||
try (HttpSolrClient client = getHttpSolrClient(baseUrl)) {
|
||||
CoreAdminResponse statusResp = CoreAdminRequest.getStatus(core, client);
|
||||
NamedList r = statusResp.getCoreStatus().get(core);
|
||||
instanceDir = (String) r.findRecursive("instanceDir");
|
||||
dataDir = (String) r.get("dataDir");
|
||||
}
|
||||
Replica leader = cluster.getSolrClient().getZkStateReader().getLeaderRetry(collectionName, "shard1");
|
||||
|
||||
//Confirm that the instance and data directory exist
|
||||
assertTrue("Instance directory doesn't exist", FileUtils.fileExists(instanceDir));
|
||||
assertTrue("DataDirectory doesn't exist", FileUtils.fileExists(dataDir));
|
||||
CoreStatus coreStatus = getCoreStatus(leader);
|
||||
assertTrue("Instance directory doesn't exist", Files.exists(Paths.get(coreStatus.getInstanceDirectory())));
|
||||
assertTrue("DataDirectory doesn't exist", Files.exists(Paths.get(coreStatus.getDataDirectory())));
|
||||
|
||||
new CollectionAdminRequest.DeleteReplica()
|
||||
.setCollectionName("deletereplica_test")
|
||||
.setShardName("shard1")
|
||||
.setReplica(leaderCoreName)
|
||||
.process(cloudClient);
|
||||
CollectionAdminRequest.deleteReplica(collectionName, "shard1",leader.getName())
|
||||
.process(cluster.getSolrClient());
|
||||
|
||||
Replica newLeader = cloudClient.getZkStateReader().getLeaderRetry("deletereplica_test", "shard1");
|
||||
Replica newLeader = cluster.getSolrClient().getZkStateReader().getLeaderRetry(collectionName, "shard1");
|
||||
|
||||
assertFalse(leader.equals(newLeader));
|
||||
|
||||
//Confirm that the instance and data directory were deleted by default
|
||||
|
||||
assertFalse("Instance directory still exists", FileUtils.fileExists(instanceDir));
|
||||
assertFalse("DataDirectory still exists", FileUtils.fileExists(dataDir));
|
||||
assertFalse("Instance directory still exists", Files.exists(Paths.get(coreStatus.getInstanceDirectory())));
|
||||
assertFalse("DataDirectory still exists", Files.exists(Paths.get(coreStatus.getDataDirectory())));
|
||||
}
|
||||
|
||||
@Test
|
||||
@ShardsFixed(num = 4)
|
||||
public void deleteReplicaByCount() throws Exception {
|
||||
String collectionName = "deleteByCount";
|
||||
try (CloudSolrClient client = createCloudClient(null)) {
|
||||
createCollection(collectionName, 1, 3, 5);
|
||||
|
||||
waitForRecoveriesToFinish(collectionName, false);
|
||||
final String collectionName = "deleteByCount";
|
||||
CollectionAdminRequest.createCollection(collectionName, "conf", 1, 3).process(cluster.getSolrClient());
|
||||
waitForState("Expected a single shard with three replicas", collectionName, clusterShape(1, 3));
|
||||
|
||||
DocCollection testcoll = getCommonCloudSolrClient().getZkStateReader()
|
||||
.getClusterState().getCollection(collectionName);
|
||||
Collection<Slice> slices = testcoll.getActiveSlices();
|
||||
assertEquals(slices.size(), 1);
|
||||
for (Slice individualShard: slices) {
|
||||
assertEquals(individualShard.getReplicas().size(),3);
|
||||
}
|
||||
CollectionAdminRequest.deleteReplicasFromShard(collectionName, "shard1", 2).process(cluster.getSolrClient());
|
||||
waitForState("Expected a single shard with a single replica", collectionName, clusterShape(1, 1));
|
||||
|
||||
|
||||
try {
|
||||
// Should not be able to delete 2 replicas (non leader ones for a given shard
|
||||
tryRemoveReplicaByCountAndShard(collectionName, client, 2, "shard1");
|
||||
testcoll = getCommonCloudSolrClient().getZkStateReader()
|
||||
.getClusterState().getCollection(collectionName);
|
||||
slices = testcoll.getActiveSlices();
|
||||
assertEquals(slices.size(), 1);
|
||||
for (Slice individualShard: slices) {
|
||||
assertEquals(individualShard.getReplicas().size(),1);
|
||||
}
|
||||
|
||||
} catch (SolrException se) {
|
||||
fail("Should have been able to remove the replica successfully");
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@ShardsFixed(num = 4)
|
||||
public void deleteReplicaByCountForAllShards() throws Exception {
|
||||
String collectionName = "deleteByCountNew";
|
||||
try (CloudSolrClient client = createCloudClient(null)) {
|
||||
createCollection(collectionName, 2, 2, 5);
|
||||
|
||||
waitForRecoveriesToFinish(collectionName, false);
|
||||
|
||||
DocCollection testcoll = getCommonCloudSolrClient().getZkStateReader()
|
||||
.getClusterState().getCollection(collectionName);
|
||||
Collection<Slice> slices = testcoll.getActiveSlices();
|
||||
assertEquals(slices.size(), 2);
|
||||
for (Slice individualShard: slices) {
|
||||
assertEquals(individualShard.getReplicas().size(),2);
|
||||
}
|
||||
|
||||
String requestIdAsync = "1000";
|
||||
|
||||
try {
|
||||
// Should not be able to delete 2 replicas from all shards (non leader ones)
|
||||
tryRemoveReplicaByCountAsync(collectionName, client, 1, requestIdAsync);
|
||||
|
||||
//Make sure request completes
|
||||
String requestStatus = trackRequestStatus(client, requestIdAsync);
|
||||
|
||||
while ((!requestStatus.equals(RequestStatusState.COMPLETED.getKey())) && (!requestStatus.equals(RequestStatusState.FAILED.getKey()))) {
|
||||
requestStatus = trackRequestStatus(client, requestIdAsync);
|
||||
}
|
||||
|
||||
|
||||
testcoll = getCommonCloudSolrClient().getZkStateReader()
|
||||
.getClusterState().getCollection(collectionName);
|
||||
slices = testcoll.getActiveSlices();
|
||||
assertEquals(slices.size(), 2);
|
||||
for (Slice individualShard: slices) {
|
||||
assertEquals(individualShard.getReplicas().size(),1);
|
||||
}
|
||||
|
||||
} catch (SolrException se) {
|
||||
fail("Should have been able to remove the replica successfully");
|
||||
}
|
||||
|
||||
}
|
||||
final String collectionName = "deleteByCountNew";
|
||||
CollectionAdminRequest.createCollection(collectionName, "conf", 2, 2).process(cluster.getSolrClient());
|
||||
waitForState("Expected two shards with two replicas each", collectionName, clusterShape(2, 2));
|
||||
|
||||
CollectionAdminRequest.deleteReplicasFromAllShards(collectionName, 1).process(cluster.getSolrClient());
|
||||
waitForState("Expected two shards with one replica each", collectionName, clusterShape(2, 1));
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -1331,6 +1331,10 @@ public class CloudSolrClient extends SolrClient {
|
|||
public LBHttpSolrClient getLbClient() {
|
||||
return lbClient;
|
||||
}
|
||||
|
||||
public HttpClient getHttpClient() {
|
||||
return myClient;
|
||||
}
|
||||
|
||||
public boolean isUpdatesToLeaders() {
|
||||
return updatesToLeaders;
|
||||
|
|
|
@ -1526,29 +1526,53 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
|
|||
return new DeleteReplica(collection, shard, replica);
|
||||
}
|
||||
|
||||
// DELETEREPLICA request
|
||||
public static class DeleteReplica extends AsyncShardSpecificAdminRequest {
|
||||
/**
|
||||
* Returns a SolrRequest to remove a number of replicas from a specific shard
|
||||
*/
|
||||
public static DeleteReplica deleteReplicasFromShard(String collection, String shard, int count) {
|
||||
return new DeleteReplica(collection, shard, count);
|
||||
}
|
||||
|
||||
public static DeleteReplica deleteReplicasFromAllShards(String collection, int count) {
|
||||
return new DeleteReplica(collection, count);
|
||||
}
|
||||
|
||||
// DELETEREPLICA request
|
||||
public static class DeleteReplica extends AsyncCollectionSpecificAdminRequest {
|
||||
|
||||
protected String shard;
|
||||
protected String replica;
|
||||
protected Boolean onlyIfDown;
|
||||
private Boolean deleteDataDir;
|
||||
private Boolean deleteInstanceDir;
|
||||
private Integer count;
|
||||
private Boolean deleteIndexDir;
|
||||
private Integer count;
|
||||
|
||||
/**
|
||||
* @deprecated Use {@link #deleteReplica(String, String, String)}
|
||||
*/
|
||||
@Deprecated
|
||||
public DeleteReplica() {
|
||||
super(CollectionAction.DELETEREPLICA, null, null);
|
||||
super(CollectionAction.DELETEREPLICA, null);
|
||||
}
|
||||
|
||||
private DeleteReplica(String collection, String shard, String replica) {
|
||||
super(CollectionAction.DELETEREPLICA, collection, shard);
|
||||
super(CollectionAction.DELETEREPLICA, collection);
|
||||
this.shard = shard;
|
||||
this.replica = replica;
|
||||
}
|
||||
|
||||
private DeleteReplica(String collection, String shard, int count) {
|
||||
super(CollectionAction.DELETEREPLICA, collection);
|
||||
this.shard = shard;
|
||||
this.count = count;
|
||||
}
|
||||
|
||||
private DeleteReplica(String collection, int count) {
|
||||
super(CollectionAction.DELETEREPLICA, collection);
|
||||
this.count = count;
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public DeleteReplica setReplica(String replica) {
|
||||
this.replica = replica;
|
||||
|
@ -1575,13 +1599,13 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
|
|||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
@Deprecated
|
||||
public DeleteReplica setShardName(String shard) {
|
||||
this.shard = shard;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public DeleteReplica setCount(Integer count) {
|
||||
this.count = count;
|
||||
return this;
|
||||
|
@ -1590,7 +1614,18 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
|
|||
@Override
|
||||
public SolrParams getParams() {
|
||||
ModifiableSolrParams params = new ModifiableSolrParams(super.getParams());
|
||||
params.set(ZkStateReader.REPLICA_PROP, this.replica);
|
||||
|
||||
// AsyncCollectionSpecificAdminRequest uses 'name' rather than 'collection'
|
||||
// TODO - deal with this inconsistency
|
||||
params.remove(CoreAdminParams.NAME);
|
||||
if (this.collection == null)
|
||||
throw new IllegalArgumentException("You must set a collection name for this request");
|
||||
params.set(ZkStateReader.COLLECTION_PROP, this.collection);
|
||||
|
||||
if (this.replica != null)
|
||||
params.set(ZkStateReader.REPLICA_PROP, this.replica);
|
||||
if (this.shard != null)
|
||||
params.set(ZkStateReader.SHARD_ID_PROP, this.shard);
|
||||
|
||||
if (onlyIfDown != null) {
|
||||
params.set("onlyIfDown", onlyIfDown);
|
||||
|
@ -1605,7 +1640,7 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
|
|||
params.set(CoreAdminParams.DELETE_INDEX, deleteIndexDir);
|
||||
}
|
||||
if (count != null) {
|
||||
params.set(COUNT_PROP, deleteIndexDir);
|
||||
params.set(COUNT_PROP, count);
|
||||
}
|
||||
return params;
|
||||
}
|
||||
|
@ -1627,6 +1662,15 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
|
|||
this.deleteInstanceDir = deleteInstanceDir;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Boolean getDeleteIndexDir() {
|
||||
return deleteIndexDir;
|
||||
}
|
||||
|
||||
public DeleteReplica setDeleteIndexDir(Boolean deleteIndexDir) {
|
||||
this.deleteIndexDir = deleteIndexDir;
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -619,6 +619,12 @@ public class CoreAdminRequest extends SolrRequest<CoreAdminResponse> {
|
|||
return req.process( client );
|
||||
}
|
||||
|
||||
public static CoreStatus getCoreStatus(String coreName, SolrClient client) throws SolrServerException, IOException {
|
||||
CoreAdminRequest req = new CoreAdminRequest();
|
||||
req.setAction(CoreAdminAction.STATUS);
|
||||
return new CoreStatus(req.process(client).getCoreStatus(coreName));
|
||||
}
|
||||
|
||||
public static CoreAdminResponse getStatus( String name, SolrClient client ) throws SolrServerException, IOException
|
||||
{
|
||||
CoreAdminRequest req = new CoreAdminRequest();
|
||||
|
|
|
@ -0,0 +1,42 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.client.solrj.request;
|
||||
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
|
||||
public class CoreStatus {
|
||||
|
||||
private final NamedList<Object> response;
|
||||
|
||||
public CoreStatus(NamedList<Object> response) {
|
||||
this.response = response;
|
||||
}
|
||||
|
||||
public String getDataDirectory() {
|
||||
return (String) response.get("dataDir");
|
||||
}
|
||||
|
||||
public String getInstanceDirectory() {
|
||||
return (String) response.findRecursive("instanceDir");
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return response.toString();
|
||||
}
|
||||
}
|
|
@ -46,6 +46,7 @@ import org.apache.solr.client.solrj.embedded.SSLConfig;
|
|||
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||
import org.apache.solr.client.solrj.impl.CloudSolrClient.Builder;
|
||||
import org.apache.solr.client.solrj.request.QueryRequest;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.SolrZkClient;
|
||||
import org.apache.solr.common.cloud.ZkConfigManager;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
|
@ -501,4 +502,15 @@ public class MiniSolrCloudCluster {
|
|||
}
|
||||
return ok ? null : parsed;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the jetty that a particular replica resides on
|
||||
*/
|
||||
public JettySolrRunner getReplicaJetty(Replica replica) {
|
||||
for (JettySolrRunner jetty : jettys) {
|
||||
if (replica.getCoreUrl().startsWith(jetty.getBaseUrl().toString()))
|
||||
return jetty;
|
||||
}
|
||||
throw new IllegalArgumentException("Cannot find Jetty for a replica with core url " + replica.getCoreUrl());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,11 +22,27 @@ import java.nio.charset.Charset;
|
|||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
import org.apache.solr.SolrTestCaseJ4;
|
||||
import org.apache.solr.client.solrj.SolrServerException;
|
||||
import org.apache.solr.client.solrj.embedded.JettyConfig;
|
||||
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
||||
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||
import org.apache.solr.client.solrj.impl.HttpSolrClient;
|
||||
import org.apache.solr.client.solrj.request.CoreAdminRequest;
|
||||
import org.apache.solr.client.solrj.request.CoreStatus;
|
||||
import org.apache.solr.common.cloud.ClusterProperties;
|
||||
import org.apache.solr.common.cloud.CollectionStatePredicate;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.Slice;
|
||||
import org.apache.solr.common.cloud.SolrZkClient;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
|
@ -78,6 +94,7 @@ public class SolrCloudTestCase extends SolrTestCaseJ4 {
|
|||
private JettyConfig jettyConfig = buildJettyConfig("/solr");
|
||||
|
||||
private List<Config> configs = new ArrayList<>();
|
||||
private Map<String, String> clusterProperties = new HashMap<>();
|
||||
|
||||
/**
|
||||
* Create a builder
|
||||
|
@ -127,6 +144,16 @@ public class SolrCloudTestCase extends SolrTestCaseJ4 {
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set a cluster property
|
||||
* @param propertyName the property name
|
||||
* @param propertyValue the property value
|
||||
*/
|
||||
public Builder withProperty(String propertyName, String propertyValue) {
|
||||
this.clusterProperties.put(propertyName, propertyValue);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Configure and run the {@link MiniSolrCloudCluster}
|
||||
* @throws Exception if an error occurs on startup
|
||||
|
@ -137,6 +164,13 @@ public class SolrCloudTestCase extends SolrTestCaseJ4 {
|
|||
for (Config config : configs) {
|
||||
client.uploadConfig(config.path, config.name);
|
||||
}
|
||||
|
||||
if (clusterProperties.size() > 0) {
|
||||
ClusterProperties props = new ClusterProperties(cluster.getSolrClient().getZkStateReader().getZkClient());
|
||||
for (Map.Entry<String, String> entry : clusterProperties.entrySet()) {
|
||||
props.setClusterProperty(entry.getKey(), entry.getValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -173,4 +207,105 @@ public class SolrCloudTestCase extends SolrTestCaseJ4 {
|
|||
throw new RuntimeException("MiniSolrCloudCluster not configured - have you called configureCluster().configure()?");
|
||||
}
|
||||
|
||||
/* Cluster helper methods ************************************/
|
||||
|
||||
/**
|
||||
* Get the collection state for a particular collection
|
||||
*/
|
||||
protected DocCollection getCollectionState(String collectionName) {
|
||||
return cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(collectionName);
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for a particular collection state to appear in the cluster client's state reader
|
||||
*
|
||||
* This is a convenience method using the {@link #DEFAULT_TIMEOUT}
|
||||
*
|
||||
* @param message a message to report on failure
|
||||
* @param collection the collection to watch
|
||||
* @param predicate a predicate to match against the collection state
|
||||
*/
|
||||
protected void waitForState(String message, String collection, CollectionStatePredicate predicate) {
|
||||
AtomicReference<DocCollection> state = new AtomicReference<>();
|
||||
try {
|
||||
cluster.getSolrClient().waitForState(collection, DEFAULT_TIMEOUT, TimeUnit.SECONDS, (n, c) -> {
|
||||
state.set(c);
|
||||
return predicate.matches(n, c);
|
||||
});
|
||||
} catch (Exception e) {
|
||||
fail(message + "\nLast available state: " + state.get());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a {@link CollectionStatePredicate} that returns true if a collection has the expected
|
||||
* number of shards and replicas
|
||||
*/
|
||||
public static CollectionStatePredicate clusterShape(int expectedShards, int expectedReplicas) {
|
||||
return (liveNodes, collectionState) -> {
|
||||
if (collectionState.getSlices().size() != expectedShards)
|
||||
return false;
|
||||
for (Slice slice : collectionState) {
|
||||
int activeReplicas = 0;
|
||||
for (Replica replica : slice) {
|
||||
if (replica.isActive(liveNodes))
|
||||
activeReplicas++;
|
||||
}
|
||||
if (activeReplicas != expectedReplicas)
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a (reproducibly) random shard from a {@link DocCollection}
|
||||
*/
|
||||
protected Slice getRandomShard(DocCollection collection) {
|
||||
List<Slice> shards = new ArrayList<>(collection.getActiveSlices());
|
||||
if (shards.size() == 0)
|
||||
fail("Couldn't get random shard for collection as it has no shards!\n" + collection.toString());
|
||||
Collections.shuffle(shards, random());
|
||||
return shards.get(0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a (reproducibly) random replica from a {@link Slice}
|
||||
*/
|
||||
protected Replica getRandomReplica(Slice slice) {
|
||||
List<Replica> replicas = new ArrayList<>(slice.getReplicas());
|
||||
if (replicas.size() == 0)
|
||||
fail("Couldn't get random replica from shard as it has no replicas!\n" + slice.toString());
|
||||
Collections.shuffle(replicas, random());
|
||||
return replicas.get(0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a (reproducibly) random replica from a {@link Slice} matching a predicate
|
||||
*/
|
||||
protected Replica getRandomReplica(Slice slice, Predicate<Replica> matchPredicate) {
|
||||
List<Replica> replicas = new ArrayList<>(slice.getReplicas());
|
||||
if (replicas.size() == 0)
|
||||
fail("Couldn't get random replica from shard as it has no replicas!\n" + slice.toString());
|
||||
Collections.shuffle(replicas, random());
|
||||
for (Replica replica : replicas) {
|
||||
if (matchPredicate.test(replica))
|
||||
return replica;
|
||||
}
|
||||
fail("Couldn't get random replica that matched conditions\n" + slice.toString());
|
||||
return null; // just to keep the compiler happy - fail will always throw an Exception
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the {@link CoreStatus} data for a {@link Replica}
|
||||
*
|
||||
* This assumes that the replica is hosted on a live node.
|
||||
*/
|
||||
protected CoreStatus getCoreStatus(Replica replica) throws IOException, SolrServerException {
|
||||
JettySolrRunner jetty = cluster.getReplicaJetty(replica);
|
||||
try (HttpSolrClient client = getHttpSolrClient(jetty.getBaseUrl().toString(), cluster.getSolrClient().getHttpClient())) {
|
||||
return CoreAdminRequest.getCoreStatus(replica.getCoreName(), client);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue