Fix sync flush total shards statistics (#18766)
This commit is contained in:
parent
6ea89004cd
commit
a2c506acd3
|
@ -114,11 +114,9 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventL
|
||||||
final ClusterState state = clusterService.state();
|
final ClusterState state = clusterService.state();
|
||||||
final Index[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, indicesOptions, aliasesOrIndices);
|
final Index[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, indicesOptions, aliasesOrIndices);
|
||||||
final Map<String, List<ShardsSyncedFlushResult>> results = ConcurrentCollections.newConcurrentMap();
|
final Map<String, List<ShardsSyncedFlushResult>> results = ConcurrentCollections.newConcurrentMap();
|
||||||
int totalNumberOfShards = 0;
|
|
||||||
int numberOfShards = 0;
|
int numberOfShards = 0;
|
||||||
for (Index index : concreteIndices) {
|
for (Index index : concreteIndices) {
|
||||||
final IndexMetaData indexMetaData = state.metaData().getIndexSafe(index);
|
final IndexMetaData indexMetaData = state.metaData().getIndexSafe(index);
|
||||||
totalNumberOfShards += indexMetaData.getTotalNumberOfShards();
|
|
||||||
numberOfShards += indexMetaData.getNumberOfShards();
|
numberOfShards += indexMetaData.getNumberOfShards();
|
||||||
results.put(index.getName(), Collections.synchronizedList(new ArrayList<>()));
|
results.put(index.getName(), Collections.synchronizedList(new ArrayList<>()));
|
||||||
|
|
||||||
|
@ -127,7 +125,6 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventL
|
||||||
listener.onResponse(new SyncedFlushResponse(results));
|
listener.onResponse(new SyncedFlushResponse(results));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
final int finalTotalNumberOfShards = totalNumberOfShards;
|
|
||||||
final CountDown countDown = new CountDown(numberOfShards);
|
final CountDown countDown = new CountDown(numberOfShards);
|
||||||
|
|
||||||
for (final Index concreteIndex : concreteIndices) {
|
for (final Index concreteIndex : concreteIndices) {
|
||||||
|
@ -136,7 +133,7 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventL
|
||||||
final int indexNumberOfShards = indexMetaData.getNumberOfShards();
|
final int indexNumberOfShards = indexMetaData.getNumberOfShards();
|
||||||
for (int shard = 0; shard < indexNumberOfShards; shard++) {
|
for (int shard = 0; shard < indexNumberOfShards; shard++) {
|
||||||
final ShardId shardId = new ShardId(indexMetaData.getIndex(), shard);
|
final ShardId shardId = new ShardId(indexMetaData.getIndex(), shard);
|
||||||
attemptSyncedFlush(shardId, new ActionListener<ShardsSyncedFlushResult>() {
|
innerAttemptSyncedFlush(shardId, state, new ActionListener<ShardsSyncedFlushResult>() {
|
||||||
@Override
|
@Override
|
||||||
public void onResponse(ShardsSyncedFlushResult syncedFlushResult) {
|
public void onResponse(ShardsSyncedFlushResult syncedFlushResult) {
|
||||||
results.get(index).add(syncedFlushResult);
|
results.get(index).add(syncedFlushResult);
|
||||||
|
@ -148,7 +145,8 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventL
|
||||||
@Override
|
@Override
|
||||||
public void onFailure(Throwable e) {
|
public void onFailure(Throwable e) {
|
||||||
logger.debug("{} unexpected error while executing synced flush", shardId);
|
logger.debug("{} unexpected error while executing synced flush", shardId);
|
||||||
results.get(index).add(new ShardsSyncedFlushResult(shardId, finalTotalNumberOfShards, e.getMessage()));
|
final int totalShards = indexMetaData.getNumberOfReplicas() + 1;
|
||||||
|
results.get(index).add(new ShardsSyncedFlushResult(shardId, totalShards, e.getMessage()));
|
||||||
if (countDown.countDown()) {
|
if (countDown.countDown()) {
|
||||||
listener.onResponse(new SyncedFlushResponse(results));
|
listener.onResponse(new SyncedFlushResponse(results));
|
||||||
}
|
}
|
||||||
|
@ -185,8 +183,11 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventL
|
||||||
* Synced flush is a best effort operation. The sync id may be written on all, some or none of the copies.
|
* Synced flush is a best effort operation. The sync id may be written on all, some or none of the copies.
|
||||||
**/
|
**/
|
||||||
public void attemptSyncedFlush(final ShardId shardId, final ActionListener<ShardsSyncedFlushResult> actionListener) {
|
public void attemptSyncedFlush(final ShardId shardId, final ActionListener<ShardsSyncedFlushResult> actionListener) {
|
||||||
|
innerAttemptSyncedFlush(shardId, clusterService.state(), actionListener);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void innerAttemptSyncedFlush(final ShardId shardId, final ClusterState state, final ActionListener<ShardsSyncedFlushResult> actionListener) {
|
||||||
try {
|
try {
|
||||||
final ClusterState state = clusterService.state();
|
|
||||||
final IndexShardRoutingTable shardRoutingTable = getShardRoutingTable(shardId, state);
|
final IndexShardRoutingTable shardRoutingTable = getShardRoutingTable(shardId, state);
|
||||||
final List<ShardRouting> activeShards = shardRoutingTable.activeShards();
|
final List<ShardRouting> activeShards = shardRoutingTable.activeShards();
|
||||||
final int totalShards = shardRoutingTable.getSize();
|
final int totalShards = shardRoutingTable.getSize();
|
||||||
|
|
Loading…
Reference in New Issue