From 08e87bd81e0c007247a6eaaaa61e6db85efcd5d9 Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Fri, 22 May 2015 11:59:44 +0200 Subject: [PATCH] Async Fetch: Better logging classification + log when ignored --- .../elasticsearch/gateway/AsyncShardFetch.java | 8 +++++--- .../elasticsearch/gateway/GatewayAllocator.java | 16 ++++++++-------- .../gateway/AsyncShardFetchTests.java | 2 +- 3 files changed, 14 insertions(+), 12 deletions(-) diff --git a/src/main/java/org/elasticsearch/gateway/AsyncShardFetch.java b/src/main/java/org/elasticsearch/gateway/AsyncShardFetch.java index daf3862b777..3b7c765d7d1 100644 --- a/src/main/java/org/elasticsearch/gateway/AsyncShardFetch.java +++ b/src/main/java/org/elasticsearch/gateway/AsyncShardFetch.java @@ -58,6 +58,7 @@ public abstract class AsyncShardFetch implement } protected final ESLogger logger; + protected final String type; private final ShardId shardId; private final List, T> action; private final Map> cache = new HashMap<>(); @@ -65,8 +66,9 @@ public abstract class AsyncShardFetch implement private boolean closed; @SuppressWarnings("unchecked") - protected AsyncShardFetch(ESLogger logger, ShardId shardId, List, T> action) { + protected AsyncShardFetch(ESLogger logger, String type, ShardId shardId, List, T> action) { this.logger = logger; + this.type = type; this.shardId = shardId; this.action = (List, T>) action; } @@ -148,7 +150,7 @@ public abstract class AsyncShardFetch implement // if at least one node failed, make sure to have a protective reroute // here, just case this round won't find anything, and we need to retry fetching data if (failedNodes.isEmpty() == false || allIgnoreNodes.isEmpty() == false) { - reroute(shardId, "at_least_one_node_failed"); + reroute(shardId, "nodes failed [" + failedNodes.size() + "], ignored [" + allIgnoreNodes.size() + "]"); } return new FetchResult<>(shardId, fetchData, failedNodes, allIgnoreNodes); } @@ -184,7 +186,7 @@ public abstract class AsyncShardFetch implement if (unwrappedCause 
instanceof EsRejectedExecutionException || unwrappedCause instanceof ReceiveTimeoutTransportException || unwrappedCause instanceof ElasticsearchTimeoutException) { nodeEntry.restartFetching(); } else { - logger.warn("{}: failed to list shard for {} on node [{}]", failure, shardId, getClass().getSimpleName(), failure.nodeId()); + logger.warn("{}: failed to list shard for {} on node [{}]", failure, shardId, type, failure.nodeId()); nodeEntry.doneFetching(failure.getCause()); } } diff --git a/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java b/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java index 05ee4c8cd67..a7647f73e50 100644 --- a/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java +++ b/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java @@ -166,12 +166,12 @@ public class GatewayAllocator extends AbstractComponent { AsyncShardFetch fetch = asyncFetchStarted.get(shard.shardId()); if (fetch == null) { - fetch = new InternalAsyncFetch<>(logger, shard.shardId(), startedAction, clusterService, allocationService); + fetch = new InternalAsyncFetch<>(logger, "shard_started", shard.shardId(), startedAction, clusterService, allocationService); asyncFetchStarted.put(shard.shardId(), fetch); } AsyncShardFetch.FetchResult shardState = fetch.fetchData(nodes, metaData, allocation.getIgnoreNodes(shard.shardId())); if (shardState.hasData() == false) { - // still fetching data, remove from the unassigned, and try the next + logger.trace("{}: ignoring allocation, still fetching shard started state", shard); unassignedIterator.remove(); routingNodes.ignoredUnassigned().add(shard); continue; @@ -395,7 +395,7 @@ public class GatewayAllocator extends AbstractComponent { } if (!canBeAllocatedToAtLeastOneNode) { - // still fetching data, remove from the unassigned, and try the next + logger.trace("{}: ignoring allocation, can't be allocated on any node", shard); unassignedIterator.remove(); routingNodes.ignoredUnassigned().add(shard); continue; @@ -403,12 
+403,12 @@ public class GatewayAllocator extends AbstractComponent { AsyncShardFetch fetch = asyncFetchStore.get(shard.shardId()); if (fetch == null) { - fetch = new InternalAsyncFetch<>(logger, shard.shardId(), storeAction, clusterService, allocationService); + fetch = new InternalAsyncFetch<>(logger, "shard_store", shard.shardId(), storeAction, clusterService, allocationService); asyncFetchStore.put(shard.shardId(), fetch); } AsyncShardFetch.FetchResult shardStores = fetch.fetchData(nodes, metaData, allocation.getIgnoreNodes(shard.shardId())); if (shardStores.hasData() == false) { - // still fetching data, remove from the unassigned, and try the next + logger.trace("{}: ignoring allocation, still fetching shard stores", shard); unassignedIterator.remove(); routingNodes.ignoredUnassigned().add(shard); continue; // still fetching @@ -518,16 +518,16 @@ public class GatewayAllocator extends AbstractComponent { private final ClusterService clusterService; private final AllocationService allocationService; - public InternalAsyncFetch(ESLogger logger, ShardId shardId, List, T> action, + public InternalAsyncFetch(ESLogger logger, String type, ShardId shardId, List, T> action, ClusterService clusterService, AllocationService allocationService) { - super(logger, shardId, action); + super(logger, type, shardId, action); this.clusterService = clusterService; this.allocationService = allocationService; } @Override protected void reroute(ShardId shardId, String reason) { - clusterService.submitStateUpdateTask("async_shard_fetch(" + getClass().getSimpleName() + ") " + shardId + ", reasons (" + reason + ")", Priority.HIGH, new ClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("async_shard_fetch(" + type + ") " + shardId + ", reasons (" + reason + ")", Priority.HIGH, new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) throws Exception { if (currentState.nodes().masterNode() == null) { diff --git 
a/src/test/java/org/elasticsearch/gateway/AsyncShardFetchTests.java b/src/test/java/org/elasticsearch/gateway/AsyncShardFetchTests.java index 45d998918a0..b8e97efe31d 100644 --- a/src/test/java/org/elasticsearch/gateway/AsyncShardFetchTests.java +++ b/src/test/java/org/elasticsearch/gateway/AsyncShardFetchTests.java @@ -252,7 +252,7 @@ public class AsyncShardFetchTests extends ElasticsearchTestCase { private AtomicInteger reroute = new AtomicInteger(); public TestFetch(ThreadPool threadPool) { - super(Loggers.getLogger(TestFetch.class), new ShardId("test", 1), null); + super(Loggers.getLogger(TestFetch.class), "test", new ShardId("test", 1), null); this.threadPool = threadPool; }