some more docs and cleanup

This commit is contained in:
Britta Weber 2015-05-15 10:35:15 +02:00
parent 5eafc9198f
commit 36e6718bf5
1 changed files with 34 additions and 10 deletions

View File

@ -18,6 +18,7 @@
*/
package org.elasticsearch.indices;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
@ -57,7 +58,6 @@ import java.util.concurrent.atomic.AtomicInteger;
public class SyncedFlushService extends AbstractComponent {
// nocommmit: check these are ok
public static final String PRE_SYNCED_FLUSH_ACTION_NAME = "internal:indices/flush/synced/pre";
public static final String SYNCED_FLUSH_ACTION_NAME = "internal:indices/flush/synced/sync";
public static final String IN_FLIGHT_OPS_ACTION_NAME = "internal:indices/flush/synced/in_flight";
@ -91,7 +91,7 @@ public class SyncedFlushService extends AbstractComponent {
public void onShardInactive(final IndexShard indexShard) {
// we only want to call sync flush once, so only trigger it when we are on a primary
if (indexShard.routingEntry().primary()) {
attemptSyncedFlush(indexShard.shardId(), new ActionListener<SyncedFlushResult>() {
attemptSyncedFlush(indexShard.shardId(), new ActionListener<SyncedFlushResult>() {
@Override
public void onResponse(SyncedFlushResult syncedFlushResult) {
logger.debug("{} sync flush on inactive shard returned successfully for sync_id: {}", syncedFlushResult.getShardId(), syncedFlushResult.syncId());
@ -368,6 +368,9 @@ public class SyncedFlushService extends AbstractComponent {
return new InFlightOpsResponse(opCount);
}
/**
* Result for all copies of a shard
*/
public static class SyncedFlushResult extends TransportResponse {
private String failureReason;
private Map<ShardRouting, SyncedFlushResponse> shardResponses;
@ -375,7 +378,6 @@ public class SyncedFlushService extends AbstractComponent {
private ShardId shardId;
public SyncedFlushResult() {
}
public ShardId getShardId() {
@ -385,29 +387,34 @@ public class SyncedFlushService extends AbstractComponent {
/**
* failure constructor
*/
public SyncedFlushResult(ShardId shardId, String failureReason) {
this.syncId = null;
this.failureReason = failureReason;
this.shardResponses = new HashMap<>();
this.shardResponses = ImmutableMap.of();
this.shardId = shardId;
}
/**
* success constructor
*/
public SyncedFlushResult(ShardId shardId, String syncId, Map<ShardRouting, SyncedFlushResponse> shardResponses) {
this.failureReason = null;
this.shardResponses = shardResponses;
ImmutableMap.Builder<ShardRouting, SyncedFlushResponse> builder = ImmutableMap.builder();
this.shardResponses = builder.putAll(shardResponses).build();
this.syncId = syncId;
this.shardId = shardId;
}
/**
* @return true if one or more shard copies was successful, false if all failed before step three of synced flush
*/
public boolean success() {
return syncId != null;
}
/**
* @return the reason for the failure if synced flush failed before step three of synced flush
*/
public String failureReason() {
return failureReason;
}
@ -417,12 +424,15 @@ public class SyncedFlushService extends AbstractComponent {
}
/**
* total number of shards for which a sync attempt was made
* @return total number of shards for which a sync attempt was made
*/
public int totalShards() {
return shardResponses.size();
}
/**
* @return total number of successful shards
*/
public int successfulShards() {
int i = 0;
for (SyncedFlushResponse result : shardResponses.values()) {
@ -433,6 +443,10 @@ public class SyncedFlushService extends AbstractComponent {
return i;
}
/**
* @return Individual responses for each shard copy with a detailed failure message if the copy failed to perform the synced flush.
* Empty if synced flush failed before step three.
*/
public Map<ShardRouting, SyncedFlushResponse> shardResponses() {
return shardResponses;
}
@ -456,13 +470,14 @@ public class SyncedFlushService extends AbstractComponent {
failureReason = in.readOptionalString();
syncId = in.readOptionalString();
int size = in.readVInt();
shardResponses = new HashMap<>();
ImmutableMap.Builder<ShardRouting, SyncedFlushResponse> builder = ImmutableMap.builder();
for (int i = 0; i < size; i++) {
ImmutableShardRouting shardRouting = ImmutableShardRouting.readShardRoutingEntry(in);
SyncedFlushResponse syncedFlushRsponse = new SyncedFlushResponse();
syncedFlushRsponse.readFrom(in);
shardResponses.put(shardRouting, syncedFlushRsponse);
builder.put(shardRouting, syncedFlushRsponse);
}
shardResponses = builder.build();
shardId = ShardId.readShardId(in);
}
@ -506,6 +521,9 @@ public class SyncedFlushService extends AbstractComponent {
}
}
/**
* Response for first step of synced flush (flush) for one shard copy
*/
final static class PreSyncedFlushResponse extends TransportResponse {
private byte[] commitId;
@ -586,6 +604,9 @@ public class SyncedFlushService extends AbstractComponent {
}
}
/**
* Response for third step of synced flush (writing the sync id) for one shard copy
*/
public static final class SyncedFlushResponse extends TransportResponse {
/**
@ -666,6 +687,9 @@ public class SyncedFlushService extends AbstractComponent {
}
}
/**
* Response for second step of synced flush (check operations in flight)
*/
static final class InFlightOpsResponse extends TransportResponse {
int opCount;