Boaz Leskes 2015-05-27 10:29:37 +03:00
parent b376a3fbfb
commit 6d269cbf4d
7 changed files with 29 additions and 45 deletions

View File

@@ -50,7 +50,7 @@ POST /_flush
=== Synced Flush
Elasticsearch tracks the indexing activity of each shard. Shards that have not
-received any indexing operations for, by default, 30m are automatically marked as inactive. This presents
+received any indexing operations for 30 minutes (configurable) are automatically marked as inactive. This presents
an opportunity for Elasticsearch to reduce shard resources and also perform
a special kind of flush, called `synced flush`. A synced flush performs normal
flushing and adds a special uniquely generated marker (`sync_id`) to all shards.
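Whether a shard copy currently carries the marker can be checked through the commit stats
(the endpoint below is the same one referenced by these docs); when present, the `sync_id`
is reported as part of each shard copy's commit data:

[source,js]
--------------------------------------------------
GET /twitter/_stats/commit?level=shards
--------------------------------------------------
// AUTOSENSE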
@@ -79,18 +79,18 @@ GET /twitter/_stats/commit?level=shards
[float]
=== Synced Flush API
-The Synced Flush API allows an administrator to initiate a synced flush manually. This can particularly useful for
-a planned (rolling) cluster restart where one can stop indexing and doesn't want to wait for the default 30m to pass
+The Synced Flush API allows an administrator to initiate a synced flush manually. This can be particularly useful for
+a planned (rolling) cluster restart where one can stop indexing and doesn't want to wait for the default 30 minutes to pass
when the synced flush will be performed automatically.
While handy, there are a couple of caveats for this API:
1. Synced flush is a best effort operation. Any ongoing indexing operations will cause
the synced flush to fail. This means that some shards may be synced flushed while others aren't. See below for more.
-2. The `sync_id` marker is removed as soon as the shard is flushed again. Uncommitted
-operations in the transaction log do not remove the marker. That is because the marker is store as part
-of a low level lucene commit, representing a point in time snapshot of the segments. In practice, one should consider
-any indexing operation on an index as removing the marker.
+2. The `sync_id` marker is removed as soon as the shard is flushed again. That is because a flush replaces the low level
+lucene commit point where the marker is stored. Uncommitted operations in the transaction log do not remove the marker.
+In practice, one should consider any indexing operation on an index as removing the marker, since a flush can be triggered
+by Elasticsearch at any time.
[source,bash]
@@ -99,7 +99,7 @@ POST /twitter/_flush/synced
--------------------------------------------------
// AUTOSENSE
-The response contains details about how many shards were successfully synced-flushed and information about any failure.
+The response contains details about how many shards were successfully sync-flushed and information about any failure.
Here is what it looks like when all shards of a two-shard, one-replica index successfully
sync-flushed:
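(The listing itself is elided from this hunk; for the four copies of a two-shard, one-replica
index that all synced, it is presumably of roughly this shape, sketched here rather than quoted:)

[source,js]
--------------------------------------------------
{
  "_shards": {
    "total": 4,
    "successful": 4,
    "failed": 0
  },
  "twitter": {
    "total": 4,
    "successful": 4,
    "failed": 0
  }
}
--------------------------------------------------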
@@ -146,7 +146,8 @@ Here is what it looks like when one shard group failed due to pending operations
--------------------------------------------------
-Sometimes the failures are specific to a shard copy, in which case they will be reported as follows:
+Sometimes the failures are specific to a shard copy. The copies that failed will not be eligible for
+fast recovery but those that succeeded still will be. This case is reported as follows:
[source,js]
--------------------------------------------------

View File

@@ -92,10 +92,10 @@ curl -XPUT localhost:9200/_cluster/settings -d '{
}'
--------------------------------------------------
-* There is no problem to continue indexing while doing the upgrade. However, you can speed the process considerably
-by stopping indexing temporarily to non-essential indices and issuing a manual <<indices-synced-flush, synced flush>>.
+* There is no problem continuing to index while doing the upgrade. However, you can speed the process considerably
+by *temporarily* stopping non-essential indexing and issuing a manual <<indices-synced-flush, synced flush>>.
A synced flush is a special kind of flush which can seriously speed up recovery of shards. Elasticsearch automatically
uses it when an index has been inactive for a while (default is `30m`) but you can manually trigger it using the following command:
[source,sh]
--------------------------------------------------
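# (assumed form of the truncated command, based on the synced flush API
#  documented in <<indices-synced-flush>>; `_all` targets every index)
curl -XPOST localhost:9200/_all/_flush/synced
--------------------------------------------------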

View File

@@ -24,6 +24,7 @@ import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
+import org.elasticsearch.rest.RestStatus;
import java.io.IOException;
import java.util.List;
@@ -58,6 +59,10 @@ public class IndicesSyncedFlushResult implements ToXContent {
        return shardCounts.successful;
    }
+    /** the REST status to report for this result: 409 CONFLICT if any shard copy failed to sync, 200 OK otherwise */
+    public RestStatus restStatus() {
+        return failedShards() == 0 ? RestStatus.OK : RestStatus.CONFLICT;
+    }
    public Map<String, List<ShardsSyncedFlushResult>> getShardsResultPerIndex() {
        return shardsResultPerIndex;
    }

View File

@@ -126,38 +126,6 @@ public class ShardsSyncedFlushResult {
        return shardResponses;
    }
-    // @Override
-    // public void writeTo(StreamOutput out) throws IOException {
-    //     super.writeTo(out);
-    //     out.writeOptionalString(failureReason);
-    //     out.writeOptionalString(syncId);
-    //     out.writeVInt(totalShards);
-    //     out.writeVInt(shardResponses.size());
-    //     for (Map.Entry<ShardRouting, SyncedFlushService.SyncedFlushResponse> result : shardResponses.entrySet()) {
-    //         result.getKey().writeTo(out);
-    //         result.getValue().writeTo(out);
-    //     }
-    //     shardId.writeTo(out);
-    // }
-    // @Override
-    // public void readFrom(StreamInput in) throws IOException {
-    //     super.readFrom(in);
-    //     failureReason = in.readOptionalString();
-    //     syncId = in.readOptionalString();
-    //     totalShards = in.readVInt();
-    //     int size = in.readVInt();
-    //     ImmutableMap.Builder<ShardRouting, SyncedFlushService.SyncedFlushResponse> builder = ImmutableMap.builder();
-    //     for (int i = 0; i < size; i++) {
-    //         ImmutableShardRouting shardRouting = ImmutableShardRouting.readShardRoutingEntry(in);
-    //         SyncedFlushService.SyncedFlushResponse syncedFlushRsponse = new SyncedFlushService.SyncedFlushResponse();
-    //         syncedFlushRsponse.readFrom(in);
-    //         builder.put(shardRouting, syncedFlushRsponse);
-    //     }
-    //     shardResponses = builder.build();
-    //     shardId = ShardId.readShardId(in);
-    // }
    public ShardId shardId() {
        return shardId;
    }

View File

@@ -98,6 +98,10 @@ public class SyncedFlushService extends AbstractComponent {
        });
    }
+    /**
+     * A utility method to perform a synced flush for all shards of multiple indices. See {@link #attemptSyncedFlush(ShardId, ActionListener)}
+     * for more details.
+     */
    public void attemptSyncedFlush(final String[] aliasesOrIndices, IndicesOptions indicesOptions, final ActionListener<IndicesSyncedFlushResult> listener) {
        final ClusterState state = clusterService.state();
        final String[] concreteIndices = state.metaData().concreteIndices(indicesOptions, aliasesOrIndices);
@@ -111,6 +115,10 @@
            results.put(index, Collections.synchronizedList(new ArrayList<ShardsSyncedFlushResult>()));
        }
+        // nothing to flush; respond immediately, otherwise the CountDown below is never
+        // counted down and the listener would never be notified
+        if (numberOfShards == 0) {
+            listener.onResponse(new IndicesSyncedFlushResult(results));
+            return;
+        }
        final int finalTotalNumberOfShards = totalNumberOfShards;
        final CountDown countDown = new CountDown(numberOfShards);
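A note on the new guard: the `CountDown` is only ever counted down by per-shard callbacks, so
with zero matching shards the listener would otherwise never fire. With the guard, a request
whose indices expand to no shards should answer immediately with empty counts, presumably
along these lines:

[source,js]
--------------------------------------------------
{
  "_shards": {
    "total": 0,
    "successful": 0,
    "failed": 0
  }
}
--------------------------------------------------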

View File

@@ -62,7 +62,7 @@ public class RestSyncedFlushAction extends BaseRestHandler {
                builder.startObject();
                results.toXContent(builder, request);
                builder.endObject();
-                return new BytesRestResponse(RestStatus.OK, builder);
+                return new BytesRestResponse(results.restStatus(), builder);
            }
        });
    }
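The practical effect of wiring in `results.restStatus()`: a synced flush that fails on any
shard copy now surfaces as HTTP `409 Conflict` instead of a blanket `200 OK`, so callers can
branch on the status code alone. A minimal sketch:

[source,js]
--------------------------------------------------
POST /twitter/_flush/synced
// 200 OK       if every shard copy was sync-flushed
// 409 Conflict if one or more copies failed
--------------------------------------------------
// AUTOSENSE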

View File

@@ -27,6 +27,7 @@ import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.flush.IndicesSyncedFlushResult.ShardCounts;
import org.elasticsearch.indices.flush.SyncedFlushService.SyncedFlushResponse;
+import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ElasticsearchTestCase;
import java.io.IOException;
@@ -56,6 +57,7 @@ public class SyncedFlushUnitTests extends ElasticsearchTestCase {
        assertThat(testPlan.result.totalShards(), equalTo(testPlan.totalCounts.total));
        assertThat(testPlan.result.successfulShards(), equalTo(testPlan.totalCounts.successful));
        assertThat(testPlan.result.failedShards(), equalTo(testPlan.totalCounts.failed));
+        assertThat(testPlan.result.restStatus(), equalTo(testPlan.totalCounts.failed > 0 ? RestStatus.CONFLICT : RestStatus.OK));
        Map<String, Object> asMap = convertToMap(testPlan.result);
        assertShardCount("_shards header", (Map<String, Object>) asMap.get("_shards"), testPlan.totalCounts);