Revert "SOLR-9140: Replace some zk state polling with CollectionStateWatchers"

There are still some places where notifications can be missed, so I'm
reverting this until those are fixed.

This reverts commit d550b1ca43.
Alan Woodward 2016-06-06 13:13:54 +01:00
parent b1fb142af0
commit b64c558e3e
6 changed files with 90 additions and 106 deletions
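
For context, the fragile pattern is the latch-based wait removed from ZkController.publishAndWaitForDownStates() in the diff below: shutdown only proceeds once every registered CollectionStateWatcher has fired with all local replicas DOWN. A condensed excerpt of the removed code (zkStateReader, collections, getNodeName(), and WAIT_DOWN_STATES_TIMEOUT_SECONDS are fields and helpers of ZkController, so this fragment is not standalone):

    // One latch count per collection hosted on this node.
    CountDownLatch latch = new CountDownLatch(collections.size());
    for (String collection : collections) {
      zkStateReader.registerCollectionStateWatcher(collection, (nodes, state) -> {
        for (Replica replica : state.getReplicasOnNode(getNodeName())) {
          if (replica.getState() != Replica.State.DOWN)
            return false;  // not all DOWN yet; keep the watcher registered
        }
        latch.countDown(); // every local replica of this collection is DOWN
        return true;       // returning true unregisters the watcher
      });
    }
    // If any watcher never fires (a missed notification), the latch never
    // reaches zero and this call blocks for the full timeout.
    boolean allDown = latch.await(WAIT_DOWN_STATES_TIMEOUT_SECONDS, TimeUnit.SECONDS);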

CHANGES.txt

@@ -277,9 +277,6 @@ Optimizations
 * SOLR-8744: Overseer operations performed with fine grained mutual exclusion (noble, Scott Blum)
 
-* SOLR-9140: Replace zk polling in ZkController with CollectionStateWatchers
-  (Alan Woodward)
-
 Other Changes
 ----------------------
 
 * SOLR-7516: Improve javadocs for JavaBinCodec, ObjectResolver and enforce the single-usage policy.

ZkController.java

@@ -27,7 +27,6 @@ import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
 import java.util.*;
 import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -683,23 +682,35 @@ public final class ZkController {
       InterruptedException {
 
     publishNodeAsDown(getNodeName());
 
-    Set<String> collections = cc.getLocalCollections();
-    CountDownLatch latch = new CountDownLatch(collections.size());
-    for (String collection : collections) {
-      zkStateReader.registerCollectionStateWatcher(collection, (nodes, state) -> {
-        for (Replica replica : state.getReplicasOnNode(getNodeName())) {
-          if (replica.getState() != Replica.State.DOWN)
-            return false;
-        }
-        latch.countDown();
-        return true;
-      });
-    }
-
-    if (latch.await(WAIT_DOWN_STATES_TIMEOUT_SECONDS, TimeUnit.SECONDS) == false) {
-      // TODO should we abort here?
+    // now wait till the updates are in our state
+    long now = System.nanoTime();
+    long timeout = now + TimeUnit.NANOSECONDS.convert(WAIT_DOWN_STATES_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+    boolean foundStates = true;
+
+    while (System.nanoTime() < timeout) {
+      ClusterState clusterState = zkStateReader.getClusterState();
+      Map<String, DocCollection> collections = clusterState.getCollectionsMap();
+      for (Map.Entry<String, DocCollection> entry : collections.entrySet()) {
+        DocCollection collection = entry.getValue();
+        Collection<Slice> slices = collection.getSlices();
+        for (Slice slice : slices) {
+          Collection<Replica> replicas = slice.getReplicas();
+          for (Replica replica : replicas) {
+            if (getNodeName().equals(replica.getNodeName()) && replica.getState() != Replica.State.DOWN) {
+              foundStates = false;
+            }
+          }
+        }
+      }
+
+      if (foundStates) {
+        Thread.sleep(1000);
+        break;
+      }
+      Thread.sleep(1000);
+    }
+
+    if (!foundStates) {
       log.warn("Timed out waiting to see all nodes published as DOWN in our cluster state.");
     }
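
The restored loop above measures its timeout against System.nanoTime(), the monotonic-clock deadline idiom, so the wait is unaffected by wall-clock adjustments. A minimal standalone sketch of the same idiom (PollUntil, the condition, and the interval are illustrative, not part of this commit):

    import java.util.concurrent.TimeUnit;
    import java.util.function.BooleanSupplier;

    final class PollUntil {
      // Polls once per second until the condition holds or the deadline passes;
      // returns true if the condition was observed in time.
      static boolean poll(BooleanSupplier condition, long timeoutSeconds) throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(timeoutSeconds);
        while (System.nanoTime() < deadline) {
          if (condition.getAsBoolean()) return true;
          Thread.sleep(1000); // same 1s interval as the restored loop
        }
        return false;
      }
    }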
@@ -1355,7 +1366,7 @@ public final class ZkController {
     return zkStateReader;
   }
 
-  private void doGetShardIdAndNodeNameProcess(CoreDescriptor cd) throws InterruptedException {
+  private void doGetShardIdAndNodeNameProcess(CoreDescriptor cd) {
     final String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
     if (coreNodeName != null) {
@@ -1367,45 +1378,58 @@ public final class ZkController {
     }
   }
 
-  private void waitForCoreNodeName(CoreDescriptor descriptor) throws InterruptedException {
-    log.info("Waiting for coreNodeName for core {} in collection {} to be assigned",
-        descriptor.getName(), descriptor.getCollectionName());
-    final String thisNode = getNodeName();
-    try {
-      zkStateReader.waitForState(descriptor.getCollectionName(), 320, TimeUnit.SECONDS, (n, c) -> {
-        if (c == null)
-          return false;
-        for (Replica replica : c.getReplicasOnNode(thisNode)) {
-          if (descriptor.getName().equals(replica.getCoreName())) {
-            descriptor.getCloudDescriptor().setCoreNodeName(replica.getName());
-            return true;
+  private void waitForCoreNodeName(CoreDescriptor descriptor) {
+    int retryCount = 320;
+    log.info("look for our core node name");
+    while (retryCount-- > 0) {
+      Map<String, Slice> slicesMap = zkStateReader.getClusterState()
+          .getSlicesMap(descriptor.getCloudDescriptor().getCollectionName());
+      if (slicesMap != null) {
+        for (Slice slice : slicesMap.values()) {
+          for (Replica replica : slice.getReplicas()) {
+            // TODO: for really large clusters, we could 'index' on this
+            String nodeName = replica.getStr(ZkStateReader.NODE_NAME_PROP);
+            String core = replica.getStr(ZkStateReader.CORE_NAME_PROP);
+            String msgNodeName = getNodeName();
+            String msgCore = descriptor.getName();
+            if (msgNodeName.equals(nodeName) && core.equals(msgCore)) {
+              descriptor.getCloudDescriptor()
+                  .setCoreNodeName(replica.getName());
+              return;
+            }
           }
         }
-        return false;
-      });
-    } catch (TimeoutException e) {
-      throw new SolrException(ErrorCode.SERVER_ERROR, "Timed out getting coreNodeName for " + descriptor.getName());
+      }
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
     }
   }
 
-  private void waitForShardId(CoreDescriptor cd) throws InterruptedException {
+  private void waitForShardId(CoreDescriptor cd) {
     log.info("waiting to find shard id in clusterstate for " + cd.getName());
-    final String thisNode = getNodeName();
-    try {
-      zkStateReader.waitForState(cd.getCollectionName(), 320, TimeUnit.SECONDS, (n, c) -> {
-        if (c == null)
-          return false;
-        String shardId = c.getShardId(thisNode, cd.getName());
-        if (shardId != null) {
-          cd.getCloudDescriptor().setShardId(shardId);
-          return true;
-        }
-        return false;
-      });
-    }
-    catch (TimeoutException e) {
-      throw new SolrException(ErrorCode.SERVER_ERROR, "Timed out getting shard id for core: " + cd.getName());
+    int retryCount = 320;
+    while (retryCount-- > 0) {
+      final String shardId = zkStateReader.getClusterState().getShardId(cd.getCollectionName(), getNodeName(), cd.getName());
+      if (shardId != null) {
+        cd.getCloudDescriptor().setShardId(shardId);
+        return;
+      }
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
     }
+
+    throw new SolrException(ErrorCode.SERVER_ERROR,
+        "Could not get shard id for core: " + cd.getName());
   }
@@ -1419,7 +1443,7 @@ public final class ZkController {
     return coreNodeName;
   }
 
-  public void preRegister(CoreDescriptor cd) throws InterruptedException {
+  public void preRegister(CoreDescriptor cd) {
 
     String coreNodeName = getCoreNodeName(cd);

CoreContainer.java

@@ -16,6 +16,17 @@
  */
 package org.apache.solr.core;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.Collections.EMPTY_MAP;
+import static org.apache.solr.common.params.CommonParams.AUTHC_PATH;
+import static org.apache.solr.common.params.CommonParams.AUTHZ_PATH;
+import static org.apache.solr.common.params.CommonParams.COLLECTIONS_HANDLER_PATH;
+import static org.apache.solr.common.params.CommonParams.CONFIGSETS_HANDLER_PATH;
+import static org.apache.solr.common.params.CommonParams.CORES_HANDLER_PATH;
+import static org.apache.solr.common.params.CommonParams.INFO_HANDLER_PATH;
+import static org.apache.solr.common.params.CommonParams.ZK_PATH;
+import static org.apache.solr.security.AuthenticationPlugin.AUTHENTICATION_PLUGIN_PROP;
+
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.nio.file.Path;
@@ -26,15 +37,12 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Properties;
-import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.stream.Collectors;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
 
 import org.apache.http.auth.AuthSchemeProvider;
 import org.apache.http.client.CredentialsProvider;
 import org.apache.http.config.Lookup;
@@ -48,7 +56,6 @@ import org.apache.solr.cloud.Overseer;
 import org.apache.solr.cloud.ZkController;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
-import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.common.util.IOUtils;
@@ -76,16 +83,8 @@ import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-import static java.util.Collections.EMPTY_MAP;
-import static org.apache.solr.common.params.CommonParams.AUTHC_PATH;
-import static org.apache.solr.common.params.CommonParams.AUTHZ_PATH;
-import static org.apache.solr.common.params.CommonParams.COLLECTIONS_HANDLER_PATH;
-import static org.apache.solr.common.params.CommonParams.CONFIGSETS_HANDLER_PATH;
-import static org.apache.solr.common.params.CommonParams.CORES_HANDLER_PATH;
-import static org.apache.solr.common.params.CommonParams.INFO_HANDLER_PATH;
-import static org.apache.solr.common.params.CommonParams.ZK_PATH;
-import static org.apache.solr.security.AuthenticationPlugin.AUTHENTICATION_PLUGIN_PROP;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
 
 /**
@@ -845,7 +844,6 @@ public class CoreContainer {
       return core;
 
     } catch (Exception e) {
-      SolrZkClient.checkInterrupted(e);
       coreInitFailures.put(dcore.getName(), new CoreLoadFailure(dcore, e));
       log.error("Error creating core [{}]: {}", dcore.getName(), e.getMessage(), e);
       final SolrException solrException = new SolrException(ErrorCode.SERVER_ERROR, "Unable to create core [" + dcore.getName() + "]", e);
@@ -895,17 +893,6 @@ public class CoreContainer {
     }
   }
 
-  /**
-   * @return a Set containing the names of all collections with a core hosted in this container
-   */
-  public Set<String> getLocalCollections() {
-    Set<String> collections = getCoreDescriptors().stream()
-        .filter(cd -> cd.getCollectionName() != null)
-        .map(CoreDescriptor::getCollectionName)
-        .collect(Collectors.toSet());
-    return collections;
-  }
-
   /**
    * Returns an immutable Map of Exceptions that occured when initializing
    * SolrCores (either at startup, or do to runtime requests to create cores)

CollectionAdminRequest.java

@@ -1104,7 +1104,7 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
         deleteAsyncId(requestId).process(client);
         return state;
       }
-      TimeUnit.MILLISECONDS.sleep(100);
+      TimeUnit.SECONDS.sleep(1);
     }
     return state;
   }

DocCollection.java

@@ -25,7 +25,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
@@ -260,26 +259,4 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
     }
     return replicas;
   }
-
-  /**
-   * Get all the replicas on a particular node
-   */
-  public List<Replica> getReplicasOnNode(String nodeName) {
-    return getReplicas().stream()
-        .filter(replica -> replica.getNodeName().equals(nodeName))
-        .collect(Collectors.toList());
-  }
-
-  /**
-   * Get the shardId of a core on a specific node
-   */
-  public String getShardId(String nodeName, String coreName) {
-    for (Slice slice : this) {
-      for (Replica replica : slice) {
-        if (Objects.equals(replica.getNodeName(), nodeName) && Objects.equals(replica.getCoreName(), coreName))
-          return slice.getName();
-      }
-    }
-    return null;
-  }
 }
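
The two helpers deleted above existed only for the watcher code. Equivalent lookups can still be written against the surviving iteration API, since DocCollection is Iterable<Slice> and Slice is iterable over its replicas, as the deleted bodies themselves show. For reference, the getShardId logic as a freestanding method (a sketch; ShardLookup is illustrative and not part of the Solr API):

    import java.util.Objects;

    import org.apache.solr.common.cloud.DocCollection;
    import org.apache.solr.common.cloud.Replica;
    import org.apache.solr.common.cloud.Slice;

    final class ShardLookup {
      // Returns the name of the shard hosting the given core on the given node,
      // or null if no such replica exists.
      static String shardIdFor(DocCollection coll, String nodeName, String coreName) {
        for (Slice slice : coll) {          // DocCollection implements Iterable<Slice>
          for (Replica replica : slice) {   // Slice iterates over its replicas
            if (Objects.equals(replica.getNodeName(), nodeName)
                && Objects.equals(replica.getCoreName(), coreName))
              return slice.getName();
          }
        }
        return null;
      }
    }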

TestCollectionStateWatchers.java

@@ -51,9 +51,8 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
   @BeforeClass
   public static void startCluster() throws Exception {
     configureCluster(CLUSTER_SIZE)
-        .addConfig("config", getFile("solrj/solr/configsets/streaming/conf").toPath())
+        .addConfig("config", getFile("solrj/solr/collection1/conf").toPath())
         .configure();
-    cluster.getSolrClient().connect();
   }
 
   @AfterClass
@@ -260,7 +259,7 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
     final CloudSolrClient client = cluster.getSolrClient();
 
-    Future<Boolean> future = waitInBackground("stateformat1", 30, TimeUnit.SECONDS,
+    Future<Boolean> future = waitInBackground("stateformat1", 10, TimeUnit.SECONDS,
         (n, c) -> DocCollection.isFullyActive(n, c, 1, 1));
 
     CollectionAdminRequest.createCollection("stateformat1", "config", 1, 1).setStateFormat(1)