Async Fetch: Better logging classification + log when ignored

This commit is contained in:
Shay Banon 2015-05-22 11:59:44 +02:00
parent afb7aabea7
commit 08e87bd81e
3 changed files with 14 additions and 12 deletions

View File

@ -58,6 +58,7 @@ public abstract class AsyncShardFetch<T extends NodeOperationResponse> implement
}
protected final ESLogger logger;
protected final String type;
private final ShardId shardId;
private final List<NodesOperationResponse<T>, T> action;
private final Map<String, NodeEntry<T>> cache = new HashMap<>();
@ -65,8 +66,9 @@ public abstract class AsyncShardFetch<T extends NodeOperationResponse> implement
private boolean closed;
@SuppressWarnings("unchecked")
protected AsyncShardFetch(ESLogger logger, ShardId shardId, List<? extends NodesOperationResponse<T>, T> action) {
protected AsyncShardFetch(ESLogger logger, String type, ShardId shardId, List<? extends NodesOperationResponse<T>, T> action) {
this.logger = logger;
this.type = type;
this.shardId = shardId;
this.action = (List<NodesOperationResponse<T>, T>) action;
}
@ -148,7 +150,7 @@ public abstract class AsyncShardFetch<T extends NodeOperationResponse> implement
// if at least one node failed, make sure to have a protective reroute
// here, just in case this round won't find anything, and we need to retry fetching data
if (failedNodes.isEmpty() == false || allIgnoreNodes.isEmpty() == false) {
reroute(shardId, "at_least_one_node_failed");
reroute(shardId, "nodes failed [" + failedNodes.size() + "], ignored [" + allIgnoreNodes.size() + "]");
}
return new FetchResult<>(shardId, fetchData, failedNodes, allIgnoreNodes);
}
@ -184,7 +186,7 @@ public abstract class AsyncShardFetch<T extends NodeOperationResponse> implement
if (unwrappedCause instanceof EsRejectedExecutionException || unwrappedCause instanceof ReceiveTimeoutTransportException || unwrappedCause instanceof ElasticsearchTimeoutException) {
nodeEntry.restartFetching();
} else {
logger.warn("{}: failed to list shard for {} on node [{}]", failure, shardId, getClass().getSimpleName(), failure.nodeId());
logger.warn("{}: failed to list shard for {} on node [{}]", failure, shardId, type, failure.nodeId());
nodeEntry.doneFetching(failure.getCause());
}
}

View File

@ -166,12 +166,12 @@ public class GatewayAllocator extends AbstractComponent {
AsyncShardFetch<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> fetch = asyncFetchStarted.get(shard.shardId());
if (fetch == null) {
fetch = new InternalAsyncFetch<>(logger, shard.shardId(), startedAction, clusterService, allocationService);
fetch = new InternalAsyncFetch<>(logger, "shard_started", shard.shardId(), startedAction, clusterService, allocationService);
asyncFetchStarted.put(shard.shardId(), fetch);
}
AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> shardState = fetch.fetchData(nodes, metaData, allocation.getIgnoreNodes(shard.shardId()));
if (shardState.hasData() == false) {
// still fetching data, remove from the unassigned, and try the next
// pass the shard so the leading "{}" placeholder in the message is actually filled in
logger.trace("{}: ignoring allocation, still fetching shard started state", shard);
unassignedIterator.remove();
routingNodes.ignoredUnassigned().add(shard);
continue;
@ -395,7 +395,7 @@ public class GatewayAllocator extends AbstractComponent {
}
if (!canBeAllocatedToAtLeastOneNode) {
// no node can take this shard, remove from the unassigned, and try the next
// pass the shard so the leading "{}" placeholder in the message is actually filled in
logger.trace("{}: ignoring allocation, can't be allocated on any node", shard);
unassignedIterator.remove();
routingNodes.ignoredUnassigned().add(shard);
continue;
@ -403,12 +403,12 @@ public class GatewayAllocator extends AbstractComponent {
AsyncShardFetch<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> fetch = asyncFetchStore.get(shard.shardId());
if (fetch == null) {
fetch = new InternalAsyncFetch<>(logger, shard.shardId(), storeAction, clusterService, allocationService);
fetch = new InternalAsyncFetch<>(logger, "shard_store", shard.shardId(), storeAction, clusterService, allocationService);
asyncFetchStore.put(shard.shardId(), fetch);
}
AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> shardStores = fetch.fetchData(nodes, metaData, allocation.getIgnoreNodes(shard.shardId()));
if (shardStores.hasData() == false) {
// still fetching data, remove from the unassigned, and try the next
// pass the shard so the leading "{}" placeholder in the message is actually filled in
logger.trace("{}: ignoring allocation, still fetching shard stores", shard);
unassignedIterator.remove();
routingNodes.ignoredUnassigned().add(shard);
continue; // still fetching
@ -518,16 +518,16 @@ public class GatewayAllocator extends AbstractComponent {
private final ClusterService clusterService;
private final AllocationService allocationService;
public InternalAsyncFetch(ESLogger logger, ShardId shardId, List<? extends NodesOperationResponse<T>, T> action,
public InternalAsyncFetch(ESLogger logger, String type, ShardId shardId, List<? extends NodesOperationResponse<T>, T> action,
ClusterService clusterService, AllocationService allocationService) {
super(logger, shardId, action);
super(logger, type, shardId, action);
this.clusterService = clusterService;
this.allocationService = allocationService;
}
@Override
protected void reroute(ShardId shardId, String reason) {
clusterService.submitStateUpdateTask("async_shard_fetch(" + getClass().getSimpleName() + ") " + shardId + ", reasons (" + reason + ")", Priority.HIGH, new ClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("async_shard_fetch(" + type + ") " + shardId + ", reasons (" + reason + ")", Priority.HIGH, new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
if (currentState.nodes().masterNode() == null) {

View File

@ -252,7 +252,7 @@ public class AsyncShardFetchTests extends ElasticsearchTestCase {
private AtomicInteger reroute = new AtomicInteger();
public TestFetch(ThreadPool threadPool) {
super(Loggers.getLogger(TestFetch.class), new ShardId("test", 1), null);
super(Loggers.getLogger(TestFetch.class), "test", new ShardId("test", 1), null);
this.threadPool = threadPool;
}