From a2c506acd3d841ac279668fbe52a54b847d0d5cd Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Fri, 10 Jun 2016 13:39:47 +0200 Subject: [PATCH] Fix sync flush total shards statistics (#18766) --- .../indices/flush/SyncedFlushService.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java b/core/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java index f3cb76199dc..16e4d62a721 100644 --- a/core/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java +++ b/core/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java @@ -114,11 +114,9 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventL final ClusterState state = clusterService.state(); final Index[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, indicesOptions, aliasesOrIndices); final Map> results = ConcurrentCollections.newConcurrentMap(); - int totalNumberOfShards = 0; int numberOfShards = 0; for (Index index : concreteIndices) { final IndexMetaData indexMetaData = state.metaData().getIndexSafe(index); - totalNumberOfShards += indexMetaData.getTotalNumberOfShards(); numberOfShards += indexMetaData.getNumberOfShards(); results.put(index.getName(), Collections.synchronizedList(new ArrayList<>())); @@ -127,7 +125,6 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventL listener.onResponse(new SyncedFlushResponse(results)); return; } - final int finalTotalNumberOfShards = totalNumberOfShards; final CountDown countDown = new CountDown(numberOfShards); for (final Index concreteIndex : concreteIndices) { @@ -136,7 +133,7 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventL final int indexNumberOfShards = indexMetaData.getNumberOfShards(); for (int shard = 0; shard < indexNumberOfShards; shard++) { final ShardId shardId = new ShardId(indexMetaData.getIndex(), shard); - attemptSyncedFlush(shardId, new ActionListener() { + innerAttemptSyncedFlush(shardId, state, new ActionListener() { @Override public void onResponse(ShardsSyncedFlushResult syncedFlushResult) { results.get(index).add(syncedFlushResult); @@ -148,7 +145,8 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventL @Override public void onFailure(Throwable e) { logger.debug("{} unexpected error while executing synced flush", shardId); - results.get(index).add(new ShardsSyncedFlushResult(shardId, finalTotalNumberOfShards, e.getMessage())); + final int totalShards = indexMetaData.getNumberOfReplicas() + 1; + results.get(index).add(new ShardsSyncedFlushResult(shardId, totalShards, e.getMessage())); if (countDown.countDown()) { listener.onResponse(new SyncedFlushResponse(results)); } @@ -185,8 +183,11 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventL * Synced flush is a best effort operation. The sync id may be written on all, some or none of the copies. **/ public void attemptSyncedFlush(final ShardId shardId, final ActionListener actionListener) { + innerAttemptSyncedFlush(shardId, clusterService.state(), actionListener); + } + + private void innerAttemptSyncedFlush(final ShardId shardId, final ClusterState state, final ActionListener actionListener) { try { - final ClusterState state = clusterService.state(); final IndexShardRoutingTable shardRoutingTable = getShardRoutingTable(shardId, state); final List activeShards = shardRoutingTable.activeShards(); final int totalShards = shardRoutingTable.getSize();