From 6d269cbf4d808f50a4953a0c60e1de08cb911276 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Wed, 27 May 2015 10:29:37 +0300 Subject: [PATCH] feedback --- docs/reference/indices/flush.asciidoc | 19 +++++------ docs/reference/setup/upgrade.asciidoc | 6 ++-- .../flush/IndicesSyncedFlushResult.java | 5 +++ .../flush/ShardsSyncedFlushResult.java | 32 ------------------- .../indices/flush/SyncedFlushService.java | 8 +++++ .../indices/flush/RestSyncedFlushAction.java | 2 +- .../indices/flush/SyncedFlushUnitTests.java | 2 ++ 7 files changed, 29 insertions(+), 45 deletions(-) diff --git a/docs/reference/indices/flush.asciidoc b/docs/reference/indices/flush.asciidoc index 99aa589d866..a1ebfd1d866 100644 --- a/docs/reference/indices/flush.asciidoc +++ b/docs/reference/indices/flush.asciidoc @@ -50,7 +50,7 @@ POST /_flush === Synced Flush Elasticsearch tracks the indexing activity of each shards. Shards that have not -received any indexing operations for, by default, 30m are automatically marked as inactive. This presents +received any indexing operations for 30 minutes (configurable) are automatically marked as inactive. This presents an opportunity for Elasticsearch to reduce shard resources and also perform a special kind of flush, called `synced flush`. A synced flush performs normal flushing and adds a special uniquely generated marker (`sync_id`) to all shards. @@ -79,18 +79,18 @@ GET /twitter/_stats/commit?level=shards [float] === Synced Flush API -The Synced Flush API allows an administrator to initiate a synced flush manually. This can particularly useful for -a planned (rolling) cluster restart where one can stop indexing and doesn't want to wait for the default 30m to pass +The Synced Flush API allows an administrator to initiate a synced flush manually. This can be particularly useful for +a planned (rolling) cluster restart where one can stop indexing and doesn't want to wait for the default 30 minutes to pass when the synced flush will be performed automatically. While handy, there are a couple of caveats for this API: 1. Synced flush is a best effort operation. Any ongoing indexing operations will cause the synced flush to fail. This means that some shards may be synced flushed while others aren't. See below for more. -2. The `sync_id` marker is removed as soon as the shard is flushed again. Uncommitted -operations in the transaction log do not remove the marker. That is because the marker is store as part -of a low level lucene commit, representing a point in time snapshot of the segments. In practice, one should consider -any indexing operation on an index as removing the marker. +2. The `sync_id` marker is removed as soon as the shard is flushed again. That is because a flush replaces the low level +lucene commit point where the marker is stored. Uncommitted operations in the transaction log do not remove the marker. +In practice, one should consider any indexing operation on an index as removing the marker as a flush can be triggered by Elasticsearch +at any time. [source,bash] @@ -99,7 +99,7 @@ POST /twitter/_flush/synced -------------------------------------------------- // AUTOSENSE -The response contains details about how many shards were successfully synced-flushed and information about any failure. +The response contains details about how many shards were successfully sync-flushed and information about any failure. Here is what it looks like when all shards of a two shards and one replica index successfully sync-flushed: @@ -146,7 +146,8 @@ Here is what it looks like when one shard group failed due to pending operations -------------------------------------------------- -Sometimes the failures are specific to a shard copy, in which case they will be reported as follows: +Sometimes the failures are specific to a shard copy. The copies that failed will not be eligible for +fast recovery but those that succeeded still will be. This case is reported as follows: [source,js] -------------------------------------------------- diff --git a/docs/reference/setup/upgrade.asciidoc b/docs/reference/setup/upgrade.asciidoc index 61708755006..9f46fbaf059 100644 --- a/docs/reference/setup/upgrade.asciidoc +++ b/docs/reference/setup/upgrade.asciidoc @@ -92,10 +92,10 @@ curl -XPUT localhost:9200/_cluster/settings -d '{ }' -------------------------------------------------- -* There is no problem to continue indexing while doing the upgrade. However, you can speed the process considerably -by stopping indexing temporarily to non-essential indices and issuing a manual <>. +* There is no problem continuing to index while doing the upgrade. However, you can speed the process considerably +by *temporarily* stopping non-essential indexing and issuing a manual <>. A synced flush is special kind of flush which can seriously speed up recovery of shards. Elasticsearch automatically -uses it when an index has been inactive for a while (default is `30m`) but you can manually trigger it using the following command: +uses it when an index has been inactive for a while (default is `30m`) but you can manuallky trigger it using the following command: [source,sh] -------------------------------------------------- diff --git a/src/main/java/org/elasticsearch/indices/flush/IndicesSyncedFlushResult.java b/src/main/java/org/elasticsearch/indices/flush/IndicesSyncedFlushResult.java index 7c0a680b383..f625f04484a 100644 --- a/src/main/java/org/elasticsearch/indices/flush/IndicesSyncedFlushResult.java +++ b/src/main/java/org/elasticsearch/indices/flush/IndicesSyncedFlushResult.java @@ -24,6 +24,7 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilderString; +import org.elasticsearch.rest.RestStatus; import java.io.IOException; import java.util.List; @@ -58,6 +59,10 @@ public class IndicesSyncedFlushResult implements ToXContent { return shardCounts.successful; } + public RestStatus restStatus() { + return failedShards() == 0 ? RestStatus.OK : RestStatus.CONFLICT; + } + public Map> getShardsResultPerIndex() { return shardsResultPerIndex; } diff --git a/src/main/java/org/elasticsearch/indices/flush/ShardsSyncedFlushResult.java b/src/main/java/org/elasticsearch/indices/flush/ShardsSyncedFlushResult.java index cdf8a2495d8..1388373ff36 100644 --- a/src/main/java/org/elasticsearch/indices/flush/ShardsSyncedFlushResult.java +++ b/src/main/java/org/elasticsearch/indices/flush/ShardsSyncedFlushResult.java @@ -126,38 +126,6 @@ public class ShardsSyncedFlushResult { return shardResponses; } -// @Override -// public void writeTo(StreamOutput out) throws IOException { -// super.writeTo(out); -// out.writeOptionalString(failureReason); -// out.writeOptionalString(syncId); -// out.writeVInt(totalShards); -// out.writeVInt(shardResponses.size()); -// for (Map.Entry result : shardResponses.entrySet()) { -// result.getKey().writeTo(out); -// result.getValue().writeTo(out); -// } -// shardId.writeTo(out); -// } - -// @Override -// public void readFrom(StreamInput in) throws IOException { -// super.readFrom(in); -// failureReason = in.readOptionalString(); -// syncId = in.readOptionalString(); -// totalShards = in.readVInt(); -// int size = in.readVInt(); -// ImmutableMap.Builder builder = ImmutableMap.builder(); -// for (int i = 0; i < size; i++) { -// ImmutableShardRouting shardRouting = ImmutableShardRouting.readShardRoutingEntry(in); -// SyncedFlushService.SyncedFlushResponse syncedFlushRsponse = new SyncedFlushService.SyncedFlushResponse(); -// syncedFlushRsponse.readFrom(in); -// builder.put(shardRouting, syncedFlushRsponse); -// } -// shardResponses = builder.build(); -// shardId = ShardId.readShardId(in); -// } - public ShardId shardId() { return shardId; } diff --git a/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java b/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java index b9447ed01f6..537392a9c98 100644 --- a/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java +++ b/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java @@ -98,6 +98,10 @@ public class SyncedFlushService extends AbstractComponent { }); } + /** + * a utility method to perform a synced flush for all shards of multiple indices. see {@link #attemptSyncedFlush(ShardId, ActionListener)} + * for more details. + */ public void attemptSyncedFlush(final String[] aliasesOrIndices, IndicesOptions indicesOptions, final ActionListener listener) { final ClusterState state = clusterService.state(); final String[] concreteIndices = state.metaData().concreteIndices(indicesOptions, aliasesOrIndices); @@ -111,6 +115,10 @@ public class SyncedFlushService extends AbstractComponent { results.put(index, Collections.synchronizedList(new ArrayList())); } + if (numberOfShards == 0) { + listener.onResponse(new IndicesSyncedFlushResult(results)); + return; + } final int finalTotalNumberOfShards = totalNumberOfShards; final CountDown countDown = new CountDown(numberOfShards); diff --git a/src/main/java/org/elasticsearch/rest/action/admin/indices/flush/RestSyncedFlushAction.java b/src/main/java/org/elasticsearch/rest/action/admin/indices/flush/RestSyncedFlushAction.java index 82a1d5f8fd3..9a3f844abb1 100644 --- a/src/main/java/org/elasticsearch/rest/action/admin/indices/flush/RestSyncedFlushAction.java +++ b/src/main/java/org/elasticsearch/rest/action/admin/indices/flush/RestSyncedFlushAction.java @@ -62,7 +62,7 @@ public class RestSyncedFlushAction extends BaseRestHandler { builder.startObject(); results.toXContent(builder, request); builder.endObject(); - return new BytesRestResponse(RestStatus.OK, builder); + return new BytesRestResponse(results.restStatus(), builder); } }); } diff --git a/src/test/java/org/elasticsearch/indices/flush/SyncedFlushUnitTests.java b/src/test/java/org/elasticsearch/indices/flush/SyncedFlushUnitTests.java index fcf80c19d67..426ec36d608 100644 --- a/src/test/java/org/elasticsearch/indices/flush/SyncedFlushUnitTests.java +++ b/src/test/java/org/elasticsearch/indices/flush/SyncedFlushUnitTests.java @@ -27,6 +27,7 @@ import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.flush.IndicesSyncedFlushResult.ShardCounts; import org.elasticsearch.indices.flush.SyncedFlushService.SyncedFlushResponse; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ElasticsearchTestCase; import java.io.IOException; @@ -56,6 +57,7 @@ public class SyncedFlushUnitTests extends ElasticsearchTestCase { assertThat(testPlan.result.totalShards(), equalTo(testPlan.totalCounts.total)); assertThat(testPlan.result.successfulShards(), equalTo(testPlan.totalCounts.successful)); assertThat(testPlan.result.failedShards(), equalTo(testPlan.totalCounts.failed)); + assertThat(testPlan.result.restStatus(), equalTo(testPlan.totalCounts.failed > 0 ? RestStatus.CONFLICT : RestStatus.OK)); Map asMap = convertToMap(testPlan.result); assertShardCount("_shards header", (Map) asMap.get("_shards"), testPlan.totalCounts);