SOLR-9140: Replace some zk state polling with CollectionStateWatchers

Alan Woodward 2016-06-03 15:05:20 +01:00
parent 950fd91335
commit d550b1ca43
6 changed files with 106 additions and 90 deletions

solr/CHANGES.txt

@@ -277,6 +277,9 @@ Optimizations
 * SOLR-8744: Overseer operations performed with fine grained mutual exclusion (noble, Scott Blum)
 
+* SOLR-9140: Replace zk polling in ZkController with CollectionStateWatchers
+  (Alan Woodward)
+
 Other Changes
 ----------------------
 
 * SOLR-7516: Improve javadocs for JavaBinCodec, ObjectResolver and enforce the single-usage policy.

solr/core/src/java/org/apache/solr/cloud/ZkController.java

@@ -27,6 +27,7 @@ import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
 import java.util.*;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -683,34 +684,22 @@ public final class ZkController {
     publishNodeAsDown(getNodeName());
 
-    // now wait till the updates are in our state
-    long now = System.nanoTime();
-    long timeout = now + TimeUnit.NANOSECONDS.convert(WAIT_DOWN_STATES_TIMEOUT_SECONDS, TimeUnit.SECONDS);
-    boolean foundStates = true;
-
-    while (System.nanoTime() < timeout) {
-      ClusterState clusterState = zkStateReader.getClusterState();
-      Map<String, DocCollection> collections = clusterState.getCollectionsMap();
-      for (Map.Entry<String, DocCollection> entry : collections.entrySet()) {
-        DocCollection collection = entry.getValue();
-        Collection<Slice> slices = collection.getSlices();
-        for (Slice slice : slices) {
-          Collection<Replica> replicas = slice.getReplicas();
-          for (Replica replica : replicas) {
-            if (getNodeName().equals(replica.getNodeName()) && replica.getState() != Replica.State.DOWN) {
-              foundStates = false;
-            }
-          }
-        }
-      }
-
-      if (foundStates) {
-        Thread.sleep(1000);
-        break;
-      }
-      Thread.sleep(1000);
-    }
-    if (!foundStates) {
+    Set<String> collections = cc.getLocalCollections();
+    CountDownLatch latch = new CountDownLatch(collections.size());
+
+    for (String collection : collections) {
+      zkStateReader.registerCollectionStateWatcher(collection, (nodes, state) -> {
+        for (Replica replica : state.getReplicasOnNode(getNodeName())) {
+          if (replica.getState() != Replica.State.DOWN)
+            return false;
+        }
+        latch.countDown();
+        return true;
+      });
+    }
+
+    if (latch.await(WAIT_DOWN_STATES_TIMEOUT_SECONDS, TimeUnit.SECONDS) == false) {
+      // TODO should we abort here?
       log.warn("Timed out waiting to see all nodes published as DOWN in our cluster state.");
     }
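
For context, the CollectionStateWatcher contract the hunk above relies on: the callback fires on every state change of the watched collection, and returning true deregisters the watcher, which is what lets the new code count down one latch per collection. A minimal sketch of that contract, assuming an already-connected ZkStateReader; the collection name and the ACTIVE predicate are illustrative, not part of this commit:

// Block until every replica of "mycollection" reports ACTIVE, or 30 seconds elapse.
// Illustrative sketch only - assumes the surrounding method throws InterruptedException.
CountDownLatch allActive = new CountDownLatch(1);
zkStateReader.registerCollectionStateWatcher("mycollection", (liveNodes, collectionState) -> {
  if (collectionState == null)
    return false;                       // collection not present yet; keep watching
  for (Replica replica : collectionState.getReplicas()) {
    if (replica.getState() != Replica.State.ACTIVE)
      return false;                     // keep watching until every replica is ACTIVE
  }
  allActive.countDown();
  return true;                          // returning true removes the watcher
});
allActive.await(30, TimeUnit.SECONDS);  // bounded wait, as publishAndWaitForDownStates does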
@@ -1366,7 +1355,7 @@ public final class ZkController {
     return zkStateReader;
   }
 
-  private void doGetShardIdAndNodeNameProcess(CoreDescriptor cd) {
+  private void doGetShardIdAndNodeNameProcess(CoreDescriptor cd) throws InterruptedException {
     final String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
     if (coreNodeName != null) {
@@ -1378,58 +1367,45 @@ public final class ZkController {
     }
   }
 
-  private void waitForCoreNodeName(CoreDescriptor descriptor) {
-    int retryCount = 320;
-    log.info("look for our core node name");
-    while (retryCount-- > 0) {
-      Map<String, Slice> slicesMap = zkStateReader.getClusterState()
-          .getSlicesMap(descriptor.getCloudDescriptor().getCollectionName());
-      if (slicesMap != null) {
-
-        for (Slice slice : slicesMap.values()) {
-          for (Replica replica : slice.getReplicas()) {
-            // TODO: for really large clusters, we could 'index' on this
-
-            String nodeName = replica.getStr(ZkStateReader.NODE_NAME_PROP);
-            String core = replica.getStr(ZkStateReader.CORE_NAME_PROP);
-
-            String msgNodeName = getNodeName();
-            String msgCore = descriptor.getName();
-
-            if (msgNodeName.equals(nodeName) && core.equals(msgCore)) {
-              descriptor.getCloudDescriptor()
-                  .setCoreNodeName(replica.getName());
-              return;
-            }
-          }
-        }
-      }
-      try {
-        Thread.sleep(1000);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-      }
+  private void waitForCoreNodeName(CoreDescriptor descriptor) throws InterruptedException {
+    log.info("Waiting for coreNodeName for core {} in collection {} to be assigned",
+        descriptor.getName(), descriptor.getCollectionName());
+    final String thisNode = getNodeName();
+    try {
+      zkStateReader.waitForState(descriptor.getCollectionName(), 320, TimeUnit.SECONDS, (n, c) -> {
+        if (c == null)
+          return false;
+        for (Replica replica : c.getReplicasOnNode(thisNode)) {
+          if (descriptor.getName().equals(replica.getCoreName())) {
+            descriptor.getCloudDescriptor().setCoreNodeName(replica.getName());
+            return true;
+          }
+        }
+        return false;
+      });
+    } catch (TimeoutException e) {
+      throw new SolrException(ErrorCode.SERVER_ERROR, "Timed out getting coreNodeName for " + descriptor.getName());
     }
   }
 
-  private void waitForShardId(CoreDescriptor cd) {
+  private void waitForShardId(CoreDescriptor cd) throws InterruptedException {
     log.info("waiting to find shard id in clusterstate for " + cd.getName());
-    int retryCount = 320;
-    while (retryCount-- > 0) {
-      final String shardId = zkStateReader.getClusterState().getShardId(cd.getCollectionName(), getNodeName(), cd.getName());
-      if (shardId != null) {
-        cd.getCloudDescriptor().setShardId(shardId);
-        return;
-      }
-      try {
-        Thread.sleep(1000);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-      }
-    }
-
-    throw new SolrException(ErrorCode.SERVER_ERROR,
-        "Could not get shard id for core: " + cd.getName());
+    final String thisNode = getNodeName();
+    try {
+      zkStateReader.waitForState(cd.getCollectionName(), 320, TimeUnit.SECONDS, (n, c) -> {
+        if (c == null)
+          return false;
+        String shardId = c.getShardId(thisNode, cd.getName());
+        if (shardId != null) {
+          cd.getCloudDescriptor().setShardId(shardId);
+          return true;
+        }
+        return false;
+      });
+    }
+    catch (TimeoutException e) {
+      throw new SolrException(ErrorCode.SERVER_ERROR, "Timed out getting shard id for core: " + cd.getName());
+    }
   }
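
waitForState, used in both rewrites above, wraps the same watcher machinery in a blocking call: it registers a watcher internally, parks until the CollectionStatePredicate matches, and throws TimeoutException if it never does. A short sketch under the same assumptions (collection and shard names illustrative; the surrounding method is assumed to declare throws InterruptedException):

// Wait up to 30 seconds for "mycollection" to expose a shard named "shard1".
// Illustrative sketch only - not part of this commit.
try {
  zkStateReader.waitForState("mycollection", 30, TimeUnit.SECONDS,
      (liveNodes, collectionState) ->
          collectionState != null && collectionState.getSlice("shard1") != null);
} catch (TimeoutException e) {
  throw new SolrException(ErrorCode.SERVER_ERROR, "shard1 never appeared in cluster state", e);
}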
@@ -1443,7 +1419,7 @@ public final class ZkController {
     return coreNodeName;
   }
 
-  public void preRegister(CoreDescriptor cd) {
+  public void preRegister(CoreDescriptor cd) throws InterruptedException {
 
     String coreNodeName = getCoreNodeName(cd);

solr/core/src/java/org/apache/solr/core/CoreContainer.java

@@ -16,17 +16,6 @@
  */
 package org.apache.solr.core;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-import static java.util.Collections.EMPTY_MAP;
-import static org.apache.solr.common.params.CommonParams.AUTHC_PATH;
-import static org.apache.solr.common.params.CommonParams.AUTHZ_PATH;
-import static org.apache.solr.common.params.CommonParams.COLLECTIONS_HANDLER_PATH;
-import static org.apache.solr.common.params.CommonParams.CONFIGSETS_HANDLER_PATH;
-import static org.apache.solr.common.params.CommonParams.CORES_HANDLER_PATH;
-import static org.apache.solr.common.params.CommonParams.INFO_HANDLER_PATH;
-import static org.apache.solr.common.params.CommonParams.ZK_PATH;
-import static org.apache.solr.security.AuthenticationPlugin.AUTHENTICATION_PLUGIN_PROP;
-
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.nio.file.Path;
@@ -37,12 +26,15 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Properties;
-import java.util.concurrent.Callable;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.stream.Collectors;
 
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
 import org.apache.http.auth.AuthSchemeProvider;
 import org.apache.http.client.CredentialsProvider;
 import org.apache.http.config.Lookup;
@@ -56,6 +48,7 @@ import org.apache.solr.cloud.Overseer;
 import org.apache.solr.cloud.ZkController;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.common.util.IOUtils;
@@ -83,8 +76,16 @@ import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.Collections.EMPTY_MAP;
+import static org.apache.solr.common.params.CommonParams.AUTHC_PATH;
+import static org.apache.solr.common.params.CommonParams.AUTHZ_PATH;
+import static org.apache.solr.common.params.CommonParams.COLLECTIONS_HANDLER_PATH;
+import static org.apache.solr.common.params.CommonParams.CONFIGSETS_HANDLER_PATH;
+import static org.apache.solr.common.params.CommonParams.CORES_HANDLER_PATH;
+import static org.apache.solr.common.params.CommonParams.INFO_HANDLER_PATH;
+import static org.apache.solr.common.params.CommonParams.ZK_PATH;
+import static org.apache.solr.security.AuthenticationPlugin.AUTHENTICATION_PLUGIN_PROP;
 
 /**
@@ -844,6 +845,7 @@ public class CoreContainer {
 
       return core;
     } catch (Exception e) {
+      SolrZkClient.checkInterrupted(e);
       coreInitFailures.put(dcore.getName(), new CoreLoadFailure(dcore, e));
       log.error("Error creating core [{}]: {}", dcore.getName(), e.getMessage(), e);
       final SolrException solrException = new SolrException(ErrorCode.SERVER_ERROR, "Unable to create core [" + dcore.getName() + "]", e);
@@ -893,6 +895,17 @@ public class CoreContainer {
 
   }
 
+  /**
+   * @return a Set containing the names of all collections with a core hosted in this container
+   */
+  public Set<String> getLocalCollections() {
+    Set<String> collections = getCoreDescriptors().stream()
+        .filter(cd -> cd.getCollectionName() != null)
+        .map(CoreDescriptor::getCollectionName)
+        .collect(Collectors.toSet());
+    return collections;
+  }
+
   /**
    * Returns an immutable Map of Exceptions that occured when initializing
    * SolrCores (either at startup, or do to runtime requests to create cores)

solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java

@@ -1104,7 +1104,7 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
         deleteAsyncId(requestId).process(client);
         return state;
       }
-      TimeUnit.SECONDS.sleep(1);
+      TimeUnit.MILLISECONDS.sleep(100);
     }
     return state;
   }

solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java

@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
@@ -259,4 +260,26 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
     }
     return replicas;
   }
+
+  /**
+   * Get all the replicas on a particular node
+   */
+  public List<Replica> getReplicasOnNode(String nodeName) {
+    return getReplicas().stream()
+        .filter(replica -> replica.getNodeName().equals(nodeName))
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Get the shardId of a core on a specific node
+   */
+  public String getShardId(String nodeName, String coreName) {
+    for (Slice slice : this) {
+      for (Replica replica : slice) {
+        if (Objects.equals(replica.getNodeName(), nodeName) && Objects.equals(replica.getCoreName(), coreName))
+          return slice.getName();
+      }
+    }
+    return null;
+  }
 }
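
A hedged usage sketch of the two new helpers, as they are exercised from ZkController above; the node and core names here are made up for the example:

// Illustrative only: inspect a node's replicas via the new DocCollection helpers.
DocCollection coll = zkStateReader.getClusterState().getCollection("mycollection");

// All replicas hosted on one node:
for (Replica replica : coll.getReplicasOnNode("127.0.0.1:8983_solr")) {
  System.out.println(replica.getName() + " is " + replica.getState());
}

// The shard id of a specific core on that node, or null if it is not found:
String shardId = coll.getShardId("127.0.0.1:8983_solr", "mycollection_shard1_replica1");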

solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java

@@ -51,8 +51,9 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
   @BeforeClass
   public static void startCluster() throws Exception {
     configureCluster(CLUSTER_SIZE)
-        .addConfig("config", getFile("solrj/solr/collection1/conf").toPath())
+        .addConfig("config", getFile("solrj/solr/configsets/streaming/conf").toPath())
         .configure();
+    cluster.getSolrClient().connect();
   }
 
   @AfterClass
@@ -259,7 +260,7 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
 
     final CloudSolrClient client = cluster.getSolrClient();
 
-    Future<Boolean> future = waitInBackground("stateformat1", 10, TimeUnit.SECONDS,
+    Future<Boolean> future = waitInBackground("stateformat1", 30, TimeUnit.SECONDS,
         (n, c) -> DocCollection.isFullyActive(n, c, 1, 1));
 
     CollectionAdminRequest.createCollection("stateformat1", "config", 1, 1).setStateFormat(1)