SearchWhileCreatingIndexTests didn't always supply enough nodes to meet write consistency requirements and thus hung.

Added more debug logging to TransportShardReplicationOperationAction

*Changed* node naming in tests from GUIDs to node#-based names, as they are much easier to read
Boaz Leskes 2013-08-30 20:53:54 +02:00
parent be00437c65
commit be09103258
3 changed files with 57 additions and 32 deletions
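
Background on the hang: with the default QUORUM write consistency, an index operation on a shard group of N copies (primary plus replicas) only proceeds once a quorum of copies is active; otherwise it is rescheduled until the request times out, which in these tests surfaced as a hang. A minimal sketch of that arithmetic, mirroring the computation the test fix below uses (the helper name is illustrative, not part of the codebase):

// Illustrative helper, not actual Elasticsearch code: how many active
// copies a QUORUM write needs, matching the test fix further down.
static int requiredActiveCopies(int numberOfReplicas) {
    int copies = numberOfReplicas + 1; // primary + replicas
    // QUORUM degenerates to 1 when there are at most two copies;
    // otherwise it is a strict majority of all copies.
    return copies <= 2 ? 1 : copies / 2 + 1;
}

For example, two replicas mean three copies and a quorum of two; since replicas are never allocated on the same node as their primary, a single-node cluster can activate only one copy and the write retries until it times out.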

TransportShardReplicationOperationAction.java

@@ -342,6 +342,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
ClusterBlockException blockException = checkGlobalBlock(clusterState, request);
if (blockException != null) {
if (blockException.retryable()) {
logger.debug("Cluster is blocked ({}), scheduling a retry", blockException.getMessage());
retry(fromClusterEvent, blockException);
return false;
} else {
@@ -355,6 +356,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
blockException = checkRequestBlock(clusterState, request);
if (blockException != null) {
if (blockException.retryable()) {
logger.debug("Cluster is blocked ({}), scheduling a retry", blockException.getMessage());
retry(fromClusterEvent, blockException);
return false;
} else {
@@ -369,6 +371,8 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
// no shardIt, might be in the case between index gateway recovery and shardIt initialization
if (shardIt.size() == 0) {
logger.debug("No shard instances known for index [{}]. Scheduling a retry", shardIt.shardId());
retry(fromClusterEvent, null);
return false;
}
@@ -382,6 +386,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
continue;
}
if (!shard.active() || !clusterState.nodes().nodeExists(shard.currentNodeId())) {
logger.debug("primary shard [{}] is not yet active or we do not know that node it is assigned to [{}]. Scheduling a retry.", shard.shardId(), shard.currentNodeId());
retry(fromClusterEvent, null);
return false;
}
@@ -401,6 +406,8 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
if (shardIt.sizeActive() < requiredNumber) {
logger.debug("Not enough active copies of shard [{}] to meet write consistency of [{}] (have {}, needed {}). Scheduling a retry.",
shard.shardId(), consistencyLevel, shardIt.sizeActive(), requiredNumber);
retry(fromClusterEvent, null);
return false;
}
@@ -458,6 +465,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
primaryOperationStarted.set(false);
// we already marked it as started when we executed it (removed the listener) so pass false
// to re-add to the cluster listener
logger.debug("received an error from node the primary was assigned to ({}). Scheduling a retry", exp.getMessage());
retry(false, null);
} else {
listener.onFailure(exp);
@@ -469,6 +477,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
// we won't find a primary if there are no shards in the shard iterator, retry...
if (!foundPrimary) {
logger.debug("Couldn't find a eligible primary shard. Scheduling for retry.");
retry(fromClusterEvent, null);
return false;
}
@@ -478,11 +487,13 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
void retry(boolean fromClusterEvent, @Nullable final Throwable failure) {
if (!fromClusterEvent) {
// make it threaded operation so we fork on the discovery listener thread
request.beforeLocalFork();
request.operationThreaded(true);
clusterService.add(request.timeout(), new TimeoutClusterStateListener() {
@Override
public void postAdded() {
logger.debug("ShardRepOp: listener to cluster state added. Trying again");
if (start(true)) {
// if we managed to start and perform the operation on the primary, we can remove this listener
clusterService.remove(this);
@@ -497,6 +508,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
@Override
public void clusterChanged(ClusterChangedEvent event) {
logger.debug("ShardRepOp: cluster changed (version {}). Trying again", event.state().version());
if (start(true)) {
// if we managed to start and perform the operation on the primary, we can remove this listener
clusterService.remove(this);
@@ -522,6 +534,8 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
listener.onFailure(listenerFailure);
}
});
} else {
logger.debug("ShardRepOp: retry scheduling ignored as it was executed from an active cluster state listener");
}
}
@@ -533,6 +547,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
// shard has not been allocated yet, retry it here
if (retryPrimaryException(e)) {
primaryOperationStarted.set(false);
logger.debug("Had an error while performing operation on primary ({}). Scheduling a retry.", e.getMessage());
retry(fromDiscoveryListener, null);
return;
}
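
All of the debug lines added above feed the same retry path: instead of failing the request, the action registers itself as a cluster-state listener and re-runs start(true) on every cluster state change until the operation starts or the request timeout fires. Before this commit that loop was silent, so a test stuck on write consistency simply appeared to hang. A condensed sketch of the pattern, abridged from the diff above (timeout and close callbacks elided):

// Abridged from retry(...) above: park the request as a cluster state
// listener and re-attempt on every change instead of failing outright.
void retry(boolean fromClusterEvent, @Nullable final Throwable failure) {
    if (fromClusterEvent) {
        // Already invoked from an active cluster state listener; that
        // listener will call start(true) again on the next state change.
        return;
    }
    request.beforeLocalFork();
    request.operationThreaded(true); // fork off the discovery listener thread
    clusterService.add(request.timeout(), new TimeoutClusterStateListener() {
        @Override
        public void postAdded() {
            if (start(true)) {
                clusterService.remove(this); // operation started, stop listening
            }
        }

        @Override
        public void clusterChanged(ClusterChangedEvent event) {
            if (start(true)) {
                clusterService.remove(this);
            }
        }

        // onTimeout(...) fails the request after request.timeout(); see above.
    });
}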

TestCluster.java

@@ -47,14 +47,15 @@ import java.io.Closeable;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static com.google.common.collect.Maps.newHashMap;
import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
public class TestCluster {
/* some random options to consider
* "action.auto_create_index"
* "node.local"
@@ -64,23 +65,24 @@ public class TestCluster {
private Map<String, NodeAndClient> nodes = newHashMap();
private final String clusterName;
private final AtomicBoolean open = new AtomicBoolean(true);
private final Settings defaultSettings;
private NodeAndClient clientNode;
private Random random;
private ClientFactory clientFactory;
private AtomicInteger nextNodeId = new AtomicInteger(0);
public TestCluster(Random random) {
this(random, "shared-test-cluster-" + NetworkUtils.getLocalAddress().getHostName() + "CHILD_VM=[" + ElasticsearchTestCase.CHILD_VM_ID + "]"+ "_" + System.currentTimeMillis(), ImmutableSettings.settingsBuilder().build());
this(random, "shared-test-cluster-" + NetworkUtils.getLocalAddress().getHostName() + "CHILD_VM=[" + ElasticsearchTestCase.CHILD_VM_ID + "]" + "_" + System.currentTimeMillis(), ImmutableSettings.settingsBuilder().build());
}
private TestCluster(Random random, String clusterName, Settings defaultSettings) {
@@ -117,6 +119,7 @@ public class TestCluster {
public void ensureAtLeastNumNodes(int num) {
int size = nodes.size();
for (int i = size; i < num; i++) {
logger.info("increasing cluster size from {} to {}", size, num);
buildNode().start();
}
}
@@ -133,8 +136,9 @@ public class TestCluster {
return;
}
Collection<NodeAndClient> values = nodes.values();
Iterator<NodeAndClient> limit = Iterators.limit(values.iterator(), nodes.size() - num);
logger.info("reducing cluster size from {} to {}", nodes.size() - num, num);
while (limit.hasNext()) {
NodeAndClient next = limit.next();
limit.remove();
next.close();
@@ -163,7 +167,7 @@ public class TestCluster {
public Node buildNode(Settings settings) {
ensureOpen();
String name = "node_" + nextNodeId.getAndIncrement();
String settingsSource = getClass().getName().replace('.', '/') + ".yml";
Settings finalSettings = settingsBuilder().loadFromClasspath(settingsSource).put(defaultSettings).put(settings).put("name", name)
.build();
@@ -279,7 +283,7 @@ public class TestCluster {
public static class RandomClientFactory extends ClientFactory {
private final Random random;
public RandomClientFactory(Random random) {
this.random = random;
}
@@ -287,12 +291,12 @@ public class TestCluster {
@Override
public Client client(Node node, String clusterName) {
switch (random.nextInt(10)) {
case 5:
return TransportClientFactory.NO_SNIFF_CLIENT_FACTORY.client(node, clusterName);
case 3:
return TransportClientFactory.SNIFF_CLIENT_FACTORY.client(node, clusterName);
default:
return node.client();
}
}
}
@@ -353,7 +357,7 @@ public class TestCluster {
};
}
public Set<String> allButN(int numNodes) {
return nRandomNodes(numNodes() - numNodes);
}
@@ -373,28 +377,28 @@ public class TestCluster {
Node node = nodeBuilder().settings(finalSettings).build();
node.start();
this.clientNode = new NodeAndClient(name, node, clientFactory);
}
return clientNode.client();
}
public Set<String> nodesInclude(String index) {
if (clusterService().state().routingTable().hasIndex(index)) {
List<ShardRouting> allShards = clusterService().state().routingTable().allShards(index);
DiscoveryNodes discoveryNodes = clusterService().state().getNodes();
Set<String> nodes = new HashSet<String>();
for (ShardRouting shardRouting : allShards) {
if (shardRouting.assignedToNode()) {
DiscoveryNode discoveryNode = discoveryNodes.get(shardRouting.currentNodeId());
nodes.add(discoveryNode.getName());
}
}
return nodes;
}
return Collections.emptySet();
}
public Set<String> nodeExclude(String index) {
final Set<String> nodesInclude = nodesInclude(index);
return Sets.newHashSet(Iterators.transform(Iterators.filter(nodes.values().iterator(), new Predicate<NodeAndClient>() {
@@ -402,7 +406,7 @@ public class TestCluster {
public boolean apply(NodeAndClient nodeAndClient) {
return !nodesInclude.contains(nodeAndClient.name);
}
}), new Function<NodeAndClient, String>() {
@Override
public String apply(NodeAndClient nodeAndClient) {

SearchWhileCreatingIndexTests.java

@@ -64,6 +64,12 @@ public class SearchWhileCreatingIndexTests extends AbstractSharedClusterTest {
}
private void searchWhileCreatingIndex(int numberOfShards, int numberOfReplicas) throws Exception {
// make sure we have enough nodes to guarantee the default QUORUM consistency.
// TODO: add a smarter choice based on actual consistency (when that is randomized)
int shardsNo = numberOfReplicas + 1;
int neededNodes = shardsNo <= 2 ? 1 : shardsNo / 2 + 1;
cluster().ensureAtLeastNumNodes(randomIntBetween(neededNodes, shardsNo));
for (int i = 0; i < 20; i++) {
logger.info("running iteration {}", i);
if (numberOfShards > 0) {