mirror of https://github.com/apache/lucene.git
SOLR-11449: MoveReplicaCmd mistakenly called registerCollectionStateWatcher on failure.
This commit is contained in:
parent
5e1474dca4
commit
338a421175
|
@ -243,6 +243,8 @@ Bug Fixes
|
|||
|
||||
* SOLR-11425: SolrClientBuilder does not allow infinite timeout (value 0). (Peter Szantai-Kis via Mark Miller)
|
||||
|
||||
* SOLR-11449: MoveReplicaCmd mistakenly called registerCollectionStateWatcher on failure. (ab)
|
||||
|
||||
Optimizations
|
||||
----------------------
|
||||
|
||||
|
|
|
@ -206,7 +206,7 @@ public class MoveReplicaCmd implements Cmd{
|
|||
log.warn(errorString);
|
||||
results.add("failure", errorString);
|
||||
if (watcher != null) { // unregister
|
||||
ocmh.zkStateReader.registerCollectionStateWatcher(coll.getName(), watcher);
|
||||
ocmh.zkStateReader.removeCollectionStateWatcher(coll.getName(), watcher);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
|
|
@ -22,22 +22,30 @@ import java.lang.invoke.MethodHandles;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.solr.client.solrj.SolrServerException;
|
||||
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
||||
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||
import org.apache.solr.client.solrj.impl.HttpSolrClient;
|
||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||
import org.apache.solr.client.solrj.request.CoreAdminRequest;
|
||||
import org.apache.solr.client.solrj.response.CoreAdminResponse;
|
||||
import org.apache.solr.client.solrj.response.RequestStatusState;
|
||||
import org.apache.solr.common.cloud.CollectionStateWatcher;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.Slice;
|
||||
import org.apache.solr.common.cloud.ZkStateReaderAccessor;
|
||||
import org.apache.solr.common.params.CollectionParams;
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.apache.solr.common.params.SolrParams;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -46,17 +54,40 @@ import org.slf4j.LoggerFactory;
|
|||
public class MoveReplicaTest extends SolrCloudTestCase {
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
private static ZkStateReaderAccessor accessor;
|
||||
private static int overseerLeaderIndex;
|
||||
|
||||
@BeforeClass
|
||||
public static void setupCluster() throws Exception {
|
||||
configureCluster(4)
|
||||
.addConfig("conf1", TEST_PATH().resolve("configsets").resolve("cloud-dynamic").resolve("conf"))
|
||||
.configure();
|
||||
NamedList<Object> overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());
|
||||
JettySolrRunner overseerJetty = null;
|
||||
String overseerLeader = (String) overSeerStatus.get("leader");
|
||||
for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
|
||||
JettySolrRunner jetty = cluster.getJettySolrRunner(i);
|
||||
if (jetty.getNodeName().equals(overseerLeader)) {
|
||||
overseerJetty = jetty;
|
||||
overseerLeaderIndex = i;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (overseerJetty == null) {
|
||||
fail("no overseer leader!");
|
||||
}
|
||||
accessor = new ZkStateReaderAccessor(overseerJetty.getCoreContainer().getZkController().getZkStateReader());
|
||||
}
|
||||
|
||||
protected String getSolrXml() {
|
||||
return "solr.xml";
|
||||
}
|
||||
|
||||
@Before
|
||||
public void beforeTest() throws Exception {
|
||||
cluster.deleteAllCollections();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test() throws Exception {
|
||||
cluster.waitForAllNodes(5000);
|
||||
|
@ -89,6 +120,11 @@ public class MoveReplicaTest extends SolrCloudTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
Set<CollectionStateWatcher> watchers = new HashSet<>(accessor.getStateWatchers(coll));
|
||||
|
||||
int sourceNumCores = getNumOfCores(cloudClient, replica.getNodeName(), coll);
|
||||
int targetNumCores = getNumOfCores(cloudClient, targetNode, coll);
|
||||
|
||||
CollectionAdminRequest.MoveReplica moveReplica = createMoveReplicaRequest(coll, replica, targetNode);
|
||||
moveReplica.processAsync("000", cloudClient);
|
||||
CollectionAdminRequest.RequestStatus requestStatus = CollectionAdminRequest.requestStatus("000");
|
||||
|
@ -104,8 +140,8 @@ public class MoveReplicaTest extends SolrCloudTestCase {
|
|||
Thread.sleep(500);
|
||||
}
|
||||
assertTrue(success);
|
||||
checkNumOfCores(cloudClient, replica.getNodeName(), 0);
|
||||
assertTrue("should be at least one core on target node!", getNumOfCores(cloudClient, targetNode) > 0);
|
||||
assertEquals("should be one less core on the source node!", sourceNumCores - 1, getNumOfCores(cloudClient, replica.getNodeName(), coll));
|
||||
assertEquals("should be one more core on target node!", targetNumCores + 1, getNumOfCores(cloudClient, targetNode, coll));
|
||||
// wait for recovery
|
||||
boolean recovered = false;
|
||||
for (int i = 0; i < 300; i++) {
|
||||
|
@ -144,9 +180,12 @@ public class MoveReplicaTest extends SolrCloudTestCase {
|
|||
}
|
||||
assertTrue("replica never fully recovered", recovered);
|
||||
|
||||
Set<CollectionStateWatcher> newWatchers = new HashSet<>(accessor.getStateWatchers(coll));
|
||||
assertEquals(watchers, newWatchers);
|
||||
|
||||
moveReplica = createMoveReplicaRequest(coll, replica, targetNode, shardId);
|
||||
moveReplica.process(cloudClient);
|
||||
checkNumOfCores(cloudClient, replica.getNodeName(), 1);
|
||||
checkNumOfCores(cloudClient, replica.getNodeName(), coll, sourceNumCores);
|
||||
// wait for recovery
|
||||
recovered = false;
|
||||
for (int i = 0; i < 300; i++) {
|
||||
|
@ -182,6 +221,65 @@ public class MoveReplicaTest extends SolrCloudTestCase {
|
|||
}
|
||||
}
|
||||
assertTrue("replica never fully recovered", recovered);
|
||||
newWatchers = new HashSet<>(accessor.getStateWatchers(coll));
|
||||
assertEquals(watchers, newWatchers);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailedMove() throws Exception {
|
||||
String coll = "movereplicatest_failed_coll";
|
||||
int REPLICATION = 2;
|
||||
|
||||
CloudSolrClient cloudClient = cluster.getSolrClient();
|
||||
|
||||
Set<CollectionStateWatcher> watchers = new HashSet<>(accessor.getStateWatchers(coll));
|
||||
|
||||
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(coll, "conf1", 2, REPLICATION);
|
||||
cloudClient.request(create);
|
||||
|
||||
Replica replica = getRandomReplica(coll, cloudClient);
|
||||
Set<String> liveNodes = cloudClient.getZkStateReader().getClusterState().getLiveNodes();
|
||||
ArrayList<String> l = new ArrayList<>(liveNodes);
|
||||
Collections.shuffle(l, random());
|
||||
String targetNode = null;
|
||||
for (String node : liveNodes) {
|
||||
if (!replica.getNodeName().equals(node)) {
|
||||
targetNode = node;
|
||||
break;
|
||||
}
|
||||
}
|
||||
assertNotNull(targetNode);
|
||||
CollectionAdminRequest.MoveReplica moveReplica = createMoveReplicaRequest(coll, replica, targetNode);
|
||||
// start moving
|
||||
moveReplica.processAsync("001", cloudClient);
|
||||
// shut down target node
|
||||
for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
|
||||
if (cluster.getJettySolrRunner(i).getNodeName().equals(targetNode)) {
|
||||
cluster.stopJettySolrRunner(i);
|
||||
}
|
||||
}
|
||||
CollectionAdminRequest.RequestStatus requestStatus = CollectionAdminRequest.requestStatus("001");
|
||||
// wait for async request success
|
||||
boolean success = true;
|
||||
for (int i = 0; i < 200; i++) {
|
||||
CollectionAdminRequest.RequestStatusResponse rsp = requestStatus.process(cloudClient);
|
||||
assertTrue(rsp.getRequestStatus().toString(), rsp.getRequestStatus() != RequestStatusState.COMPLETED);
|
||||
if (rsp.getRequestStatus() == RequestStatusState.FAILED) {
|
||||
success = false;
|
||||
break;
|
||||
}
|
||||
Thread.sleep(500);
|
||||
}
|
||||
assertFalse(success);
|
||||
|
||||
Set<CollectionStateWatcher> newWatchers = new HashSet<>(accessor.getStateWatchers(coll));
|
||||
for (Iterator<CollectionStateWatcher> it = newWatchers.iterator(); it.hasNext(); ) {
|
||||
CollectionStateWatcher watcher = it.next();
|
||||
if (watcher instanceof ReplaceNodeCmd.RecoveryWatcher) {
|
||||
it.remove();
|
||||
}
|
||||
}
|
||||
assertEquals(watchers, newWatchers);
|
||||
}
|
||||
|
||||
private CollectionAdminRequest.MoveReplica createMoveReplicaRequest(String coll, Replica replica, String targetNode, String shardId) {
|
||||
|
@ -233,14 +331,30 @@ public class MoveReplicaTest extends SolrCloudTestCase {
|
|||
return replicas.get(0);
|
||||
}
|
||||
|
||||
private void checkNumOfCores(CloudSolrClient cloudClient, String nodeName, int expectedCores) throws IOException, SolrServerException {
|
||||
assertEquals(nodeName + " does not have expected number of cores",expectedCores, getNumOfCores(cloudClient, nodeName));
|
||||
private void checkNumOfCores(CloudSolrClient cloudClient, String nodeName, String collectionName, int expectedCores) throws IOException, SolrServerException {
|
||||
assertEquals(nodeName + " does not have expected number of cores",expectedCores, getNumOfCores(cloudClient, nodeName, collectionName));
|
||||
}
|
||||
|
||||
private int getNumOfCores(CloudSolrClient cloudClient, String nodeName) throws IOException, SolrServerException {
|
||||
private int getNumOfCores(CloudSolrClient cloudClient, String nodeName, String collectionName) throws IOException, SolrServerException {
|
||||
try (HttpSolrClient coreclient = getHttpSolrClient(cloudClient.getZkStateReader().getBaseUrlForNodeName(nodeName))) {
|
||||
CoreAdminResponse status = CoreAdminRequest.getStatus(null, coreclient);
|
||||
if (status.getCoreStatus().size() == 0) {
|
||||
return 0;
|
||||
}
|
||||
// filter size by collection name
|
||||
if (collectionName == null) {
|
||||
return status.getCoreStatus().size();
|
||||
} else {
|
||||
int size = 0;
|
||||
Iterator<Map.Entry<String, NamedList<Object>>> it = status.getCoreStatus().iterator();
|
||||
while (it.hasNext()) {
|
||||
String coll = (String)it.next().getValue().findRecursive("cloud", "collection");
|
||||
if (collectionName.equals(coll)) {
|
||||
size++;
|
||||
}
|
||||
}
|
||||
return size;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,36 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.solr.common.cloud;
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* Helper class to access package-private methods from ZkStateReader.
|
||||
*/
|
||||
public class ZkStateReaderAccessor {
|
||||
ZkStateReader zkStateReader;
|
||||
|
||||
public ZkStateReaderAccessor(ZkStateReader zkStateReader) {
|
||||
this.zkStateReader = zkStateReader;
|
||||
}
|
||||
|
||||
public Set<CollectionStateWatcher> getStateWatchers(String collection) {
|
||||
return zkStateReader.getStateWatchers(collection);
|
||||
}
|
||||
|
||||
|
||||
}
|
Loading…
Reference in New Issue