diff --git a/src/main/java/org/elasticsearch/indices/SyncedFlushService.java b/src/main/java/org/elasticsearch/indices/SyncedFlushService.java index 79b33b9e0de..2974b55ae72 100644 --- a/src/main/java/org/elasticsearch/indices/SyncedFlushService.java +++ b/src/main/java/org/elasticsearch/indices/SyncedFlushService.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.indices; +import com.google.common.collect.ImmutableMap; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.flush.FlushRequest; @@ -57,7 +58,6 @@ import java.util.concurrent.atomic.AtomicInteger; public class SyncedFlushService extends AbstractComponent { - // nocommmit: check these are ok public static final String PRE_SYNCED_FLUSH_ACTION_NAME = "internal:indices/flush/synced/pre"; public static final String SYNCED_FLUSH_ACTION_NAME = "internal:indices/flush/synced/sync"; public static final String IN_FLIGHT_OPS_ACTION_NAME = "internal:indices/flush/synced/in_flight"; @@ -91,7 +91,7 @@ public class SyncedFlushService extends AbstractComponent { public void onShardInactive(final IndexShard indexShard) { // we only want to call sync flush once, so only trigger it when we are on a primary if (indexShard.routingEntry().primary()) { - attemptSyncedFlush(indexShard.shardId(), new ActionListener() { + attemptSyncedFlush(indexShard.shardId(), new ActionListener() { @Override public void onResponse(SyncedFlushResult syncedFlushResult) { logger.debug("{} sync flush on inactive shard returned successfully for sync_id: {}", syncedFlushResult.getShardId(), syncedFlushResult.syncId()); @@ -368,6 +368,9 @@ public class SyncedFlushService extends AbstractComponent { return new InFlightOpsResponse(opCount); } + /** + * Result for all copies of a shard + */ public static class SyncedFlushResult extends TransportResponse { private String failureReason; private Map shardResponses; @@ -375,7 +378,6 @@ public class SyncedFlushService extends AbstractComponent { private ShardId shardId; public SyncedFlushResult() { - } public ShardId getShardId() { @@ -385,29 +387,34 @@ public class SyncedFlushService extends AbstractComponent { /** * failure constructor */ - public SyncedFlushResult(ShardId shardId, String failureReason) { this.syncId = null; this.failureReason = failureReason; - this.shardResponses = new HashMap<>(); + this.shardResponses = ImmutableMap.of(); this.shardId = shardId; } /** * success constructor */ - public SyncedFlushResult(ShardId shardId, String syncId, Map shardResponses) { this.failureReason = null; - this.shardResponses = shardResponses; + ImmutableMap.Builder builder = ImmutableMap.builder(); + this.shardResponses = builder.putAll(shardResponses).build(); this.syncId = syncId; this.shardId = shardId; } + /** + * @return true if one or more shard copies was successful, false if all failed before step three of synced flush + */ public boolean success() { return syncId != null; } + /** + * @return the reason for the failure if synced flush failed before step three of synced flush + */ public String failureReason() { return failureReason; } @@ -417,12 +424,15 @@ public class SyncedFlushService extends AbstractComponent { } /** - * total number of shards for which a sync attempt was made + * @return total number of shards for which a sync attempt was made */ public int totalShards() { return shardResponses.size(); } + /** + * @return total number of successful shards + */ public int successfulShards() { int i = 0; for (SyncedFlushResponse result : shardResponses.values()) { @@ -433,6 +443,10 @@ public class SyncedFlushService extends AbstractComponent { return i; } + /** + * @return Individual responses for each shard copy with a detailed failure message if the copy failed to perform the synced flush. + * Empty if synced flush failed before step three. + */ public Map shardResponses() { return shardResponses; } @@ -456,13 +470,14 @@ public class SyncedFlushService extends AbstractComponent { failureReason = in.readOptionalString(); syncId = in.readOptionalString(); int size = in.readVInt(); - shardResponses = new HashMap<>(); + ImmutableMap.Builder builder = ImmutableMap.builder(); for (int i = 0; i < size; i++) { ImmutableShardRouting shardRouting = ImmutableShardRouting.readShardRoutingEntry(in); SyncedFlushResponse syncedFlushRsponse = new SyncedFlushResponse(); syncedFlushRsponse.readFrom(in); - shardResponses.put(shardRouting, syncedFlushRsponse); + builder.put(shardRouting, syncedFlushRsponse); } + shardResponses = builder.build(); shardId = ShardId.readShardId(in); } @@ -506,6 +521,9 @@ public class SyncedFlushService extends AbstractComponent { } } + /** + * Response for first step of synced flush (flush) for one shard copy + */ final static class PreSyncedFlushResponse extends TransportResponse { private byte[] commitId; @@ -586,6 +604,9 @@ public class SyncedFlushService extends AbstractComponent { } } + /** + * Response for third step of synced flush (writing the sync id) for one shard copy + */ public static final class SyncedFlushResponse extends TransportResponse { /** @@ -666,6 +687,9 @@ public class SyncedFlushService extends AbstractComponent { } } + /** + * Response for second step of synced flush (check operations in flight) + */ static final class InFlightOpsResponse extends TransportResponse { int opCount;