mirror of https://github.com/apache/lucene.git
SOLR-4044: CloudSolrClient.connect() can take a timeout parameter to wait for the cluster
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1665174 13f79535-47bb-0310-9956-ffa450edef68
parent 1968413943
commit a8fd826f4a
@@ -152,6 +152,10 @@ New Features

* SOLR-5846: EnumField supports DocValues functionality. (Elran Dvir, shalin)

* SOLR-4044: CloudSolrClient.connect() throws a more useful exception if the
  cluster is not ready, and can now take an optional timeout argument to wait
  for the cluster. (Alan Woodward, shalin, yonik, Mark Miller, Vitaliy Zhovtyuk)

Bug Fixes
----------------------

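A minimal caller-side sketch of the timeout-aware connect described in this entry (not part of the commit itself): the ZooKeeper address and collection name are placeholders, and the 30-second wait is an arbitrary choice.

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.solr.client.solrj.impl.CloudSolrClient;

public class ConnectWithTimeoutExample {
  public static void main(String[] args) throws Exception {
    // "localhost:9983" is a placeholder ZooKeeper connect string.
    try (CloudSolrClient client = new CloudSolrClient("localhost:9983")) {
      client.setDefaultCollection("collection1"); // placeholder collection name
      try {
        // Block for up to 30 seconds while the cluster comes up,
        // instead of failing immediately if it is not ready yet.
        client.connect(30, TimeUnit.SECONDS);
      } catch (TimeoutException e) {
        System.err.println("Cluster was not ready in time: " + e.getMessage());
      }
      // ... issue requests once connected ...
    }
  }
}

As the CloudSolrClient hunk further down shows, the overload simply retries the plain connect() every 250 ms until the deadline passes, so calling it repeatedly is harmless.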
@@ -99,14 +99,6 @@ import static org.apache.solr.common.cloud.ZkStateReader.NODE_NAME_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;

import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.ELECTION_NODE_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.NODE_NAME_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;

/**
 * Handle ZooKeeper interactions.
 *

@@ -599,6 +591,20 @@ public final class ZkController {
    return zkServerAddress;
  }

  /**
   * Create the zknodes necessary for a cluster to operate
   * @param zkClient a SolrZkClient
   * @throws KeeperException if there is a Zookeeper error
   * @throws InterruptedException on interrupt
   */
  public static void createClusterZkNodes(SolrZkClient zkClient) throws KeeperException, InterruptedException {
    ZkCmdExecutor cmdExecutor = new ZkCmdExecutor(zkClient.getZkClientTimeout());
    cmdExecutor.ensureExists(ZkStateReader.LIVE_NODES_ZKNODE, zkClient);
    cmdExecutor.ensureExists(ZkStateReader.COLLECTIONS_ZKNODE, zkClient);
    cmdExecutor.ensureExists(ZkStateReader.ALIASES, zkClient);
    cmdExecutor.ensureExists(ZkStateReader.CLUSTER_STATE, zkClient);
  }

  private void init(CurrentCoreDescriptorProvider registerOnReconnect) {

    try {

@@ -610,11 +616,9 @@
        publishAndWaitForDownStates();
      }

      // makes nodes zkNode
      cmdExecutor.ensureExists(ZkStateReader.LIVE_NODES_ZKNODE, zkClient);

      createClusterZkNodes(zkClient);

      createEphemeralLiveNode();
      cmdExecutor.ensureExists(ZkStateReader.COLLECTIONS_ZKNODE, zkClient);

      ShardHandler shardHandler;
      UpdateShardHandler updateShardHandler;

@@ -86,6 +86,9 @@ public class OverseerTest extends SolrTestCaseJ4 {
    public MockZKController(String zkAddress, String nodeName) throws InterruptedException, TimeoutException, IOException, KeeperException {
      this.nodeName = nodeName;
      zkClient = new SolrZkClient(zkAddress, TIMEOUT);

      ZkController.createClusterZkNodes(zkClient);

      zkStateReader = new ZkStateReader(zkClient);
      zkStateReader.createClusterStateWatchersAndUpdate();

@@ -235,7 +238,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
      AbstractZkTestCase.makeSolrZkNode(server.getZkHost());

      zkClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
      zkClient.makePath(ZkStateReader.LIVE_NODES_ZKNODE, true);
      ZkController.createClusterZkNodes(zkClient);

      overseerClient = electNewOverseer(server.getZkAddress());

@@ -290,7 +293,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
      AbstractZkTestCase.makeSolrZkNode(server.getZkHost());

      zkClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
      zkClient.makePath(ZkStateReader.LIVE_NODES_ZKNODE, true);
      ZkController.createClusterZkNodes(zkClient);

      overseerClient = electNewOverseer(server.getZkAddress());

@@ -370,7 +373,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
      AbstractZkTestCase.makeSolrZkNode(server.getZkHost());

      zkClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
      zkClient.makePath(ZkStateReader.LIVE_NODES_ZKNODE, true);
      ZkController.createClusterZkNodes(zkClient);

      overseerClient = electNewOverseer(server.getZkAddress());

@@ -534,7 +537,7 @@ public class OverseerTest extends SolrTestCaseJ4 {

      AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
      AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
      zkClient.makePath("/live_nodes", true);
      ZkController.createClusterZkNodes(zkClient);

      reader = new ZkStateReader(zkClient);
      reader.createClusterStateWatchersAndUpdate();

@@ -632,8 +635,8 @@ public class OverseerTest extends SolrTestCaseJ4 {
      AbstractZkTestCase.makeSolrZkNode(server.getZkHost());

      zkClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);

      zkClient.makePath(ZkStateReader.LIVE_NODES_ZKNODE, true);

      ZkController.createClusterZkNodes(zkClient);

      reader = new ZkStateReader(zkClient);
      reader.createClusterStateWatchersAndUpdate();

@@ -751,7 +754,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
      controllerClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
      AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
      AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
      controllerClient.makePath(ZkStateReader.LIVE_NODES_ZKNODE, true);
      ZkController.createClusterZkNodes(controllerClient);

      killer = new OverseerRestarter(server.getZkAddress());
      killerThread = new Thread(killer);

@@ -808,7 +811,7 @@ public class OverseerTest extends SolrTestCaseJ4 {

      AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
      AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
      controllerClient.makePath(ZkStateReader.LIVE_NODES_ZKNODE, true);
      ZkController.createClusterZkNodes(controllerClient);

      reader = new ZkStateReader(controllerClient);
      reader.createClusterStateWatchersAndUpdate();

@@ -872,8 +875,8 @@ public class OverseerTest extends SolrTestCaseJ4 {

      AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
      AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
      controllerClient.makePath(ZkStateReader.LIVE_NODES_ZKNODE, true);

      ZkController.createClusterZkNodes(controllerClient);

      reader = new ZkStateReader(controllerClient);
      reader.createClusterStateWatchersAndUpdate();

@@ -914,7 +917,7 @@ public class OverseerTest extends SolrTestCaseJ4 {

      AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
      AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
      controllerClient.makePath(ZkStateReader.LIVE_NODES_ZKNODE, true);
      ZkController.createClusterZkNodes(controllerClient);

      reader = new ZkStateReader(controllerClient);
      reader.createClusterStateWatchersAndUpdate();

@@ -1046,7 +1049,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
      zkClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
      AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
      AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
      zkClient.makePath(ZkStateReader.LIVE_NODES_ZKNODE, true);
      ZkController.createClusterZkNodes(zkClient);

      reader = new ZkStateReader(zkClient);
      reader.createClusterStateWatchersAndUpdate();

@@ -17,14 +17,12 @@ package org.apache.solr.cloud.overseer;
 * limitations under the License.
 */

import java.util.HashMap;
import java.util.Map;

import org.apache.lucene.util.IOUtils;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.cloud.AbstractZkTestCase;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.OverseerTest;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.cloud.ZkTestServer;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;

@@ -33,6 +31,9 @@ import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;

import java.util.HashMap;
import java.util.Map;

public class ZkStateWriterTest extends SolrTestCaseJ4 {

  public void testZkStateWriterBatching() throws Exception {

@@ -48,7 +49,7 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
      AbstractZkTestCase.makeSolrZkNode(server.getZkHost());

      zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
      zkClient.makePath(ZkStateReader.LIVE_NODES_ZKNODE, true);
      ZkController.createClusterZkNodes(zkClient);

      ZkStateReader reader = new ZkStateReader(zkClient);
      reader.createClusterStateWatchersAndUpdate();

@@ -131,7 +132,7 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
      AbstractZkTestCase.makeSolrZkNode(server.getZkHost());

      zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
      zkClient.makePath(ZkStateReader.LIVE_NODES_ZKNODE, true);
      ZkController.createClusterZkNodes(zkClient);

      ZkStateReader reader = new ZkStateReader(zkClient);
      reader.createClusterStateWatchersAndUpdate();

@@ -74,6 +74,8 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * SolrJ client class to communicate with SolrCloud.

@@ -472,6 +474,30 @@ public class CloudSolrClient extends SolrClient {
    }
  }

  /**
   * Connect to a cluster. If the cluster is not ready, retry connection up to a given timeout.
   * @param duration the timeout
   * @param timeUnit the units of the timeout
   * @throws TimeoutException if the cluster is not ready after the timeout
   * @throws InterruptedException if the wait is interrupted
   */
  public void connect(long duration, TimeUnit timeUnit) throws TimeoutException, InterruptedException {
    log.info("Waiting for {} {} for cluster at {} to be ready", duration, timeUnit, zkHost);
    long timeout = System.nanoTime() + timeUnit.toNanos(duration);
    while (System.nanoTime() < timeout) {
      try {
        connect();
        log.info("Cluster at {} ready", zkHost);
        return;
      }
      catch (RuntimeException e) {
        // not ready yet, then...
      }
      TimeUnit.MILLISECONDS.sleep(250);
    }
    throw new TimeoutException("Timed out waiting for cluster");
  }

  public void setParallelUpdates(boolean parallelUpdates) {
    this.parallelUpdates = parallelUpdates;
  }

@@ -733,4 +733,11 @@ public class SolrZkClient implements Closeable {
    return e;
  }

  /**
   * @return the address of the zookeeper cluster
   */
  public String getZkServerAddress() {
    return zkServerAddress;
  }

}

@@ -295,12 +295,10 @@ public class ZkStateReader implements Closeable {
    // We need to fetch the current cluster state and the set of live nodes

    synchronized (getUpdateLock()) {
      cmdExecutor.ensureExists(CLUSTER_STATE, zkClient);
      cmdExecutor.ensureExists(ALIASES, zkClient);

      log.info("Updating cluster state from ZooKeeper... ");

      zkClient.exists(CLUSTER_STATE, new Watcher() {
      Stat stat = zkClient.exists(CLUSTER_STATE, new Watcher() {

        @Override
        public void process(WatchedEvent event) {

@@ -339,6 +337,10 @@ public class ZkStateReader implements Closeable {
        }

      }, true);

      if (stat == null)
        throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
            "Cannot connect to cluster at " + zkClient.getZkServerAddress() + ": cluster not found/not ready");
    }

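The stat check above is what produces the "cluster not found/not ready" failure referenced in the CHANGES entry and exercised by the tests below. A rough, hypothetical caller-side sketch (the address and the 20-second fallback are placeholders, not part of the commit): a client that wants to tolerate a slow-starting cluster can catch that SolrException and fall back to the timed overload.

import java.util.concurrent.TimeUnit;

import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.common.SolrException;

public class ConnectFallbackExample {
  public static void main(String[] args) throws Exception {
    // Placeholder ZooKeeper connect string.
    try (CloudSolrClient client = new CloudSolrClient("localhost:9983")) {
      try {
        // Fails fast when the cluster state znode does not exist yet.
        client.connect();
      } catch (SolrException e) {
        // Cluster not found/not ready: poll for up to 20 seconds instead.
        client.connect(20, TimeUnit.SECONDS);
      }
    }
  }
}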
@@ -45,10 +45,11 @@ import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.ShardParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.zookeeper.KeeperException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -610,17 +611,20 @@ public class CloudSolrClientTest extends AbstractFullDistribZkTestBase {
    }
  }

  @Rule
  public ExpectedException exception = ExpectedException.none();

  public void testWrongZkChrootTest() throws IOException {

    exception.expect(SolrException.class);
    exception.expectMessage("cluster not found/not ready");

    try (CloudSolrClient client = new CloudSolrClient(zkServer.getZkAddress() + "/xyz/foo")) {
      client.setDefaultCollection(DEFAULT_COLLECTION);
      client.setZkClientTimeout(1000 * 60);
      client.connect();
      fail("Expected exception");
    } catch(SolrException e) {
      assertTrue(e.getCause() instanceof KeeperException);
    }
    // see SOLR-6146 - this test will fail by virtue of the zkClient tracking performed
    // in the afterClass method of the base class
  }

  public void customHttpClientTest() throws IOException {

@@ -0,0 +1,64 @@
package org.apache.solr.client.solrj.impl;

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.cloud.MiniSolrCloudCluster;
import org.apache.solr.common.SolrException;
import org.junit.Test;

import java.io.File;
import java.util.concurrent.TimeUnit;

public class TestCloudSolrClientConnections extends SolrTestCaseJ4 {

  @Test
  public void testCloudClientCanConnectAfterClusterComesUp() throws Exception {

    // Start by creating a cluster with no jetties

    File solrXml = getFile("solrj").toPath().resolve("solr/solr.xml").toFile();
    MiniSolrCloudCluster cluster = new MiniSolrCloudCluster(0, createTempDir().toFile(), solrXml, buildJettyConfig("/solr"));
    try {

      CloudSolrClient client = cluster.getSolrClient();
      CollectionAdminRequest.List listReq = new CollectionAdminRequest.List();

      try {
        client.request(listReq);
        fail("Requests to a non-running cluster should throw a SolrException");
      }
      catch (SolrException e) {
        assertTrue("Unexpected message: " + e.getMessage(), e.getMessage().contains("cluster not found/not ready"));
      }

      cluster.startJettySolrRunner();
      client.connect(20, TimeUnit.SECONDS);

      // should work now!
      client.request(listReq);

    }
    finally {
      cluster.shutdown();
    }

  }

}