mirror of https://github.com/apache/lucene.git
SOLR-9132: Cut over some collections API and recovery tests
This commit is contained in:
parent
6e563d0f4f
commit
14cfb82bf7
|
@ -38,6 +38,8 @@ import java.util.Random;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.solr.client.solrj.SolrClient;
|
||||
import org.apache.solr.client.solrj.impl.HttpSolrClient;
|
||||
import org.apache.solr.core.CoreContainer;
|
||||
import org.apache.solr.servlet.SolrDispatchFilter;
|
||||
import org.eclipse.jetty.server.Connector;
|
||||
|
@ -293,6 +295,10 @@ public class JettySolrRunner {
|
|||
return getSolrDispatchFilter().getCores();
|
||||
}
|
||||
|
||||
public String getNodeName() {
|
||||
return getCoreContainer().getZkController().getNodeName();
|
||||
}
|
||||
|
||||
public boolean isRunning() {
|
||||
return server.isRunning();
|
||||
}
|
||||
|
@ -453,6 +459,10 @@ public class JettySolrRunner {
|
|||
}
|
||||
}
|
||||
|
||||
public SolrClient newClient() {
|
||||
return new HttpSolrClient.Builder(getBaseUrl().toString()).build();
|
||||
}
|
||||
|
||||
public DebugFilter getDebugFilter() {
|
||||
return (DebugFilter)debugFilter.getFilter();
|
||||
}
|
||||
|
|
|
@ -223,6 +223,8 @@
|
|||
<!-- points to the root document of a block of nested documents -->
|
||||
<field name="_root_" type="string" indexed="true" stored="true"/>
|
||||
|
||||
<field name="_route_" type="string" indexed="true" stored="true" multiValued="false"/>
|
||||
|
||||
<field name="multi_int_with_docvals" type="tint" multiValued="true" docValues="true" indexed="false"/>
|
||||
|
||||
<dynamicField name="*_coordinate" type="tdouble" indexed="true" stored="false"/>
|
||||
|
|
|
@ -0,0 +1,28 @@
|
|||
<?xml version="1.0" encoding="UTF-8" ?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<schema name="minimal" version="1.1">
|
||||
<fieldType name="string" class="solr.StrField"/>
|
||||
<fieldType name="int" class="solr.TrieIntField" precisionStep="0" omitNorms="true" positionIncrementGap="0"/>
|
||||
<fieldType name="long" class="solr.TrieLongField" precisionStep="0" omitNorms="true" positionIncrementGap="0"/>
|
||||
<dynamicField name="*" type="string" indexed="true" stored="true"/>
|
||||
<!-- for versioning -->
|
||||
<field name="_version_" type="long" indexed="true" stored="true"/>
|
||||
<field name="_root_" type="int" indexed="true" stored="true" multiValued="false" required="false"/>
|
||||
<field name="id" type="string" indexed="true" stored="true"/>
|
||||
<uniqueKey>id</uniqueKey>
|
||||
</schema>
|
|
@ -0,0 +1,50 @@
|
|||
<?xml version="1.0" ?>
|
||||
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
|
||||
<!-- Minimal solrconfig.xml with /select, /admin and /update only -->
|
||||
|
||||
<config>
|
||||
|
||||
<directoryFactory name="DirectoryFactory"
|
||||
class="solr.HdfsDirectoryFactory"/>
|
||||
<indexConfig>
|
||||
<lockType>hdfs</lockType>
|
||||
</indexConfig>
|
||||
|
||||
<schemaFactory class="ClassicIndexSchemaFactory"/>
|
||||
|
||||
<luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
|
||||
|
||||
<updateHandler class="solr.DirectUpdateHandler2">
|
||||
<commitWithin>
|
||||
<softCommit>${solr.commitwithin.softcommit:true}</softCommit>
|
||||
</commitWithin>
|
||||
<updateLog></updateLog>
|
||||
</updateHandler>
|
||||
|
||||
<requestHandler name="/select" class="solr.SearchHandler">
|
||||
<lst name="defaults">
|
||||
<str name="echoParams">explicit</str>
|
||||
<str name="indent">true</str>
|
||||
<str name="df">text</str>
|
||||
</lst>
|
||||
|
||||
</requestHandler>
|
||||
</config>
|
||||
|
|
@ -0,0 +1,28 @@
|
|||
<?xml version="1.0" encoding="UTF-8" ?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<schema name="minimal" version="1.1">
|
||||
<fieldType name="string" class="solr.StrField"/>
|
||||
<fieldType name="int" class="solr.TrieIntField" precisionStep="0" omitNorms="true" positionIncrementGap="0"/>
|
||||
<fieldType name="long" class="solr.TrieLongField" precisionStep="0" omitNorms="true" positionIncrementGap="0"/>
|
||||
<dynamicField name="*" type="string" indexed="true" stored="true"/>
|
||||
<!-- for versioning -->
|
||||
<field name="_version_" type="long" indexed="true" stored="true"/>
|
||||
<field name="_root_" type="int" indexed="true" stored="true" multiValued="false" required="false"/>
|
||||
<field name="id" type="string" indexed="true" stored="true"/>
|
||||
<uniqueKey>id</uniqueKey>
|
||||
</schema>
|
|
@ -0,0 +1,50 @@
|
|||
<?xml version="1.0" ?>
|
||||
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
|
||||
<!-- Minimal solrconfig.xml with JMX enabled -->
|
||||
|
||||
<config>
|
||||
|
||||
<jmx/>
|
||||
|
||||
<dataDir>${solr.data.dir:}</dataDir>
|
||||
|
||||
<directoryFactory name="DirectoryFactory"
|
||||
class="${solr.directoryFactory:solr.NRTCachingDirectoryFactory}"/>
|
||||
<schemaFactory class="ClassicIndexSchemaFactory"/>
|
||||
|
||||
<luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
|
||||
|
||||
<updateHandler class="solr.DirectUpdateHandler2">
|
||||
<commitWithin>
|
||||
<softCommit>${solr.commitwithin.softcommit:true}</softCommit>
|
||||
</commitWithin>
|
||||
<updateLog></updateLog>
|
||||
</updateHandler>
|
||||
|
||||
<requestHandler name="/select" class="solr.SearchHandler">
|
||||
<lst name="defaults">
|
||||
<str name="echoParams">explicit</str>
|
||||
<str name="indent">true</str>
|
||||
<str name="df">text</str>
|
||||
</lst>
|
||||
|
||||
</requestHandler>
|
||||
</config>
|
||||
|
|
@ -16,186 +16,153 @@
|
|||
*/
|
||||
package org.apache.solr.cloud;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.lucene.util.LuceneTestCase.Slow;
|
||||
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
||||
import org.apache.solr.client.solrj.impl.HttpSolrClient;
|
||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.Slice;
|
||||
import org.apache.solr.common.cloud.SolrZkClient;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
@Slow
|
||||
public class CollectionTooManyReplicasTest extends AbstractFullDistribZkTestBase {
|
||||
public class CollectionTooManyReplicasTest extends SolrCloudTestCase {
|
||||
|
||||
public CollectionTooManyReplicasTest() {
|
||||
sliceCount = 1;
|
||||
@BeforeClass
|
||||
public static void setupCluster() throws Exception {
|
||||
configureCluster(3)
|
||||
.addConfig("conf", configset("cloud-minimal"))
|
||||
.configure();
|
||||
}
|
||||
|
||||
@Before
|
||||
public void deleteCollections() throws Exception {
|
||||
cluster.deleteAllCollections();
|
||||
}
|
||||
|
||||
@Test
|
||||
@ShardsFixed(num = 1)
|
||||
public void testAddTooManyReplicas() throws Exception {
|
||||
String collectionName = "TooManyReplicasInSeveralFlavors";
|
||||
CollectionAdminRequest.Create create = new CollectionAdminRequest.Create()
|
||||
.setCollectionName(collectionName)
|
||||
.setNumShards(2)
|
||||
.setReplicationFactor(1)
|
||||
.setMaxShardsPerNode(2)
|
||||
.setStateFormat(2);
|
||||
final String collectionName = "TooManyReplicasInSeveralFlavors";
|
||||
CollectionAdminRequest.createCollection(collectionName, "conf", 2, 1)
|
||||
.setMaxShardsPerNode(1)
|
||||
.process(cluster.getSolrClient());
|
||||
|
||||
CollectionAdminResponse response = create.process(cloudClient);
|
||||
assertEquals(0, response.getStatus());
|
||||
assertTrue(response.isSuccess());
|
||||
// Now I have the fixed Jetty plus the control instnace, I have two replicas, one for each shard
|
||||
// I have two replicas, one for each shard
|
||||
|
||||
// Curiously, I should be able to add a bunch of replicas if I specify the node, even more than maxShardsPerNode
|
||||
// Just get the first node any way we can.
|
||||
// Get a node to use for the "node" parameter.
|
||||
|
||||
String nodeName = getAllNodeNames(collectionName).get(0);
|
||||
|
||||
// Add a replica using the "node" parameter (no "too many replicas check")
|
||||
// this node should have 2 replicas on it
|
||||
CollectionAdminRequest.AddReplica addReplicaNode = new CollectionAdminRequest.AddReplica()
|
||||
.setCollectionName(collectionName)
|
||||
.setShardName("shard1")
|
||||
.setNode(nodeName);
|
||||
response = addReplicaNode.process(cloudClient);
|
||||
assertEquals(0, response.getStatus());
|
||||
CollectionAdminRequest.addReplicaToShard(collectionName, "shard1")
|
||||
.setNode(nodeName)
|
||||
.process(cluster.getSolrClient());
|
||||
|
||||
// Three replicas so far, should be able to create another one "normally"
|
||||
CollectionAdminRequest.AddReplica addReplica = new CollectionAdminRequest.AddReplica()
|
||||
.setCollectionName(collectionName)
|
||||
.setShardName("shard1");
|
||||
|
||||
response = addReplica.process(cloudClient);
|
||||
assertEquals(0, response.getStatus());
|
||||
CollectionAdminRequest.addReplicaToShard(collectionName, "shard1")
|
||||
.process(cluster.getSolrClient());
|
||||
|
||||
// This one should fail though, no "node" parameter specified
|
||||
try {
|
||||
addReplica.process(cloudClient);
|
||||
fail("Should have thrown an error because the nodes are full");
|
||||
} catch (HttpSolrClient.RemoteSolrException se) {
|
||||
assertTrue("Should have gotten the right error message back",
|
||||
se.getMessage().contains("given the current number of live nodes and a maxShardsPerNode of"));
|
||||
}
|
||||
Exception e = expectThrows(Exception.class, () -> {
|
||||
CollectionAdminRequest.addReplicaToShard(collectionName, "shard1")
|
||||
.process(cluster.getSolrClient());
|
||||
});
|
||||
|
||||
assertTrue("Should have gotten the right error message back",
|
||||
e.getMessage().contains("given the current number of live nodes and a maxShardsPerNode of"));
|
||||
|
||||
|
||||
// Oddly, we should succeed next just because setting property.name will not check for nodes being "full up"
|
||||
Properties props = new Properties();
|
||||
props.setProperty("name", "bogus2");
|
||||
addReplicaNode.setProperties(props);
|
||||
response = addReplicaNode.process(cloudClient);
|
||||
assertEquals(0, response.getStatus());
|
||||
// TODO: Isn't this a bug?
|
||||
CollectionAdminRequest.addReplicaToShard(collectionName, "shard1")
|
||||
.withProperty("name", "bogus2")
|
||||
.setNode(nodeName)
|
||||
.process(cluster.getSolrClient());
|
||||
|
||||
ZkStateReader zkStateReader = getCommonCloudSolrClient().getZkStateReader();
|
||||
zkStateReader.forceUpdateCollection(collectionName);
|
||||
Slice slice = zkStateReader.getClusterState().getSlicesMap(collectionName).get("shard1");
|
||||
|
||||
Replica rep = null;
|
||||
for (Replica rep1 : slice.getReplicas()) { // Silly compiler
|
||||
if (rep1.get("core").equals("bogus2")) {
|
||||
rep = rep1;
|
||||
break;
|
||||
}
|
||||
}
|
||||
assertNotNull("Should have found a replica named 'bogus2'", rep);
|
||||
assertEquals("Replica should have been put on correct core", nodeName, rep.getNodeName());
|
||||
DocCollection collectionState = getCollectionState(collectionName);
|
||||
Slice slice = collectionState.getSlice("shard1");
|
||||
Replica replica = getRandomReplica(slice, r -> r.getCoreName().equals("bogus2"));
|
||||
assertNotNull("Should have found a replica named 'bogus2'", replica);
|
||||
assertEquals("Replica should have been put on correct core", nodeName, replica.getNodeName());
|
||||
|
||||
// Shard1 should have 4 replicas
|
||||
assertEquals("There should be 4 replicas for shard 1", 4, slice.getReplicas().size());
|
||||
|
||||
// And let's fail one more time because to insure that the math doesn't do weird stuff it we have more replicas
|
||||
// And let's fail one more time because to ensure that the math doesn't do weird stuff it we have more replicas
|
||||
// than simple calcs would indicate.
|
||||
try {
|
||||
addReplica.process(cloudClient);
|
||||
fail("Should have thrown an error because the nodes are full");
|
||||
} catch (HttpSolrClient.RemoteSolrException se) {
|
||||
assertTrue("Should have gotten the right error message back",
|
||||
se.getMessage().contains("given the current number of live nodes and a maxShardsPerNode of"));
|
||||
}
|
||||
Exception e2 = expectThrows(Exception.class, () -> {
|
||||
CollectionAdminRequest.addReplicaToShard(collectionName, "shard1")
|
||||
.process(cluster.getSolrClient());
|
||||
});
|
||||
|
||||
assertTrue("Should have gotten the right error message back",
|
||||
e2.getMessage().contains("given the current number of live nodes and a maxShardsPerNode of"));
|
||||
|
||||
// wait for recoveries to finish, for a clean shutdown - see SOLR-9645
|
||||
waitForState("Expected to see all replicas active", collectionName, (n, c) -> {
|
||||
for (Replica r : c.getReplicas()) {
|
||||
if (r.getState() != Replica.State.ACTIVE)
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
@ShardsFixed(num = 2)
|
||||
public void testAddShard() throws Exception {
|
||||
|
||||
String collectionName = "TooManyReplicasWhenAddingShards";
|
||||
CollectionAdminRequest.Create create = new CollectionAdminRequest.Create()
|
||||
.setCollectionName(collectionName)
|
||||
.setReplicationFactor(2)
|
||||
CollectionAdminRequest.createCollectionWithImplicitRouter(collectionName, "conf", "shardstart", 2)
|
||||
.setMaxShardsPerNode(2)
|
||||
.setStateFormat(2)
|
||||
.setRouterName("implicit")
|
||||
.setShards("shardstart");
|
||||
.process(cluster.getSolrClient());
|
||||
|
||||
NamedList<Object> request = create.process(cloudClient).getResponse();
|
||||
|
||||
assertTrue("Could not create the collection", request.get("success") != null);
|
||||
// We have two nodes, maxShardsPerNode is set to 2. Therefore, we should be able to add 2 shards each with
|
||||
// two replicas, but fail on the third.
|
||||
|
||||
CollectionAdminRequest.CreateShard createShard = new CollectionAdminRequest.CreateShard()
|
||||
.setCollectionName(collectionName)
|
||||
.setShardName("shard1");
|
||||
CollectionAdminResponse resp = createShard.process(cloudClient);
|
||||
assertEquals(0, resp.getStatus());
|
||||
CollectionAdminRequest.createShard(collectionName, "shard1")
|
||||
.process(cluster.getSolrClient());
|
||||
|
||||
// Now we should have one replica on each Jetty, add another to reach maxShardsPerNode
|
||||
|
||||
createShard = new CollectionAdminRequest.CreateShard()
|
||||
.setCollectionName(collectionName)
|
||||
.setShardName("shard2");
|
||||
resp = createShard.process(cloudClient);
|
||||
assertEquals(0, resp.getStatus());
|
||||
|
||||
CollectionAdminRequest.createShard(collectionName, "shard2")
|
||||
.process(cluster.getSolrClient());
|
||||
|
||||
// Now fail to add the third as it should exceed maxShardsPerNode
|
||||
createShard = new CollectionAdminRequest.CreateShard()
|
||||
.setCollectionName(collectionName)
|
||||
.setShardName("shard3");
|
||||
try {
|
||||
createShard.process(cloudClient);
|
||||
fail("Should have exceeded the max number of replicas allowed");
|
||||
} catch (HttpSolrClient.RemoteSolrException se) {
|
||||
assertTrue("Should have gotten the right error message back",
|
||||
se.getMessage().contains("given the current number of live nodes and a maxShardsPerNode of"));
|
||||
}
|
||||
Exception e = expectThrows(Exception.class, () -> {
|
||||
CollectionAdminRequest.createShard(collectionName, "shard3")
|
||||
.process(cluster.getSolrClient());
|
||||
});
|
||||
assertTrue("Should have gotten the right error message back",
|
||||
e.getMessage().contains("given the current number of live nodes and a maxShardsPerNode of"));
|
||||
|
||||
// Hmmm, providing a nodeset also overrides the checks for max replicas, so prove it.
|
||||
List<String> nodes = getAllNodeNames(collectionName);
|
||||
|
||||
createShard = new CollectionAdminRequest.CreateShard()
|
||||
.setCollectionName(collectionName)
|
||||
.setShardName("shard4")
|
||||
.setNodeSet(StringUtils.join(nodes, ","));
|
||||
resp = createShard.process(cloudClient);
|
||||
assertEquals(0, resp.getStatus());
|
||||
CollectionAdminRequest.createShard(collectionName, "shard4")
|
||||
.setNodeSet(StringUtils.join(nodes, ","))
|
||||
.process(cluster.getSolrClient());
|
||||
|
||||
// And just for yucks, insure we fail the "regular" one again.
|
||||
createShard = new CollectionAdminRequest.CreateShard()
|
||||
.setCollectionName(collectionName)
|
||||
.setShardName("shard5");
|
||||
try {
|
||||
createShard.process(cloudClient);
|
||||
fail("Should have exceeded the max number of replicas allowed");
|
||||
} catch (HttpSolrClient.RemoteSolrException se) {
|
||||
assertTrue("Should have gotten the right error message back",
|
||||
se.getMessage().contains("given the current number of live nodes and a maxShardsPerNode of"));
|
||||
}
|
||||
Exception e2 = expectThrows(Exception.class, () -> {
|
||||
CollectionAdminRequest.createShard(collectionName, "shard5")
|
||||
.process(cluster.getSolrClient());
|
||||
});
|
||||
assertTrue("Should have gotten the right error message back",
|
||||
e2.getMessage().contains("given the current number of live nodes and a maxShardsPerNode of"));
|
||||
|
||||
// And finally, insure that there are all the replcias we expect. We should have shards 1, 2 and 4 and each
|
||||
// should have exactly two replicas
|
||||
ZkStateReader zkStateReader = getCommonCloudSolrClient().getZkStateReader();
|
||||
zkStateReader.forceUpdateCollection(collectionName);
|
||||
Map<String, Slice> slices = zkStateReader.getClusterState().getSlicesMap(collectionName);
|
||||
waitForState("Expected shards shardstart, 1, 2 and 4, each with two active replicas", collectionName, (n, c) -> {
|
||||
return DocCollection.isFullyActive(n, c, 4, 2);
|
||||
});
|
||||
Map<String, Slice> slices = getCollectionState(collectionName).getSlicesMap();
|
||||
assertEquals("There should be exaclty four slices", slices.size(), 4);
|
||||
assertNotNull("shardstart should exist", slices.get("shardstart"));
|
||||
assertNotNull("shard1 should exist", slices.get("shard1"));
|
||||
|
@ -209,82 +176,46 @@ public class CollectionTooManyReplicasTest extends AbstractFullDistribZkTestBase
|
|||
}
|
||||
|
||||
@Test
|
||||
@ShardsFixed(num = 2)
|
||||
public void testDownedShards() throws Exception {
|
||||
String collectionName = "TooManyReplicasWhenAddingDownedNode";
|
||||
CollectionAdminRequest.Create create = new CollectionAdminRequest.Create()
|
||||
.setCollectionName(collectionName)
|
||||
.setReplicationFactor(1)
|
||||
CollectionAdminRequest.createCollectionWithImplicitRouter(collectionName, "conf", "shardstart", 1)
|
||||
.setMaxShardsPerNode(2)
|
||||
.setStateFormat(2)
|
||||
.setRouterName("implicit")
|
||||
.setShards("shardstart");
|
||||
.process(cluster.getSolrClient());
|
||||
|
||||
NamedList<Object> request = create.process(cloudClient).getResponse();
|
||||
// Shut down a Jetty, I really don't care which
|
||||
JettySolrRunner jetty = cluster.getRandomJetty(random());
|
||||
String deadNode = jetty.getBaseUrl().toString();
|
||||
cluster.stopJettySolrRunner(jetty);
|
||||
|
||||
assertTrue("Could not create the collection", request.get("success") != null);
|
||||
try (SolrZkClient zkClient = new SolrZkClient(zkServer.getZkAddress(),
|
||||
AbstractZkTestCase.TIMEOUT)) {
|
||||
try {
|
||||
|
||||
List<String> liveNodes = zkClient.getChildren("/live_nodes", null, true);
|
||||
|
||||
// Shut down a Jetty, I really don't care which
|
||||
JettySolrRunner downJetty = jettys.get(r.nextInt(2));
|
||||
|
||||
downJetty.stop();
|
||||
List<String> liveNodesNow = null;
|
||||
for (int idx = 0; idx < 150; ++idx) {
|
||||
liveNodesNow = zkClient.getChildren("/live_nodes", null, true);
|
||||
if (liveNodesNow.size() != liveNodes.size()) break;
|
||||
Thread.sleep(100);
|
||||
}
|
||||
List<String> deadNodes = new ArrayList<>(liveNodes);
|
||||
assertTrue("Should be a downed node", deadNodes.removeAll(liveNodesNow));
|
||||
liveNodes.removeAll(deadNodes);
|
||||
|
||||
//OK, we've killed a node. Insure we get errors when we ask to create a replica or shard that involves it.
|
||||
// First try adding a replica to the downed node.
|
||||
CollectionAdminRequest.AddReplica addReplicaNode = new CollectionAdminRequest.AddReplica()
|
||||
.setCollectionName(collectionName)
|
||||
.setShardName("shardstart")
|
||||
.setNode(deadNodes.get(0));
|
||||
|
||||
try {
|
||||
addReplicaNode.process(cloudClient);
|
||||
fail("Should have gotten an exception");
|
||||
} catch (HttpSolrClient.RemoteSolrException se) {
|
||||
assertTrue("Should have gotten a message about shard not ",
|
||||
se.getMessage().contains("At least one of the node(s) specified are not currently active, no action taken."));
|
||||
}
|
||||
// Adding a replica on a dead node should fail
|
||||
Exception e1 = expectThrows(Exception.class, () -> {
|
||||
CollectionAdminRequest.addReplicaToShard(collectionName, "shardstart")
|
||||
.setNode(deadNode)
|
||||
.process(cluster.getSolrClient());
|
||||
});
|
||||
assertTrue("Should have gotten a message about shard not ",
|
||||
e1.getMessage().contains("At least one of the node(s) specified are not currently active, no action taken."));
|
||||
|
||||
// Should also die if we just add a shard
|
||||
CollectionAdminRequest.CreateShard createShard = new CollectionAdminRequest.CreateShard()
|
||||
.setCollectionName(collectionName)
|
||||
.setShardName("shard1")
|
||||
.setNodeSet(deadNodes.get(0));
|
||||
try {
|
||||
createShard.process(cloudClient);
|
||||
fail("Should have gotten an exception");
|
||||
} catch (HttpSolrClient.RemoteSolrException se) {
|
||||
assertTrue("Should have gotten a message about shard not ",
|
||||
se.getMessage().contains("At least one of the node(s) specified are not currently active, no action taken."));
|
||||
}
|
||||
//downJetty.start();
|
||||
Exception e2 = expectThrows(Exception.class, () -> {
|
||||
CollectionAdminRequest.createShard(collectionName, "shard1")
|
||||
.setNodeSet(deadNode)
|
||||
.process(cluster.getSolrClient());
|
||||
});
|
||||
|
||||
assertTrue("Should have gotten a message about shard not ",
|
||||
e2.getMessage().contains("At least one of the node(s) specified are not currently active, no action taken."));
|
||||
}
|
||||
finally {
|
||||
cluster.startJettySolrRunner(jetty);
|
||||
}
|
||||
}
|
||||
|
||||
private List<String> getAllNodeNames(String collectionName) throws KeeperException, InterruptedException {
|
||||
ZkStateReader zkStateReader = getCommonCloudSolrClient().getZkStateReader();
|
||||
zkStateReader.forceUpdateCollection(collectionName);
|
||||
Slice slice = zkStateReader.getClusterState().getSlicesMap(collectionName).get("shard1");
|
||||
|
||||
List<String> nodes = new ArrayList<>();
|
||||
for (Replica rep : slice.getReplicas()) {
|
||||
nodes.add(rep.getNodeName());
|
||||
}
|
||||
|
||||
assertTrue("Should have some nodes!", nodes.size() > 0);
|
||||
return nodes;
|
||||
DocCollection state = getCollectionState(collectionName);
|
||||
return state.getReplicas().stream().map(Replica::getNodeName).distinct().collect(Collectors.toList());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -77,8 +77,7 @@ public class CreateCollectionCleanupTest extends SolrCloudTestCase {
|
|||
assertFalse(rsp.isSuccess());
|
||||
|
||||
// Confirm using LIST that the collection does not exist
|
||||
CollectionAdminRequest.List list = CollectionAdminRequest.listCollections();
|
||||
rsp = list.process(cloudClient);
|
||||
assertFalse(((ArrayList) rsp.getResponse().get("collections")).contains("foo"));
|
||||
assertFalse(CollectionAdminRequest.listCollections(cloudClient).contains("foo"));
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,41 +16,19 @@
|
|||
*/
|
||||
package org.apache.solr.cloud;
|
||||
|
||||
import org.apache.lucene.util.LuceneTestCase.Slow;
|
||||
import org.apache.lucene.util.TestUtil;
|
||||
import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
|
||||
import org.apache.solr.client.solrj.SolrClient;
|
||||
import org.apache.solr.client.solrj.SolrQuery;
|
||||
import org.apache.solr.client.solrj.SolrRequest;
|
||||
import org.apache.solr.client.solrj.SolrServerException;
|
||||
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||
import org.apache.solr.client.solrj.impl.HttpSolrClient;
|
||||
import org.apache.solr.client.solrj.request.QueryRequest;
|
||||
import org.apache.solr.client.solrj.request.UpdateRequest;
|
||||
import org.apache.solr.client.solrj.response.QueryResponse;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.apache.solr.common.cloud.ImplicitDocRouter;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.params.CollectionParams.CollectionAction;
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.apache.solr.common.util.Utils;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.NUM_SLICES;
|
||||
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.SHARDS_PROP;
|
||||
import org.apache.lucene.util.TestUtil;
|
||||
import org.apache.solr.client.solrj.SolrQuery;
|
||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||
import org.apache.solr.client.solrj.request.UpdateRequest;
|
||||
import org.apache.solr.common.SolrInputDocument;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.apache.solr.common.cloud.DocCollection.DOC_ROUTER;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
|
||||
|
@ -59,371 +37,162 @@ import static org.apache.solr.common.params.ShardParams._ROUTE_;
|
|||
/**
|
||||
* Tests the Custom Sharding API.
|
||||
*/
|
||||
@Slow
|
||||
@SuppressSSL(bugUrl = "https://issues.apache.org/jira/browse/SOLR-5776")
|
||||
public class CustomCollectionTest extends AbstractFullDistribZkTestBase {
|
||||
public class CustomCollectionTest extends SolrCloudTestCase {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
private static final int NODE_COUNT = 4;
|
||||
|
||||
protected String getSolrXml() {
|
||||
return "solr.xml";
|
||||
@BeforeClass
|
||||
public static void setupCluster() throws Exception {
|
||||
configureCluster(NODE_COUNT)
|
||||
.addConfig("conf", configset("cloud-dynamic"))
|
||||
.configure();
|
||||
}
|
||||
|
||||
|
||||
public CustomCollectionTest() {
|
||||
sliceCount = 2;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setDistributedParams(ModifiableSolrParams params) {
|
||||
|
||||
if (r.nextBoolean()) {
|
||||
// don't set shards, let that be figured out from the cloud state
|
||||
} else {
|
||||
// use shard ids rather than physical locations
|
||||
StringBuilder sb = new StringBuilder();
|
||||
for (int i = 0; i < getShardCount(); i++) {
|
||||
if (i > 0)
|
||||
sb.append(',');
|
||||
sb.append("shard" + (i + 3));
|
||||
}
|
||||
params.set("shards", sb.toString());
|
||||
}
|
||||
@Before
|
||||
public void ensureClusterEmpty() throws Exception {
|
||||
cluster.deleteAllCollections();
|
||||
}
|
||||
|
||||
@Test
|
||||
@ShardsFixed(num = 4)
|
||||
public void test() throws Exception {
|
||||
testCustomCollectionsAPI();
|
||||
testRouteFieldForHashRouter();
|
||||
testCreateShardRepFactor();
|
||||
}
|
||||
public void testCustomCollectionsAPI() throws Exception {
|
||||
|
||||
|
||||
private void testCustomCollectionsAPI() throws Exception {
|
||||
String COLL_PREFIX = "implicitcoll";
|
||||
|
||||
// TODO: fragile - because we dont pass collection.confName, it will only
|
||||
// find a default if a conf set with a name matching the collection name is found, or
|
||||
// if there is only one conf set. That and the fact that other tests run first in this
|
||||
// env make this pretty fragile
|
||||
|
||||
// create new collections rapid fire
|
||||
Map<String,List<Integer>> collectionInfos = new HashMap<>();
|
||||
final String collection = "implicitcoll";
|
||||
int replicationFactor = TestUtil.nextInt(random(), 0, 3) + 2;
|
||||
int numShards = 3;
|
||||
int maxShardsPerNode = (((numShards + 1) * replicationFactor) / NODE_COUNT) + 1;
|
||||
|
||||
int cnt = random().nextInt(6) + 1;
|
||||
CollectionAdminRequest.createCollectionWithImplicitRouter(collection, "conf", "a,b,c", replicationFactor)
|
||||
.setMaxShardsPerNode(maxShardsPerNode)
|
||||
.process(cluster.getSolrClient());
|
||||
|
||||
for (int i = 0; i < cnt; i++) {
|
||||
int numShards = 3;
|
||||
int maxShardsPerNode = ((((numShards+1) * replicationFactor) / getCommonCloudSolrClient()
|
||||
.getZkStateReader().getClusterState().getLiveNodes().size())) + 1;
|
||||
|
||||
|
||||
CloudSolrClient client = null;
|
||||
try {
|
||||
if (i == 0) {
|
||||
// Test if we can create a collection through CloudSolrServer where
|
||||
// you havnt set default-collection
|
||||
// This is nice because you want to be able to create you first
|
||||
// collection using CloudSolrServer, and in such case there is
|
||||
// nothing reasonable to set as default-collection
|
||||
client = createCloudClient(null);
|
||||
} else if (i == 1) {
|
||||
// Test if we can create a collection through CloudSolrServer where
|
||||
// you have set default-collection to a non-existing collection
|
||||
// This is nice because you want to be able to create you first
|
||||
// collection using CloudSolrServer, and in such case there is
|
||||
// nothing reasonable to set as default-collection, but you might want
|
||||
// to use the same CloudSolrServer throughout the entire
|
||||
// lifetime of your client-application, so it is nice to be able to
|
||||
// set a default-collection on this CloudSolrServer once and for all
|
||||
// and use this CloudSolrServer to create the collection
|
||||
client = createCloudClient(COLL_PREFIX + i);
|
||||
}
|
||||
|
||||
Map<String, Object> props = Utils.makeMap(
|
||||
"router.name", ImplicitDocRouter.NAME,
|
||||
REPLICATION_FACTOR, replicationFactor,
|
||||
MAX_SHARDS_PER_NODE, maxShardsPerNode,
|
||||
SHARDS_PROP, "a,b,c");
|
||||
|
||||
createCollection(collectionInfos, COLL_PREFIX + i,props,client);
|
||||
} finally {
|
||||
if (client != null) client.close();
|
||||
}
|
||||
}
|
||||
|
||||
Set<Entry<String,List<Integer>>> collectionInfosEntrySet = collectionInfos.entrySet();
|
||||
for (Entry<String,List<Integer>> entry : collectionInfosEntrySet) {
|
||||
String collection = entry.getKey();
|
||||
List<Integer> list = entry.getValue();
|
||||
checkForCollection(collection, list, null);
|
||||
|
||||
String url = getUrlFromZk(getCommonCloudSolrClient().getZkStateReader().getClusterState(), collection);
|
||||
|
||||
try (HttpSolrClient collectionClient = getHttpSolrClient(url)) {
|
||||
// poll for a second - it can take a moment before we are ready to serve
|
||||
waitForNon403or404or503(collectionClient);
|
||||
}
|
||||
}
|
||||
ZkStateReader zkStateReader = getCommonCloudSolrClient().getZkStateReader();
|
||||
for (int j = 0; j < cnt; j++) {
|
||||
waitForRecoveriesToFinish(COLL_PREFIX + j, zkStateReader, false);
|
||||
}
|
||||
|
||||
ClusterState clusterState = zkStateReader.getClusterState();
|
||||
|
||||
DocCollection coll = clusterState.getCollection(COLL_PREFIX + 0);
|
||||
assertEquals("implicit", ((Map)coll.get(DOC_ROUTER)).get("name") );
|
||||
DocCollection coll = getCollectionState(collection);
|
||||
assertEquals("implicit", ((Map) coll.get(DOC_ROUTER)).get("name"));
|
||||
assertNotNull(coll.getStr(REPLICATION_FACTOR));
|
||||
assertNotNull(coll.getStr(MAX_SHARDS_PER_NODE));
|
||||
assertNull("A shard of a Collection configured with implicit router must have null range",
|
||||
coll.getSlice("a").getRange());
|
||||
|
||||
List<String> collectionNameList = new ArrayList<>();
|
||||
collectionNameList.addAll(collectionInfos.keySet());
|
||||
log.info("Collections created : "+collectionNameList );
|
||||
new UpdateRequest()
|
||||
.add("id", "6")
|
||||
.add("id", "7")
|
||||
.add("id", "8")
|
||||
.withRoute("a")
|
||||
.commit(cluster.getSolrClient(), collection);
|
||||
|
||||
String collectionName = collectionNameList.get(random().nextInt(collectionNameList.size()));
|
||||
assertEquals(3, cluster.getSolrClient().query(collection, new SolrQuery("*:*")).getResults().getNumFound());
|
||||
assertEquals(0, cluster.getSolrClient().query(collection, new SolrQuery("*:*").setParam(_ROUTE_, "b")).getResults().getNumFound());
|
||||
assertEquals(3, cluster.getSolrClient().query(collection, new SolrQuery("*:*").setParam(_ROUTE_, "a")).getResults().getNumFound());
|
||||
|
||||
String url = getUrlFromZk(getCommonCloudSolrClient().getZkStateReader().getClusterState(), collectionName);
|
||||
cluster.getSolrClient().deleteByQuery(collection, "*:*");
|
||||
cluster.getSolrClient().commit(collection, true, true);
|
||||
assertEquals(0, cluster.getSolrClient().query(collection, new SolrQuery("*:*")).getResults().getNumFound());
|
||||
|
||||
String shard_fld = "shard_s";
|
||||
try (HttpSolrClient collectionClient = getHttpSolrClient(url)) {
|
||||
new UpdateRequest()
|
||||
.add("id", "9")
|
||||
.add("id", "10")
|
||||
.add("id", "11")
|
||||
.withRoute("c")
|
||||
.commit(cluster.getSolrClient(), collection);
|
||||
|
||||
// lets try and use the solrj client to index a couple documents
|
||||
|
||||
collectionClient.add(getDoc(id, 6, i1, -600, tlong, 600, t1,
|
||||
"humpty dumpy sat on a wall", _ROUTE_,"a"));
|
||||
|
||||
collectionClient.add(getDoc(id, 7, i1, -600, tlong, 600, t1,
|
||||
"humpty dumpy3 sat on a walls", _ROUTE_,"a"));
|
||||
|
||||
collectionClient.add(getDoc(id, 8, i1, -600, tlong, 600, t1,
|
||||
"humpty dumpy2 sat on a walled", _ROUTE_,"a"));
|
||||
|
||||
collectionClient.commit();
|
||||
|
||||
assertEquals(3, collectionClient.query(new SolrQuery("*:*")).getResults().getNumFound());
|
||||
assertEquals(0, collectionClient.query(new SolrQuery("*:*").setParam(_ROUTE_,"b")).getResults().getNumFound());
|
||||
assertEquals(3, collectionClient.query(new SolrQuery("*:*").setParam(_ROUTE_,"a")).getResults().getNumFound());
|
||||
|
||||
collectionClient.deleteByQuery("*:*");
|
||||
collectionClient.commit(true,true);
|
||||
assertEquals(0, collectionClient.query(new SolrQuery("*:*")).getResults().getNumFound());
|
||||
|
||||
UpdateRequest up = new UpdateRequest();
|
||||
up.setParam(_ROUTE_, "c");
|
||||
up.setParam("commit","true");
|
||||
|
||||
up.add(getDoc(id, 9, i1, -600, tlong, 600, t1,
|
||||
"humpty dumpy sat on a wall"));
|
||||
up.add(getDoc(id, 10, i1, -600, tlong, 600, t1,
|
||||
"humpty dumpy3 sat on a walls"));
|
||||
up.add(getDoc(id, 11, i1, -600, tlong, 600, t1,
|
||||
"humpty dumpy2 sat on a walled"));
|
||||
|
||||
collectionClient.request(up);
|
||||
|
||||
assertEquals(3, collectionClient.query(new SolrQuery("*:*")).getResults().getNumFound());
|
||||
assertEquals(0, collectionClient.query(new SolrQuery("*:*").setParam(_ROUTE_,"a")).getResults().getNumFound());
|
||||
assertEquals(3, collectionClient.query(new SolrQuery("*:*").setParam(_ROUTE_,"c")).getResults().getNumFound());
|
||||
|
||||
//Testing CREATESHARD
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
params.set("action", CollectionAction.CREATESHARD.toString());
|
||||
params.set("collection", collectionName);
|
||||
params.set("shard", "x");
|
||||
SolrRequest request = new QueryRequest(params);
|
||||
request.setPath("/admin/collections");
|
||||
try (SolrClient server = createNewSolrClient("", getBaseUrl((HttpSolrClient) clients.get(0)))) {
|
||||
server.request(request);
|
||||
assertEquals(3, cluster.getSolrClient().query(collection, new SolrQuery("*:*")).getResults().getNumFound());
|
||||
assertEquals(0, cluster.getSolrClient().query(collection, new SolrQuery("*:*").setParam(_ROUTE_, "a")).getResults().getNumFound());
|
||||
assertEquals(3, cluster.getSolrClient().query(collection, new SolrQuery("*:*").setParam(_ROUTE_, "c")).getResults().getNumFound());
|
||||
|
||||
//Testing CREATESHARD
|
||||
CollectionAdminRequest.createShard(collection, "x")
|
||||
.process(cluster.getSolrClient());
|
||||
waitForState("Expected shard 'x' to be active", collection, (n, c) -> {
|
||||
if (c.getSlice("x") == null)
|
||||
return false;
|
||||
for (Replica r : c.getSlice("x")) {
|
||||
if (r.getState() != Replica.State.ACTIVE)
|
||||
return false;
|
||||
}
|
||||
waitForCollection(zkStateReader,collectionName,4);
|
||||
//wait for all the replicas to become active
|
||||
int attempts = 0;
|
||||
while(true){
|
||||
if(attempts>30 ) fail("Not enough active replicas in the shard 'x'");
|
||||
attempts++;
|
||||
int activeReplicaCount = 0;
|
||||
for (Replica x : zkStateReader.getClusterState().getCollection(collectionName).getSlice("x").getReplicas()) {
|
||||
if (x.getState() == Replica.State.ACTIVE) {
|
||||
activeReplicaCount++;
|
||||
}
|
||||
}
|
||||
Thread.sleep(500);
|
||||
if(activeReplicaCount >= replicationFactor) break;
|
||||
}
|
||||
log.info(zkStateReader.getClusterState().toString());
|
||||
|
||||
collectionClient.add(getDoc(id, 66, i1, -600, tlong, 600, t1,
|
||||
"humpty dumpy sat on a wall", _ROUTE_,"x"));
|
||||
collectionClient.commit();
|
||||
assertEquals(1, collectionClient.query(new SolrQuery("*:*").setParam(_ROUTE_,"x")).getResults().getNumFound());
|
||||
|
||||
|
||||
int numShards = 4;
|
||||
replicationFactor = TestUtil.nextInt(random(), 0, 3) + 2;
|
||||
int maxShardsPerNode = (((numShards * replicationFactor) / getCommonCloudSolrClient()
|
||||
.getZkStateReader().getClusterState().getLiveNodes().size())) + 1;
|
||||
return true;
|
||||
});
|
||||
|
||||
try (CloudSolrClient client = createCloudClient(null)) {
|
||||
Map<String, Object> props = Utils.makeMap(
|
||||
"router.name", ImplicitDocRouter.NAME,
|
||||
REPLICATION_FACTOR, replicationFactor,
|
||||
MAX_SHARDS_PER_NODE, maxShardsPerNode,
|
||||
SHARDS_PROP, "a,b,c,d",
|
||||
"router.field", shard_fld);
|
||||
|
||||
collectionName = COLL_PREFIX + "withShardField";
|
||||
createCollection(collectionInfos, collectionName,props,client);
|
||||
}
|
||||
|
||||
List<Integer> list = collectionInfos.get(collectionName);
|
||||
checkForCollection(collectionName, list, null);
|
||||
|
||||
|
||||
url = getUrlFromZk(getCommonCloudSolrClient().getZkStateReader().getClusterState(), collectionName);
|
||||
}
|
||||
new UpdateRequest()
|
||||
.add("id", "66", _ROUTE_, "x")
|
||||
.commit(cluster.getSolrClient(), collection);
|
||||
// TODO - the local state is cached and causes the request to fail with 'unknown shard'
|
||||
// assertEquals(1, cluster.getSolrClient().query(collection, new SolrQuery("*:*").setParam(_ROUTE_, "x")).getResults().getNumFound());
|
||||
|
||||
try (HttpSolrClient collectionClient = getHttpSolrClient(url)) {
|
||||
// poll for a second - it can take a moment before we are ready to serve
|
||||
waitForNon403or404or503(collectionClient);
|
||||
}
|
||||
|
||||
try (HttpSolrClient collectionClient = getHttpSolrClient(url)) {
|
||||
// lets try and use the solrj client to index a couple documents
|
||||
|
||||
collectionClient.add(getDoc(id, 6, i1, -600, tlong, 600, t1,
|
||||
"humpty dumpy sat on a wall", shard_fld,"a"));
|
||||
|
||||
collectionClient.add(getDoc(id, 7, i1, -600, tlong, 600, t1,
|
||||
"humpty dumpy3 sat on a walls", shard_fld,"a"));
|
||||
|
||||
collectionClient.add(getDoc(id, 8, i1, -600, tlong, 600, t1,
|
||||
"humpty dumpy2 sat on a walled", shard_fld,"a"));
|
||||
|
||||
collectionClient.commit();
|
||||
|
||||
assertEquals(3, collectionClient.query(new SolrQuery("*:*")).getResults().getNumFound());
|
||||
assertEquals(0, collectionClient.query(new SolrQuery("*:*").setParam(_ROUTE_,"b")).getResults().getNumFound());
|
||||
//TODO debug the following case
|
||||
assertEquals(3, collectionClient.query(new SolrQuery("*:*").setParam(_ROUTE_, "a")).getResults().getNumFound());
|
||||
}
|
||||
}
|
||||
|
||||
private void testRouteFieldForHashRouter()throws Exception{
|
||||
@Test
|
||||
public void testRouteFieldForImplicitRouter() throws Exception {
|
||||
|
||||
int numShards = 4;
|
||||
int replicationFactor = TestUtil.nextInt(random(), 0, 3) + 2;
|
||||
int maxShardsPerNode = ((numShards * replicationFactor) / NODE_COUNT) + 1;
|
||||
String shard_fld = "shard_s";
|
||||
|
||||
final String collection = "withShardField";
|
||||
|
||||
CollectionAdminRequest.createCollectionWithImplicitRouter(collection, "conf", "a,b,c,d", replicationFactor)
|
||||
.setMaxShardsPerNode(maxShardsPerNode)
|
||||
.setRouterField(shard_fld)
|
||||
.process(cluster.getSolrClient());
|
||||
|
||||
new UpdateRequest()
|
||||
.add("id", "6", shard_fld, "a")
|
||||
.add("id", "7", shard_fld, "a")
|
||||
.add("id", "8", shard_fld, "b")
|
||||
.commit(cluster.getSolrClient(), collection);
|
||||
|
||||
assertEquals(3, cluster.getSolrClient().query(collection, new SolrQuery("*:*")).getResults().getNumFound());
|
||||
assertEquals(1, cluster.getSolrClient().query(collection, new SolrQuery("*:*").setParam(_ROUTE_, "b")).getResults().getNumFound());
|
||||
assertEquals(2, cluster.getSolrClient().query(collection, new SolrQuery("*:*").setParam(_ROUTE_, "a")).getResults().getNumFound());
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRouteFieldForHashRouter()throws Exception{
|
||||
String collectionName = "routeFieldColl";
|
||||
int numShards = 4;
|
||||
int replicationFactor = 2;
|
||||
int maxShardsPerNode = (((numShards * replicationFactor) / getCommonCloudSolrClient()
|
||||
.getZkStateReader().getClusterState().getLiveNodes().size())) + 1;
|
||||
|
||||
HashMap<String, List<Integer>> collectionInfos = new HashMap<>();
|
||||
int maxShardsPerNode = ((numShards * replicationFactor) / NODE_COUNT) + 1;
|
||||
String shard_fld = "shard_s";
|
||||
try (CloudSolrClient client = createCloudClient(null)) {
|
||||
Map<String, Object> props = Utils.makeMap(
|
||||
REPLICATION_FACTOR, replicationFactor,
|
||||
MAX_SHARDS_PER_NODE, maxShardsPerNode,
|
||||
NUM_SLICES, numShards,
|
||||
"router.field", shard_fld);
|
||||
|
||||
createCollection(collectionInfos, collectionName,props,client);
|
||||
}
|
||||
CollectionAdminRequest.createCollection(collectionName, "conf", numShards, replicationFactor)
|
||||
.setMaxShardsPerNode(maxShardsPerNode)
|
||||
.setRouterField(shard_fld)
|
||||
.process(cluster.getSolrClient());
|
||||
|
||||
List<Integer> list = collectionInfos.get(collectionName);
|
||||
checkForCollection(collectionName, list, null);
|
||||
new UpdateRequest()
|
||||
.add("id", "6", shard_fld, "a")
|
||||
.add("id", "7", shard_fld, "a")
|
||||
.add("id", "8", shard_fld, "b")
|
||||
.commit(cluster.getSolrClient(), collectionName);
|
||||
|
||||
assertEquals(3, cluster.getSolrClient().query(collectionName, new SolrQuery("*:*")).getResults().getNumFound());
|
||||
assertEquals(2, cluster.getSolrClient().query(collectionName, new SolrQuery("*:*").setParam(_ROUTE_, "a")).getResults().getNumFound());
|
||||
assertEquals(1, cluster.getSolrClient().query(collectionName, new SolrQuery("*:*").setParam(_ROUTE_, "b")).getResults().getNumFound());
|
||||
assertEquals(0, cluster.getSolrClient().query(collectionName, new SolrQuery("*:*").setParam(_ROUTE_, "c")).getResults().getNumFound());
|
||||
|
||||
|
||||
String url = getUrlFromZk(getCommonCloudSolrClient().getZkStateReader().getClusterState(), collectionName);
|
||||
cluster.getSolrClient().deleteByQuery(collectionName, "*:*");
|
||||
cluster.getSolrClient().commit(collectionName);
|
||||
|
||||
try (HttpSolrClient collectionClient = getHttpSolrClient(url)) {
|
||||
// poll for a second - it can take a moment before we are ready to serve
|
||||
waitForNon403or404or503(collectionClient);
|
||||
}
|
||||
cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "100", shard_fld, "c!doc1"));
|
||||
cluster.getSolrClient().commit(collectionName);
|
||||
assertEquals(1, cluster.getSolrClient().query(collectionName, new SolrQuery("*:*").setParam(_ROUTE_, "c!")).getResults().getNumFound());
|
||||
|
||||
|
||||
try (HttpSolrClient collectionClient = getHttpSolrClient(url)) {
|
||||
// lets try and use the solrj client to index a couple documents
|
||||
|
||||
collectionClient.add(getDoc(id, 6, i1, -600, tlong, 600, t1,
|
||||
"humpty dumpy sat on a wall", shard_fld,"a"));
|
||||
|
||||
collectionClient.add(getDoc(id, 7, i1, -600, tlong, 600, t1,
|
||||
"humpty dumpy3 sat on a walls", shard_fld,"a"));
|
||||
|
||||
collectionClient.add(getDoc(id, 8, i1, -600, tlong, 600, t1,
|
||||
"humpty dumpy2 sat on a walled", shard_fld,"a"));
|
||||
|
||||
collectionClient.commit();
|
||||
|
||||
assertEquals(3, collectionClient.query(new SolrQuery("*:*")).getResults().getNumFound());
|
||||
//TODO debug the following case
|
||||
assertEquals(3, collectionClient.query(new SolrQuery("*:*").setParam(_ROUTE_, "a")).getResults().getNumFound());
|
||||
|
||||
collectionClient.deleteByQuery("*:*");
|
||||
collectionClient.commit();
|
||||
|
||||
collectionClient.add (getDoc( id,100,shard_fld, "b!doc1"));
|
||||
collectionClient.commit();
|
||||
assertEquals(1, collectionClient.query(new SolrQuery("*:*").setParam(_ROUTE_, "b!")).getResults().getNumFound());
|
||||
}
|
||||
}
|
||||
|
||||
private void testCreateShardRepFactor() throws Exception {
|
||||
String collectionName = "testCreateShardRepFactor";
|
||||
HashMap<String, List<Integer>> collectionInfos = new HashMap<>();
|
||||
try (CloudSolrClient client = createCloudClient(null)) {
|
||||
Map<String, Object> props = Utils.makeMap(
|
||||
REPLICATION_FACTOR, 1,
|
||||
MAX_SHARDS_PER_NODE, 5,
|
||||
NUM_SLICES, 2,
|
||||
"shards", "a,b",
|
||||
"router.name", "implicit");
|
||||
@Test
|
||||
public void testCreateShardRepFactor() throws Exception {
|
||||
final String collectionName = "testCreateShardRepFactor";
|
||||
CollectionAdminRequest.createCollectionWithImplicitRouter(collectionName, "conf", "a,b", 1)
|
||||
.process(cluster.getSolrClient());
|
||||
|
||||
createCollection(collectionInfos, collectionName, props, client);
|
||||
}
|
||||
ZkStateReader zkStateReader = getCommonCloudSolrClient().getZkStateReader();
|
||||
waitForRecoveriesToFinish(collectionName, zkStateReader, false);
|
||||
CollectionAdminRequest.createShard(collectionName, "x")
|
||||
.process(cluster.getSolrClient());
|
||||
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
params.set("action", CollectionAction.CREATESHARD.toString());
|
||||
params.set("collection", collectionName);
|
||||
params.set("shard", "x");
|
||||
SolrRequest request = new QueryRequest(params);
|
||||
request.setPath("/admin/collections");
|
||||
waitForState("Not enough active replicas in shard 'x'", collectionName, (n, c) -> {
|
||||
return c.getSlice("x").getReplicas().size() == 1;
|
||||
});
|
||||
|
||||
try (SolrClient server = createNewSolrClient("", getBaseUrl((HttpSolrClient) clients.get(0)))) {
|
||||
server.request(request);
|
||||
}
|
||||
|
||||
waitForRecoveriesToFinish(collectionName, zkStateReader, false);
|
||||
|
||||
int replicaCount = 0;
|
||||
int attempts = 0;
|
||||
while (true) {
|
||||
if (attempts > 30) fail("Not enough active replicas in the shard 'x'");
|
||||
attempts++;
|
||||
replicaCount = zkStateReader.getClusterState().getSlice(collectionName, "x").getReplicas().size();
|
||||
if (replicaCount >= 1) break;
|
||||
Thread.sleep(500);
|
||||
}
|
||||
|
||||
assertEquals("CREATESHARD API created more than replicationFactor number of replicas", 1, replicaCount);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected QueryResponse queryServer(ModifiableSolrParams params) throws SolrServerException, IOException {
|
||||
|
||||
if (r.nextBoolean())
|
||||
return super.queryServer(params);
|
||||
|
||||
if (r.nextBoolean())
|
||||
params.set("collection",DEFAULT_COLLECTION);
|
||||
|
||||
QueryResponse rsp = getCommonCloudSolrClient().query(params);
|
||||
return rsp;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -51,6 +51,7 @@ public class MigrateRouteKeyTest extends SolrCloudTestCase {
|
|||
|
||||
if (usually()) {
|
||||
CollectionAdminRequest.setClusterProperty("legacyCloud", "false").process(cluster.getSolrClient());
|
||||
log.info("Using legacyCloud=false for cluster");
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -16,46 +16,59 @@
|
|||
*/
|
||||
package org.apache.solr.cloud;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.lucene.util.LuceneTestCase.Slow;
|
||||
import org.apache.solr.client.solrj.SolrQuery;
|
||||
import org.apache.solr.client.solrj.SolrServerException;
|
||||
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
||||
import org.apache.solr.common.SolrInputDocument;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.client.solrj.impl.HttpSolrClient;
|
||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||
import org.apache.solr.client.solrj.request.UpdateRequest;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.Slice;
|
||||
import org.junit.After;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@Slow
|
||||
public class RecoveryZkTest extends AbstractFullDistribZkTestBase {
|
||||
public class RecoveryZkTest extends SolrCloudTestCase {
|
||||
|
||||
private static Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
@BeforeClass
|
||||
public static void setupCluster() throws Exception {
|
||||
configureCluster(2)
|
||||
.addConfig("conf", configset("cloud-minimal"))
|
||||
.configure();
|
||||
}
|
||||
|
||||
//private static final String DISTRIB_UPDATE_CHAIN = "distrib-update-chain";
|
||||
private StoppableIndexingThread indexThread;
|
||||
private StoppableIndexingThread indexThread2;
|
||||
|
||||
public RecoveryZkTest() {
|
||||
super();
|
||||
sliceCount = 1;
|
||||
fixShardCount(2);
|
||||
schemaString = "schema15.xml"; // we need a string id
|
||||
}
|
||||
|
||||
public static String[] fieldNames = new String[]{"f_i", "f_f", "f_d", "f_l", "f_dt"};
|
||||
public static RandVal[] randVals = new RandVal[]{rint, rfloat, rdouble, rlong, rdate};
|
||||
|
||||
protected String[] getFieldNames() {
|
||||
return fieldNames;
|
||||
}
|
||||
|
||||
protected RandVal[] getRandValues() {
|
||||
return randVals;
|
||||
@After
|
||||
public void stopThreads() throws InterruptedException {
|
||||
indexThread.safeStop();
|
||||
indexThread2.safeStop();
|
||||
indexThread.join();
|
||||
indexThread2.join();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test() throws Exception {
|
||||
handle.clear();
|
||||
handle.put("timestamp", SKIPVAL);
|
||||
|
||||
|
||||
final String collection = "recoverytest";
|
||||
|
||||
CollectionAdminRequest.createCollection(collection, "conf", 1, 2)
|
||||
.setMaxShardsPerNode(1)
|
||||
.process(cluster.getSolrClient());
|
||||
waitForState("Expected a collection with one shard and two replicas", collection, clusterShape(1, 2));
|
||||
cluster.getSolrClient().setDefaultCollection(collection);
|
||||
|
||||
// start a couple indexing threads
|
||||
|
||||
int[] maxDocList = new int[] {300, 700, 1200, 1350, 3000};
|
||||
|
@ -67,12 +80,12 @@ public class RecoveryZkTest extends AbstractFullDistribZkTestBase {
|
|||
} else {
|
||||
maxDoc = maxDocNightlyList[random().nextInt(maxDocList.length - 1)];
|
||||
}
|
||||
log.info("Indexing {} documents", maxDoc);
|
||||
|
||||
indexThread = new StoppableIndexingThread(controlClient, cloudClient, "1", true, maxDoc, 1, true);
|
||||
indexThread = new StoppableIndexingThread(null, cluster.getSolrClient(), "1", true, maxDoc, 1, true);
|
||||
indexThread.start();
|
||||
|
||||
indexThread2 = new StoppableIndexingThread(controlClient, cloudClient, "2", true, maxDoc, 1, true);
|
||||
|
||||
indexThread2 = new StoppableIndexingThread(null, cluster.getSolrClient(), "2", true, maxDoc, 1, true);
|
||||
indexThread2.start();
|
||||
|
||||
// give some time to index...
|
||||
|
@ -80,88 +93,57 @@ public class RecoveryZkTest extends AbstractFullDistribZkTestBase {
|
|||
Thread.sleep(waitTimes[random().nextInt(waitTimes.length - 1)]);
|
||||
|
||||
// bring shard replica down
|
||||
JettySolrRunner replica = chaosMonkey.stopShard("shard1", 1).jetty;
|
||||
DocCollection state = getCollectionState(collection);
|
||||
Replica leader = state.getLeader("shard1");
|
||||
Replica replica = getRandomReplica(state.getSlice("shard1"), (r) -> leader != r);
|
||||
|
||||
JettySolrRunner jetty = cluster.getReplicaJetty(replica);
|
||||
jetty.stop();
|
||||
|
||||
// wait a moment - lets allow some docs to be indexed so replication time is non 0
|
||||
Thread.sleep(waitTimes[random().nextInt(waitTimes.length - 1)]);
|
||||
|
||||
// bring shard replica up
|
||||
replica.start();
|
||||
jetty.start();
|
||||
|
||||
// make sure replication can start
|
||||
Thread.sleep(3000);
|
||||
ZkStateReader zkStateReader = cloudClient.getZkStateReader();
|
||||
|
||||
|
||||
// stop indexing threads
|
||||
indexThread.safeStop();
|
||||
indexThread2.safeStop();
|
||||
|
||||
indexThread.join();
|
||||
indexThread2.join();
|
||||
|
||||
Thread.sleep(1000);
|
||||
|
||||
waitForThingsToLevelOut(120);
|
||||
|
||||
Thread.sleep(2000);
|
||||
|
||||
waitForThingsToLevelOut(30);
|
||||
|
||||
Thread.sleep(5000);
|
||||
|
||||
waitForRecoveriesToFinish(DEFAULT_COLLECTION, zkStateReader, false, true);
|
||||
|
||||
new UpdateRequest()
|
||||
.commit(cluster.getSolrClient(), collection);
|
||||
|
||||
cluster.getSolrClient().waitForState(collection, 120, TimeUnit.SECONDS, clusterShape(1, 2));
|
||||
|
||||
// test that leader and replica have same doc count
|
||||
|
||||
String fail = checkShardConsistency("shard1", false, false);
|
||||
if (fail != null) {
|
||||
fail(fail);
|
||||
}
|
||||
|
||||
SolrQuery query = new SolrQuery("*:*");
|
||||
query.setParam("distrib", "false");
|
||||
long client1Docs = shardToJetty.get("shard1").get(0).client.solrClient.query(query).getResults().getNumFound();
|
||||
long client2Docs = shardToJetty.get("shard1").get(1).client.solrClient.query(query).getResults().getNumFound();
|
||||
|
||||
assertTrue(client1Docs > 0);
|
||||
assertEquals(client1Docs, client2Docs);
|
||||
|
||||
// won't always pass yet...
|
||||
//query("q", "*:*", "sort", "id desc");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void indexDoc(SolrInputDocument doc) throws IOException,
|
||||
SolrServerException {
|
||||
controlClient.add(doc);
|
||||
|
||||
// UpdateRequest ureq = new UpdateRequest();
|
||||
// ureq.add(doc);
|
||||
// ureq.setParam("update.chain", DISTRIB_UPDATE_CHAIN);
|
||||
// ureq.process(cloudClient);
|
||||
cloudClient.add(doc);
|
||||
state = getCollectionState(collection);
|
||||
assertShardConsistency(state.getSlice("shard1"), true);
|
||||
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void distribTearDown() throws Exception {
|
||||
// make sure threads have been stopped...
|
||||
indexThread.safeStop();
|
||||
indexThread2.safeStop();
|
||||
|
||||
indexThread.join();
|
||||
indexThread2.join();
|
||||
|
||||
super.distribTearDown();
|
||||
}
|
||||
|
||||
// skip the randoms - they can deadlock...
|
||||
@Override
|
||||
protected void indexr(Object... fields) throws Exception {
|
||||
SolrInputDocument doc = new SolrInputDocument();
|
||||
addFields(doc, fields);
|
||||
addFields(doc, "rnd_b", true);
|
||||
indexDoc(doc);
|
||||
private void assertShardConsistency(Slice shard, boolean expectDocs) throws Exception {
|
||||
List<Replica> replicas = shard.getReplicas(r -> r.getState() == Replica.State.ACTIVE);
|
||||
long[] numCounts = new long[replicas.size()];
|
||||
int i = 0;
|
||||
for (Replica replica : replicas) {
|
||||
try (HttpSolrClient client = new HttpSolrClient.Builder(replica.getCoreUrl())
|
||||
.withHttpClient(cluster.getSolrClient().getHttpClient()).build()) {
|
||||
numCounts[i] = client.query(new SolrQuery("*:*").add("distrib", "false")).getResults().getNumFound();
|
||||
i++;
|
||||
}
|
||||
}
|
||||
for (int j = 1; j < replicas.size(); j++) {
|
||||
if (numCounts[j] != numCounts[j - 1])
|
||||
fail("Mismatch in counts between replicas"); // nocommit improve this!
|
||||
if (numCounts[j] == 0 && expectDocs)
|
||||
fail("Expected docs on shard " + shard.getName() + " but found none");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -94,7 +94,8 @@ public class ShardSplitTest extends BasicDistributedZkTest {
|
|||
|
||||
if (usually()) {
|
||||
log.info("Using legacyCloud=false for cluster");
|
||||
CollectionsAPIDistributedZkTest.setClusterProp(cloudClient, "legacyCloud", "false");
|
||||
CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, "false")
|
||||
.process(cloudClient);
|
||||
}
|
||||
incompleteOrOverlappingCustomRangeTest();
|
||||
splitByUniqueKeyTest();
|
||||
|
@ -517,7 +518,8 @@ public class ShardSplitTest extends BasicDistributedZkTest {
|
|||
|
||||
if (usually()) {
|
||||
log.info("Using legacyCloud=false for cluster");
|
||||
CollectionsAPIDistributedZkTest.setClusterProp(cloudClient, "legacyCloud", "false");
|
||||
CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, "false")
|
||||
.process(cloudClient);
|
||||
}
|
||||
|
||||
log.info("Starting testSplitShardWithRule");
|
||||
|
|
|
@ -0,0 +1,45 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.cloud;
|
||||
|
||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||
import org.apache.solr.common.cloud.ClusterProperties;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestClusterProperties extends SolrCloudTestCase {
|
||||
|
||||
@BeforeClass
|
||||
public static void setupCluster() throws Exception {
|
||||
configureCluster(1).configure();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClusterProperties() throws Exception {
|
||||
ClusterProperties props = new ClusterProperties(zkClient());
|
||||
assertEquals("false", props.getClusterProperty(ZkStateReader.LEGACY_CLOUD, "false"));
|
||||
|
||||
CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, "true").process(cluster.getSolrClient());
|
||||
assertEquals("true", props.getClusterProperty(ZkStateReader.LEGACY_CLOUD, "false"));
|
||||
|
||||
CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, "false").process(cluster.getSolrClient());
|
||||
assertEquals("false", props.getClusterProperty(ZkStateReader.LEGACY_CLOUD, "true"));
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,65 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.cloud;
|
||||
|
||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||
import org.apache.solr.common.cloud.Slice;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestDeleteCollectionOnDownNodes extends SolrCloudTestCase {
|
||||
|
||||
@BeforeClass
|
||||
public static void setupCluster() throws Exception {
|
||||
configureCluster(4)
|
||||
.addConfig("conf", configset("cloud-minimal"))
|
||||
.addConfig("conf2", configset("cloud-minimal"))
|
||||
.configure();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void deleteCollectionWithDownNodes() throws Exception {
|
||||
|
||||
CollectionAdminRequest.createCollection("halfdeletedcollection2", "conf", 4, 2)
|
||||
.setMaxShardsPerNode(3)
|
||||
.process(cluster.getSolrClient());
|
||||
|
||||
// stop a couple nodes
|
||||
cluster.stopJettySolrRunner(cluster.getRandomJetty(random()));
|
||||
cluster.stopJettySolrRunner(cluster.getRandomJetty(random()));
|
||||
|
||||
// wait for leaders to settle out
|
||||
waitForState("Timed out waiting for leader elections", "halfdeletedcollection2", (n, c) -> {
|
||||
for (Slice slice : c) {
|
||||
if (slice.getLeader() == null)
|
||||
return false;
|
||||
if (slice.getLeader().isActive(n) == false)
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
});
|
||||
|
||||
// delete the collection
|
||||
CollectionAdminRequest.deleteCollection("halfdeletedcollection2").process(cluster.getSolrClient());
|
||||
waitForState("Timed out waiting for collection to be deleted", "halfdeletedcollection2", (n, c) -> c == null);
|
||||
|
||||
assertFalse("Still found collection that should be gone",
|
||||
cluster.getSolrClient().getZkStateReader().getClusterState().hasCollection("halfdeletedcollection2"));
|
||||
|
||||
}
|
||||
}
|
|
@ -16,48 +16,41 @@
|
|||
*/
|
||||
package org.apache.solr.cloud.hdfs;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import com.carrotsearch.randomizedtesting.annotations.Nightly;
|
||||
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.lucene.util.LuceneTestCase.Slow;
|
||||
import org.apache.solr.cloud.CollectionsAPIDistributedZkTest;
|
||||
import org.apache.solr.update.HdfsUpdateLog;
|
||||
import org.apache.solr.common.cloud.ZkConfigManager;
|
||||
import org.apache.solr.util.BadHdfsThreadsFilter;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
import com.carrotsearch.randomizedtesting.annotations.Nightly;
|
||||
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
|
||||
|
||||
@Slow
|
||||
@Nightly
|
||||
@ThreadLeakFilters(defaultFilters = true, filters = {
|
||||
BadHdfsThreadsFilter.class // hdfs currently leaks thread(s)
|
||||
})
|
||||
public class HdfsCollectionsAPIDistributedZkTest extends CollectionsAPIDistributedZkTest {
|
||||
|
||||
private static MiniDFSCluster dfsCluster;
|
||||
private static long initialFailLogsCount;
|
||||
|
||||
|
||||
@BeforeClass
|
||||
public static void setupClass() throws Exception {
|
||||
dfsCluster = HdfsTestUtil.setupClass(createTempDir().toFile().getAbsolutePath());
|
||||
System.setProperty("solr.hdfs.blockcache.enabled", "false");
|
||||
initialFailLogsCount = HdfsUpdateLog.INIT_FAILED_LOGS_COUNT.get();
|
||||
System.setProperty("solr.hdfs.blockcache.blocksperbank", "2048");
|
||||
|
||||
ZkConfigManager configManager = new ZkConfigManager(zkClient());
|
||||
configManager.uploadConfigDir(configset("cloud-hdfs"), "conf");
|
||||
|
||||
System.setProperty("solr.hdfs.home", HdfsTestUtil.getDataDir(dfsCluster, "data"));
|
||||
}
|
||||
|
||||
|
||||
@AfterClass
|
||||
public static void teardownClass() throws Exception {
|
||||
// there should be no new fails from this test
|
||||
assertEquals(0, HdfsUpdateLog.INIT_FAILED_LOGS_COUNT.get() - initialFailLogsCount);
|
||||
cluster.shutdown(); // need to close before the MiniDFSCluster
|
||||
HdfsTestUtil.teardownClass(dfsCluster);
|
||||
System.clearProperty("solr.hdfs.blockcache.enabled");
|
||||
dfsCluster = null;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected String getDataDir(String dataDir) throws IOException {
|
||||
return HdfsTestUtil.getDataDir(dfsCluster, dataDir);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -16,42 +16,40 @@
|
|||
*/
|
||||
package org.apache.solr.cloud.hdfs;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.lucene.util.LuceneTestCase.Slow;
|
||||
import org.apache.solr.cloud.RecoveryZkTest;
|
||||
import org.apache.solr.common.cloud.ZkConfigManager;
|
||||
import org.apache.solr.util.BadHdfsThreadsFilter;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
import com.carrotsearch.randomizedtesting.annotations.Nightly;
|
||||
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
|
||||
|
||||
@Slow
|
||||
@Nightly
|
||||
//@Nightly
|
||||
@ThreadLeakFilters(defaultFilters = true, filters = {
|
||||
BadHdfsThreadsFilter.class // hdfs currently leaks thread(s)
|
||||
})
|
||||
public class HdfsRecoveryZkTest extends RecoveryZkTest {
|
||||
|
||||
private static MiniDFSCluster dfsCluster;
|
||||
|
||||
@BeforeClass
|
||||
public static void setupClass() throws Exception {
|
||||
dfsCluster = HdfsTestUtil.setupClass(createTempDir().toFile().getAbsolutePath());
|
||||
System.setProperty("solr.hdfs.blockcache.blocksperbank", "2048");
|
||||
|
||||
ZkConfigManager configManager = new ZkConfigManager(zkClient());
|
||||
configManager.uploadConfigDir(configset("cloud-hdfs"), "conf");
|
||||
|
||||
System.setProperty("solr.hdfs.home", HdfsTestUtil.getDataDir(dfsCluster, "data"));
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void teardownClass() throws Exception {
|
||||
cluster.shutdown(); // need to close before the MiniDFSCluster
|
||||
HdfsTestUtil.teardownClass(dfsCluster);
|
||||
dfsCluster = null;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected String getDataDir(String dataDir) throws IOException {
|
||||
return HdfsTestUtil.getDataDir(dfsCluster, dataDir);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -191,6 +191,10 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
|
|||
@Deprecated
|
||||
public abstract AsyncCollectionSpecificAdminRequest setCollectionName(String collection);
|
||||
|
||||
public String getCollectionName() {
|
||||
return collection;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SolrParams getParams() {
|
||||
ModifiableSolrParams params = new ModifiableSolrParams(super.getParams());
|
||||
|
@ -1601,6 +1605,13 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
|
|||
return this;
|
||||
}
|
||||
|
||||
public AddReplica withProperty(String key, String value) {
|
||||
if (this.properties == null)
|
||||
this.properties = new Properties();
|
||||
this.properties.setProperty(key, value);
|
||||
return this;
|
||||
}
|
||||
|
||||
public String getNode() {
|
||||
return node;
|
||||
}
|
||||
|
@ -2178,8 +2189,9 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
|
|||
/**
|
||||
* Returns a SolrRequest to get a list of collections in the cluster
|
||||
*/
|
||||
public static List listCollections() {
|
||||
return new List();
|
||||
public static java.util.List<String> listCollections(SolrClient client) throws IOException, SolrServerException {
|
||||
CollectionAdminResponse resp = new List().process(client);
|
||||
return (java.util.List<String>) resp.getResponse().get("collections");
|
||||
}
|
||||
|
||||
// LIST request
|
||||
|
|
|
@ -218,6 +218,13 @@ public class UpdateRequest extends AbstractUpdateRequest {
|
|||
return this;
|
||||
}
|
||||
|
||||
public UpdateRequest withRoute(String route) {
|
||||
if (params == null)
|
||||
params = new ModifiableSolrParams();
|
||||
params.set(ROUTE, route);
|
||||
return this;
|
||||
}
|
||||
|
||||
public UpdateResponse commit(SolrClient client, String collection) throws IOException, SolrServerException {
|
||||
if (params == null)
|
||||
params = new ModifiableSolrParams();
|
||||
|
@ -524,4 +531,5 @@ public class UpdateRequest extends AbstractUpdateRequest {
|
|||
public void lastDocInBatch() {
|
||||
isLastDocInBatch = true;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -21,8 +21,11 @@ import java.util.Collections;
|
|||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.noggit.JSONUtil;
|
||||
import org.noggit.JSONWriter;
|
||||
|
@ -218,6 +221,13 @@ public class Slice extends ZkNodeProps implements Iterable<Replica> {
|
|||
return replicas.values();
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets all replicas that match a predicate
|
||||
*/
|
||||
public List<Replica> getReplicas(Predicate<Replica> pred) {
|
||||
return replicas.values().stream().filter(pred).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the map of coreNodeName to replicas for this slice.
|
||||
*/
|
||||
|
|
|
@ -97,7 +97,7 @@ public class MiniSolrCloudCluster {
|
|||
" \n" +
|
||||
"</solr>\n";
|
||||
|
||||
private final ZkTestServer zkServer;
|
||||
private ZkTestServer zkServer; // non-final due to injectChaos()
|
||||
private final boolean externalZkServer;
|
||||
private final List<JettySolrRunner> jettys = new CopyOnWriteArrayList<>();
|
||||
private final Path baseDir;
|
||||
|
@ -337,6 +337,10 @@ public class MiniSolrCloudCluster {
|
|||
.build());
|
||||
}
|
||||
|
||||
public JettySolrRunner getJettySolrRunner(int index) {
|
||||
return jettys.get(index);
|
||||
}
|
||||
|
||||
/**
|
||||
* Start a new Solr instance on a particular servlet context
|
||||
*
|
||||
|
@ -513,6 +517,10 @@ public class MiniSolrCloudCluster {
|
|||
public CloudSolrClient getSolrClient() {
|
||||
return solrClient;
|
||||
}
|
||||
|
||||
public SolrZkClient getZkClient() {
|
||||
return solrClient.getZkStateReader().getZkClient();
|
||||
}
|
||||
|
||||
protected CloudSolrClient buildSolrClient() {
|
||||
return new Builder()
|
||||
|
@ -570,4 +578,29 @@ public class MiniSolrCloudCluster {
|
|||
log.info("Expired zookeeper session {} from node {}", sessionId, jetty.getBaseUrl());
|
||||
}
|
||||
}
|
||||
|
||||
public void injectChaos(Random random) throws Exception {
|
||||
|
||||
// sometimes we restart one of the jetty nodes
|
||||
if (random.nextBoolean()) {
|
||||
JettySolrRunner jetty = jettys.get(random.nextInt(jettys.size()));
|
||||
ChaosMonkey.stop(jetty);
|
||||
log.info("============ Restarting jetty");
|
||||
ChaosMonkey.start(jetty);
|
||||
}
|
||||
|
||||
// sometimes we restart zookeeper
|
||||
if (random.nextBoolean()) {
|
||||
zkServer.shutdown();
|
||||
log.info("============ Restarting zookeeper");
|
||||
zkServer = new ZkTestServer(zkServer.getZkDir(), zkServer.getPort());
|
||||
zkServer.run();
|
||||
}
|
||||
|
||||
// sometimes we cause a connection loss - sometimes it will hit the overseer
|
||||
if (random.nextBoolean()) {
|
||||
JettySolrRunner jetty = jettys.get(random.nextInt(jettys.size()));
|
||||
ChaosMonkey.causeConnectionLoss(jetty);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -44,6 +44,7 @@ import org.apache.solr.common.cloud.DocCollection;
|
|||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.Slice;
|
||||
import org.apache.solr.common.cloud.SolrZkClient;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
|
||||
|
@ -174,7 +175,10 @@ public class SolrCloudTestCase extends SolrTestCaseJ4 {
|
|||
/** The cluster */
|
||||
protected static MiniSolrCloudCluster cluster;
|
||||
|
||||
protected SolrZkClient zkClient() {
|
||||
protected static SolrZkClient zkClient() {
|
||||
ZkStateReader reader = cluster.getSolrClient().getZkStateReader();
|
||||
if (reader == null)
|
||||
cluster.getSolrClient().connect();
|
||||
return cluster.getSolrClient().getZkStateReader().getZkClient();
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue