Merge remote-tracking branch 'boaz/index_seal_to_flush_sync'

This commit is contained in:
Britta Weber 2015-05-29 10:28:47 +02:00
commit 87a0c76e9c
34 changed files with 976 additions and 1082 deletions

View File

@ -59,7 +59,6 @@ and warmers.
* <<indices-refresh>>
* <<indices-flush>>
* <<indices-optimize>>
* <<indices-seal>>
* <<indices-upgrade>>
--

View File

@ -10,8 +10,9 @@ trigger flush operations as required in order to clear memory.
[source,js]
--------------------------------------------------
$ curl -XPOST 'http://localhost:9200/twitter/_flush'
POST /twitter/_flush
--------------------------------------------------
// AUTOSENSE
[float]
[[flush-parameters]]
@ -39,7 +40,198 @@ or even on `_all` the indices.
[source,js]
--------------------------------------------------
$ curl -XPOST 'http://localhost:9200/kimchy,elasticsearch/_flush'
POST /kimchy,elasticsearch/_flush
$ curl -XPOST 'http://localhost:9200/_flush'
POST /_flush
--------------------------------------------------
// AUTOSENSE
[[indices-synced-flush]]
=== Synced Flush
Elasticsearch tracks the indexing activity of each shard. Shards that have not
received any indexing operations for 30 minutes are automatically marked as inactive. This presents
an opportunity for Elasticsearch to reduce shard resources and also perform
a special kind of flush, called `synced flush`. A synced flush performs a normal flush, then adds
a generated unique marker (sync_id) to all shards.
Since the sync id marker was added when there were no ongoing indexing operations, it can
be used as a quick way to check if the two shards' lucene indices are identical. This quick sync id
comparison (if present) is used during recovery or restarts to skip the first and
most costly phase of the process. In that case, no segment files need to be copied and
the transaction log replay phase of the recovery can start immediately. Note that since the sync id
marker was applied together with a flush, it is very likely that the transaction log will be empty,
speeding up recoveries even more.
This is particularly useful for use cases having lots of indices which are
never or very rarely updated, such as time based data. This use case typically generates lots of indices whose
recovery without the synced flush marker would take a long time.
To check whether a shard has a marker or not, look for the `commit` section of shard stats returned by
the <<indices-stats,indices stats>> API:
[source,bash]
--------------------------------------------------
GET /twitter/_stats/commit?level=shards
--------------------------------------------------
// AUTOSENSE
which returns something similar to:
[source,js]
--------------------------------------------------
{
...
"indices": {
"twitter": {
"primaries": {},
"total": {},
"shards": {
"0": [
{
"routing": {
...
},
"commit": {
"id": "te7zF7C4UsirqvL6jp/vUg==",
"generation": 2,
"user_data": {
"sync_id": "AU2VU0meX-VX2aNbEUsD" <1>,
...
},
"num_docs": 0
}
}
...
],
...
}
}
}
}
--------------------------------------------------
<1> the `sync id` marker
[float]
=== Synced Flush API
The Synced Flush API allows an administrator to initiate a synced flush manually. This can be particularly useful for
a planned (rolling) cluster restart where you can stop indexing and don't want to wait the default 30 minutes for
idle indices to be sync-flushed automatically.
While handy, there are a couple of caveats for this API:
1. Synced flush is a best effort operation. Any ongoing indexing operations will cause
the synced flush to fail on that shard. This means that some shards may be synced flushed while others aren't. See below for more.
2. The `sync_id` marker is removed as soon as the shard is flushed again. That is because a flush replaces the low level
lucene commit point where the marker is stored. Uncommitted operations in the transaction log do not remove the marker.
In practice, one should consider any indexing operation on an index as removing the marker as a flush can be triggered by Elasticsearch
at any time.
NOTE: It is harmless to request a synced flush while there is ongoing indexing. Shards that are idle will succeed and shards
that are not will fail. Any shards that succeeded will have faster recovery times.
[source,bash]
--------------------------------------------------
POST /twitter/_flush/synced
--------------------------------------------------
// AUTOSENSE
The response contains details about how many shards were successfully sync-flushed and information about any failure.
Here is what it looks like when all shards of a two shards and one replica index successfully
sync-flushed:
[source,js]
--------------------------------------------------
{
"_shards": {
"total": 4,
"successful": 4,
"failed": 0
},
"twitter": {
"total": 4,
"successful": 4,
"failed": 0
}
}
--------------------------------------------------
Here is what it looks like when one shard group failed due to pending operations:
[source,js]
--------------------------------------------------
{
"_shards": {
"total": 4,
"successful": 2,
"failed": 2
},
"twitter": {
"total": 4,
"successful": 2,
"failed": 2,
"failures": [
{
"shard": 1,
"reason": "[2] ongoing operations on primary"
}
]
}
}
--------------------------------------------------
NOTE: The above error is shown when the synced flush failes due to concurrent indexing operations. The HTTP
status code in that case will be `409 CONFLICT`.
Sometimes the failures are specific to a shard copy. The copies that failed will not be eligible for
fast recovery but those that succeeded still will be. This case is reported as follows:
[source,js]
--------------------------------------------------
{
"_shards": {
"total": 4,
"successful": 1,
"failed": 1
},
"twitter": {
"total": 4,
"successful": 3,
"failed": 1,
"failures": [
{
"shard": 1,
"reason": "unexpected error",
"routing": {
"state": "STARTED",
"primary": false,
"node": "SZNr2J_ORxKTLUCydGX4zA",
"relocating_node": null,
"shard": 1,
"index": "twitter"
}
}
]
}
}
--------------------------------------------------
NOTE: When a shard copy fails to sync-flush, the HTTP status code returned will be `409 CONFLICT`.
The synced flush API can be applied to more than one index with a single call,
or even on `_all` the indices.
[source,js]
--------------------------------------------------
POST /kimchy,elasticsearch/_flush/synced
POST /_flush/synced
--------------------------------------------------
// AUTOSENSE

View File

@ -1,91 +0,0 @@
[[indices-seal]]
== Seal
The seal API flushes and adds a "seal" marker to the shards of one or more
indices. The seal is used during recovery or restarts to skip the first and
most costly phase of the process if all copies of the shard have the same seal.
No segment files need to be copied and the transaction log replay phase of the
recovery can start immediately which makes recovery much faster.
There are two important points about seals:
1. They are best effort in that if there are any outstanding write operations
while the seal operation is being performed then the shards which those writes
target won't be sealed but all others will be. See below for more.
2. The seal breaks as soon as the shard issues a new lucene commit. Uncommitted
operations in the transaction log do not break the seal. That is because a seal
marks a point in time snapshot of the segments, a low level lucene commit.
Practically that means that every write operation on the index will remove the
seal.
[source,bash]
--------------------------------------------------
$ curl -XPOST 'http://localhost:9200/twitter/_seal'
--------------------------------------------------
The response contains details about which shards wrote the seal and the reason
in case they failed to write the seal.
Here is what it looks like when all copies single shard index successfully
wrote the seal:
[source,js]
--------------------------------------------------
{
"twitter": [
{
"shard_id": 0,
"responses": {
"5wjOIntuRqy9F_7JRrrLwA": "success",
"M2iCBe-nS5yaInE8volfSg": "success"
},
"message": "success"
}
}
--------------------------------------------------
Here is what it looks like when one copy fails:
[source,js]
--------------------------------------------------
{
"twitter": [
{
"shard_id": 0,
"responses": {
"M2iCBe-nS5yaInE8volfSg": "pending operations",
"5wjOIntuRqy9F_7JRrrLwA": "success"
},
"message": "failed on some copies"
}
}
--------------------------------------------------
Sometimes the failures can be shard wide and they'll look like this:
[source,js]
--------------------------------------------------
{
"twitter": [
{
"shard_id": 0,
"message": "operation counter on primary is non zero [2]"
}
}
--------------------------------------------------
[float]
[[seal-multi-index]]
=== Multi Index
The seal API can be applied to more than one index with a single call,
or even on `_all` the indices.
[source,js]
--------------------------------------------------
curl -XPOST 'http://localhost:9200/kimchy,elasticsearch/_seal'
curl -XPOST 'http://localhost:9200/_seal'
--------------------------------------------------

View File

@ -85,13 +85,27 @@ This syntax applies to Elasticsearch 1.0 and later:
[source,sh]
--------------------------------------------------
curl -XPUT localhost:9200/_cluster/settings -d '{
"transient" : {
"cluster.routing.allocation.enable" : "none"
}
}'
curl -XPUT localhost:9200/_cluster/settings -d '{
"transient" : {
"cluster.routing.allocation.enable" : "none"
}
}'
--------------------------------------------------
* There is no problem continuing to index while doing the upgrade. However, you can speed the process considerably
by *temporarily* stopping non-essential indexing and issuing a manual <<indices-synced-flush, synced flush>>.
A synced flush is special kind of flush which can seriously speed up recovery of shards. Elasticsearch automatically
uses it when an index has been inactive for a while (default is `30m`) but you can manuallky trigger it using the following command:
[source,sh]
--------------------------------------------------
curl -XPOST localhost:9200/_all/_flush/synced
--------------------------------------------------
Note that a synced flush call is a best effort operation. It will fail there are any pending indexing operations. It is safe to issue
it multiple times if needed.
* Shut down a single node within the cluster.
* Confirm that all shards are correctly reallocated to the remaining running nodes.
@ -110,11 +124,11 @@ This syntax applies to Elasticsearch 1.0 and later:
[source,sh]
--------------------------------------------------
curl -XPUT localhost:9200/_cluster/settings -d '{
"transient" : {
"cluster.routing.allocation.enable" : "all"
}
}'
curl -XPUT localhost:9200/_cluster/settings -d '{
"transient" : {
"cluster.routing.allocation.enable" : "all"
}
}'
--------------------------------------------------
* Observe that all shards are properly allocated on all nodes. Balancing may take some time.
@ -150,11 +164,11 @@ This syntax is from versions prior to 1.0:
[source,sh]
--------------------------------------------------
curl -XPUT localhost:9200/_cluster/settings -d '{
"persistent" : {
"cluster.routing.allocation.disable_allocation" : true
}
}'
curl -XPUT localhost:9200/_cluster/settings -d '{
"persistent" : {
"cluster.routing.allocation.disable_allocation" : true
}
}'
--------------------------------------------------
* Stop all Elasticsearch services on all nodes in the cluster.
@ -169,12 +183,12 @@ This syntax is from versions prior to 1.0:
This syntax is from release 1.0 and later:
[source,sh]
------------------------------------------------------
curl -XPUT localhost:9200/_cluster/settings -d '{
"persistent" : {
"cluster.routing.allocation.disable_allocation": false,
"cluster.routing.allocation.enable" : "all"
}
}'
curl -XPUT localhost:9200/_cluster/settings -d '{
"persistent" : {
"cluster.routing.allocation.disable_allocation": false,
"cluster.routing.allocation.enable" : "all"
}
}'
------------------------------------------------------
The cluster upgrade can be streamlined by installing the software before stopping cluster services. If this is done, testing must be performed to ensure that no production data or configuration files are overwritten prior to restart.

View File

@ -0,0 +1,39 @@
{
"indices.flush.synced": {
"documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/master/indices-flush.html",
"methods": ["POST", "GET"],
"url": {
"path": "/_flush/synced",
"paths": [
"/_flush/synced",
"/{index}/_flush/synced"
],
"parts": {
"index": {
"type" : "list",
"description" : "A comma-separated list of index names; use `_all` or empty string for all indices"
},
"ignore_unavailable": {
"type": "boolean",
"description": "Whether specified concrete indices should be ignored when unavailable (missing or closed)"
},
"allow_no_indices": {
"type": "boolean",
"description": "Whether to ignore if a wildcard indices expression resolves into no concrete indices. (This includes `_all` string or when no indices have been specified)"
},
"expand_wildcards": {
"type": "enum",
"options": [
"open",
"closed",
"none",
"all"
],
"default": "open",
"description": "Whether to expand wildcard expression to concrete indices that are open, closed or both."
}
}
},
"body": null
}
}

View File

@ -1,17 +0,0 @@
{
"indices.seal": {
"documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/master/indices-seal.html",
"methods": ["POST", "GET"],
"url": {
"path": "/_seal",
"paths": ["/_seal", "/{index}/_seal"],
"parts": {
"index": {
"type" : "list",
"description" : "A comma-separated list of index names; use `_all` or empty string for all indices"
}
}
},
"body": null
}
}

View File

@ -1,5 +1,5 @@
---
"Index seal rest test":
"Index synced flush rest test":
- do:
indices.create:
index: testing
@ -8,8 +8,11 @@
cluster.health:
wait_for_status: yellow
- do:
indices.seal:
indices.flush.synced:
index: testing
- is_false: _shards.failed
- do:
indices.stats: {level: shards}

View File

@ -103,8 +103,6 @@ import org.elasticsearch.action.admin.indices.settings.put.TransportUpdateSettin
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsAction;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
import org.elasticsearch.action.admin.indices.stats.TransportIndicesStatsAction;
import org.elasticsearch.action.admin.indices.seal.SealIndicesAction;
import org.elasticsearch.action.admin.indices.seal.TransportSealIndicesAction;
import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateAction;
import org.elasticsearch.action.admin.indices.template.delete.TransportDeleteIndexTemplateAction;
import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesAction;
@ -260,7 +258,6 @@ public class ActionModule extends AbstractModule {
registerAction(ValidateQueryAction.INSTANCE, TransportValidateQueryAction.class);
registerAction(RefreshAction.INSTANCE, TransportRefreshAction.class);
registerAction(FlushAction.INSTANCE, TransportFlushAction.class);
registerAction(SealIndicesAction.INSTANCE, TransportSealIndicesAction.class);
registerAction(OptimizeAction.INSTANCE, TransportOptimizeAction.class);
registerAction(UpgradeAction.INSTANCE, TransportUpgradeAction.class);
registerAction(UpgradeStatusAction.INSTANCE, TransportUpgradeStatusAction.class);

View File

@ -1,45 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.seal;
import org.elasticsearch.action.Action;
import org.elasticsearch.client.ElasticsearchClient;
/**
*/
public class SealIndicesAction extends Action<SealIndicesRequest, SealIndicesResponse, SealIndicesRequestBuilder> {
public static final SealIndicesAction INSTANCE = new SealIndicesAction();
public static final String NAME = "indices:admin/seal";
private SealIndicesAction() {
super(NAME);
}
@Override
public SealIndicesResponse newResponse() {
return new SealIndicesResponse();
}
@Override
public SealIndicesRequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new SealIndicesRequestBuilder(client, this);
}
}

View File

@ -1,49 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.seal;
import org.elasticsearch.action.support.broadcast.BroadcastRequest;
import java.util.Arrays;
/**
* A request to seal one or more indices.
*/
public class SealIndicesRequest extends BroadcastRequest {
SealIndicesRequest() {
}
/**
* Constructs a seal request against one or more indices. If nothing is provided, all indices will
* be sealed.
*/
public SealIndicesRequest(String... indices) {
super(indices);
}
@Override
public String toString() {
return "SealIndicesRequest{" +
"indices=" + Arrays.toString(indices) +
", indicesOptions=" + indicesOptions() +
'}';
}
}

View File

@ -1,38 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.seal;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
/**
*
*/
public class SealIndicesRequestBuilder extends ActionRequestBuilder<SealIndicesRequest, SealIndicesResponse, SealIndicesRequestBuilder> {
public SealIndicesRequestBuilder(ElasticsearchClient client, SealIndicesAction action) {
super(client, action, new SealIndicesRequest());
}
public SealIndicesRequestBuilder indices(String ... indices) {
request.indices(indices);
return this;
}
}

View File

@ -1,171 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.seal;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.indices.SyncedFlushService;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException;
import java.util.*;
/**
* A response to a seal action on several indices.
*/
public class SealIndicesResponse extends ActionResponse implements ToXContent {
final private Set<SyncedFlushService.SyncedFlushResult> results;
private RestStatus restStatus;
SealIndicesResponse() {
results = new HashSet<>();
}
SealIndicesResponse(Set<SyncedFlushService.SyncedFlushResult> results) {
this.results = results;
if (allShardsFailed()) {
restStatus = RestStatus.CONFLICT;
} else if (someShardsFailed()) {
restStatus = RestStatus.PARTIAL_CONTENT;
} else {
restStatus = RestStatus.OK;
}
}
public RestStatus status() {
return restStatus;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
int size = in.readVInt();
results.clear();
for (int i = 0; i < size; i++) {
SyncedFlushService.SyncedFlushResult syncedFlushResult = new SyncedFlushService.SyncedFlushResult();
syncedFlushResult.readFrom(in);
results.add(syncedFlushResult);
}
restStatus = RestStatus.readFrom(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(results.size());
for (SyncedFlushService.SyncedFlushResult syncedFlushResult : results) {
syncedFlushResult.writeTo(out);
}
RestStatus.writeTo(out, restStatus);
}
public Set<SyncedFlushService.SyncedFlushResult> results() {
return results;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
Map<String, Map<Integer, Object>> allResults = new HashMap<>();
// first, sort everything by index and shard id
for (SyncedFlushService.SyncedFlushResult result : results) {
String indexName = result.getShardId().index().name();
int shardId = result.getShardId().getId();
if (allResults.get(indexName) == null) {
// no results yet for this index
allResults.put(indexName, new TreeMap<Integer, Object>());
}
if (result.shardResponses().size() > 0) {
Map<ShardRouting, SyncedFlushService.SyncedFlushResponse> shardResponses = new HashMap<>();
for (Map.Entry<ShardRouting, SyncedFlushService.SyncedFlushResponse> shardResponse : result.shardResponses().entrySet()) {
shardResponses.put(shardResponse.getKey(), shardResponse.getValue());
}
allResults.get(indexName).put(shardId, shardResponses);
} else {
allResults.get(indexName).put(shardId, result.failureReason());
}
}
for (Map.Entry<String, Map<Integer, Object>> result : allResults.entrySet()) {
builder.startArray(result.getKey());
for (Map.Entry<Integer, Object> shardResponse : result.getValue().entrySet()) {
builder.startObject();
builder.field("shard_id", shardResponse.getKey());
if (shardResponse.getValue() instanceof Map) {
builder.startObject("responses");
Map<ShardRouting, SyncedFlushService.SyncedFlushResponse> results = (Map<ShardRouting, SyncedFlushService.SyncedFlushResponse>) shardResponse.getValue();
boolean success = true;
for (Map.Entry<ShardRouting, SyncedFlushService.SyncedFlushResponse> shardCopy : results.entrySet()) {
builder.field(shardCopy.getKey().currentNodeId(), shardCopy.getValue().success() ? "success" : shardCopy.getValue().failureReason());
if (shardCopy.getValue().success() == false) {
success = false;
}
}
builder.endObject();
builder.field("message", success ? "success" : "failed on some copies");
} else {
builder.field("message", shardResponse.getValue()); // must be a string
}
builder.endObject();
}
builder.endArray();
}
return builder;
}
public boolean allShardsFailed() {
for (SyncedFlushService.SyncedFlushResult result : results) {
if (result.success()) {
return false;
}
if (result.shardResponses().size() > 0) {
for (Map.Entry<ShardRouting, SyncedFlushService.SyncedFlushResponse> shardResponse : result.shardResponses().entrySet()) {
if (shardResponse.getValue().success()) {
return false;
}
}
}
}
return true;
}
public boolean someShardsFailed() {
for (SyncedFlushService.SyncedFlushResult result : results) {
if (result.success() == false) {
return true;
}
if (result.shardResponses().size() > 0) {
for (Map.Entry<ShardRouting, SyncedFlushService.SyncedFlushResponse> shardResponse : result.shardResponses().entrySet()) {
if (shardResponse.getValue().success() == false) {
return true;
}
}
}
}
return false;
}
}

View File

@ -1,94 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.seal;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.SyncedFlushService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import java.util.Set;
/**
*/
public class TransportSealIndicesAction extends HandledTransportAction<SealIndicesRequest, SealIndicesResponse> {
final private SyncedFlushService syncedFlushService;
final private ClusterService clusterService;
@Inject
public TransportSealIndicesAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, SyncedFlushService syncedFlushService, ClusterService clusterService) {
super(settings, SealIndicesAction.NAME, threadPool, transportService, actionFilters, SealIndicesRequest.class);
this.syncedFlushService = syncedFlushService;
this.clusterService = clusterService;
}
@Override
protected void doExecute(final SealIndicesRequest request, final ActionListener<SealIndicesResponse> listener) {
ClusterState state = clusterService.state();
String[] concreteIndices = state.metaData().concreteIndices(request.indicesOptions(), request.indices());
GroupShardsIterator primaries = state.routingTable().activePrimaryShardsGrouped(concreteIndices, true);
final Set<SyncedFlushService.SyncedFlushResult> results = ConcurrentCollections.newConcurrentSet();
final CountDown countDown = new CountDown(primaries.size());
for (final ShardIterator shard : primaries) {
if (shard.size() == 0) {
results.add(new SyncedFlushService.SyncedFlushResult(shard.shardId(), "no active primary available"));
if (countDown.countDown()) {
listener.onResponse(new SealIndicesResponse(results));
}
} else {
final ShardId shardId = shard.shardId();
syncedFlushService.attemptSyncedFlush(shardId, new ActionListener<SyncedFlushService.SyncedFlushResult>() {
@Override
public void onResponse(SyncedFlushService.SyncedFlushResult syncedFlushResult) {
results.add(syncedFlushResult);
if (countDown.countDown()) {
listener.onResponse(new SealIndicesResponse(results));
}
}
@Override
public void onFailure(Throwable e) {
logger.debug("{} unexpected error while executing synced flush", shardId);
results.add(new SyncedFlushService.SyncedFlushResult(shardId, e.getMessage()));
if (countDown.countDown()) {
listener.onResponse(new SealIndicesResponse(results));
}
}
});
}
}
}
}

View File

@ -84,9 +84,6 @@ import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRespons
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequestBuilder;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.admin.indices.seal.SealIndicesRequest;
import org.elasticsearch.action.admin.indices.seal.SealIndicesRequestBuilder;
import org.elasticsearch.action.admin.indices.seal.SealIndicesResponse;
import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest;
import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequestBuilder;
import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateResponse;
@ -123,7 +120,6 @@ import org.elasticsearch.common.Nullable;
*/
public interface IndicesAdminClient extends ElasticsearchClient {
/**
* Indices Exists.
*
@ -368,27 +364,6 @@ public interface IndicesAdminClient extends ElasticsearchClient {
*/
FlushRequestBuilder prepareFlush(String... indices);
/**
* Explicitly sync flush one or more indices
*
* @param request The seal indices request
* @return A result future
*/
ActionFuture<SealIndicesResponse> sealIndices(SealIndicesRequest request);
/**
* Explicitly sync flush one or more indices
*
* @param request The seal indices request
* @param listener A listener to be notified with a result
*/
void sealIndices(SealIndicesRequest request, ActionListener<SealIndicesResponse> listener);
/**
* Explicitly seal one or more indices
*/
SealIndicesRequestBuilder prepareSealIndices(String... indices);
/**
* Explicitly optimize one or more indices into a the number of segments.
*

View File

@ -180,10 +180,6 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequestBuilder;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.admin.indices.seal.SealIndicesAction;
import org.elasticsearch.action.admin.indices.seal.SealIndicesRequest;
import org.elasticsearch.action.admin.indices.seal.SealIndicesRequestBuilder;
import org.elasticsearch.action.admin.indices.seal.SealIndicesResponse;
import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateAction;
import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest;
import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequestBuilder;
@ -1348,21 +1344,6 @@ public abstract class AbstractClient extends AbstractComponent implements Client
return new FlushRequestBuilder(this, FlushAction.INSTANCE).setIndices(indices);
}
@Override
public ActionFuture<SealIndicesResponse> sealIndices(SealIndicesRequest request) {
return execute(SealIndicesAction.INSTANCE, request);
}
@Override
public void sealIndices(SealIndicesRequest request, ActionListener<SealIndicesResponse> listener) {
execute(SealIndicesAction.INSTANCE, request, listener);
}
@Override
public SealIndicesRequestBuilder prepareSealIndices(String... indices) {
return new SealIndicesRequestBuilder(this, SealIndicesAction.INSTANCE).indices(indices);
}
@Override
public void getMappings(GetMappingsRequest request, ActionListener<GetMappingsResponse> listener) {
execute(GetMappingsAction.INSTANCE, request, listener);

View File

@ -21,7 +21,6 @@ package org.elasticsearch.index.shard;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import org.apache.lucene.codecs.PostingsFormat;
import org.apache.lucene.index.CheckIndex;
import org.apache.lucene.search.Query;
@ -1397,7 +1396,7 @@ public class IndexShard extends AbstractIndexShardComponent {
}
public int getOperationsCount() {
return indexShardOperationCounter.refCount();
return Math.max(0, indexShardOperationCounter.refCount() - 1); // refCount is incremented on creation and decremented on close
}
/**

View File

@ -32,6 +32,7 @@ import org.elasticsearch.indices.cache.query.IndicesQueryCache;
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCacheListener;
import org.elasticsearch.indices.flush.SyncedFlushService;
import org.elasticsearch.indices.memory.IndexingMemoryController;
import org.elasticsearch.indices.query.IndicesQueriesModule;
import org.elasticsearch.indices.recovery.RecoverySettings;

View File

@ -0,0 +1,153 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.flush;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException;
import java.util.List;
import java.util.Map;
/**
* The result of performing a sync flush operation on all shards of multiple indices
*/
public class IndicesSyncedFlushResult implements ToXContent {
final Map<String, List<ShardsSyncedFlushResult>> shardsResultPerIndex;
final ShardCounts shardCounts;
public IndicesSyncedFlushResult(Map<String, List<ShardsSyncedFlushResult>> shardsResultPerIndex) {
this.shardsResultPerIndex = ImmutableMap.copyOf(shardsResultPerIndex);
this.shardCounts = calculateShardCounts(Iterables.concat(shardsResultPerIndex.values()));
}
/** total number shards, including replicas, both assigned and unassigned */
public int totalShards() {
return shardCounts.total;
}
/** total number of shards for which the operation failed */
public int failedShards() {
return shardCounts.failed;
}
/** total number of shards which were successfully sync-flushed */
public int successfulShards() {
return shardCounts.successful;
}
public RestStatus restStatus() {
return failedShards() == 0 ? RestStatus.OK : RestStatus.CONFLICT;
}
public Map<String, List<ShardsSyncedFlushResult>> getShardsResultPerIndex() {
return shardsResultPerIndex;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Fields._SHARDS);
shardCounts.toXContent(builder, params);
builder.endObject();
for (Map.Entry<String, List<ShardsSyncedFlushResult>> indexEntry : shardsResultPerIndex.entrySet()) {
List<ShardsSyncedFlushResult> indexResult = indexEntry.getValue();
builder.startObject(indexEntry.getKey());
ShardCounts indexShardCounts = calculateShardCounts(indexResult);
indexShardCounts.toXContent(builder, params);
if (indexShardCounts.failed > 0) {
builder.startArray(Fields.FAILURES);
for (ShardsSyncedFlushResult shardResults : indexResult) {
if (shardResults.failed()) {
builder.startObject();
builder.field(Fields.SHARD, shardResults.shardId().id());
builder.field(Fields.REASON, shardResults.failureReason());
builder.endObject();
continue;
}
Map<ShardRouting, SyncedFlushService.SyncedFlushResponse> failedShards = shardResults.failedShards();
for (Map.Entry<ShardRouting, SyncedFlushService.SyncedFlushResponse> shardEntry : failedShards.entrySet()) {
builder.startObject();
builder.field(Fields.SHARD, shardResults.shardId().id());
builder.field(Fields.REASON, shardEntry.getValue().failureReason());
builder.field(Fields.ROUTING, shardEntry.getKey());
builder.endObject();
}
}
builder.endArray();
}
builder.endObject();
}
return builder;
}
static ShardCounts calculateShardCounts(Iterable<ShardsSyncedFlushResult> results) {
int total = 0, successful = 0, failed = 0;
for (ShardsSyncedFlushResult result : results) {
total += result.totalShards();
successful += result.successfulShards();
if (result.failed()) {
// treat all shard copies as failed
failed += result.totalShards();
} else {
// some shards may have failed during the sync phase
failed += result.failedShards().size();
}
}
return new ShardCounts(total, successful, failed);
}
static final class ShardCounts implements ToXContent {
public final int total;
public final int successful;
public final int failed;
ShardCounts(int total, int successful, int failed) {
this.total = total;
this.successful = successful;
this.failed = failed;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(Fields.TOTAL, total);
builder.field(Fields.SUCCESSFUL, successful);
builder.field(Fields.FAILED, failed);
return builder;
}
}
static final class Fields {
static final XContentBuilderString _SHARDS = new XContentBuilderString("_shards");
static final XContentBuilderString TOTAL = new XContentBuilderString("total");
static final XContentBuilderString SUCCESSFUL = new XContentBuilderString("successful");
static final XContentBuilderString FAILED = new XContentBuilderString("failed");
static final XContentBuilderString FAILURES = new XContentBuilderString("failures");
static final XContentBuilderString SHARD = new XContentBuilderString("shard");
static final XContentBuilderString ROUTING = new XContentBuilderString("routing");
static final XContentBuilderString REASON = new XContentBuilderString("reason");
}
}

View File

@ -0,0 +1,132 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.flush;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.index.shard.ShardId;
import java.util.HashMap;
import java.util.Map;
/**
* Result for all copies of a shard
*/
public class ShardsSyncedFlushResult {
private String failureReason;
private Map<ShardRouting, SyncedFlushService.SyncedFlushResponse> shardResponses;
private String syncId;
private ShardId shardId;
// some shards may be unassigned, so we need this as state
private int totalShards;
public ShardsSyncedFlushResult() {
}
public ShardId getShardId() {
return shardId;
}
/**
* failure constructor
*/
public ShardsSyncedFlushResult(ShardId shardId, int totalShards, String failureReason) {
this.syncId = null;
this.failureReason = failureReason;
this.shardResponses = ImmutableMap.of();
this.shardId = shardId;
this.totalShards = totalShards;
}
/**
* success constructor
*/
public ShardsSyncedFlushResult(ShardId shardId, String syncId, int totalShards, Map<ShardRouting, SyncedFlushService.SyncedFlushResponse> shardResponses) {
this.failureReason = null;
ImmutableMap.Builder<ShardRouting, SyncedFlushService.SyncedFlushResponse> builder = ImmutableMap.builder();
this.shardResponses = builder.putAll(shardResponses).build();
this.syncId = syncId;
this.totalShards = totalShards;
this.shardId = shardId;
}
/**
* @return true if the operation failed before reaching step three of synced flush. {@link #failureReason()} can be used for
* more details
*/
public boolean failed() {
return failureReason != null;
}
/**
* @return the reason for the failure if synced flush failed before step three of synced flush
*/
public String failureReason() {
return failureReason;
}
public String syncId() {
return syncId;
}
/**
* @return total number of shards for which a sync attempt was made
*/
public int totalShards() {
return totalShards;
}
/**
* @return total number of successful shards
*/
public int successfulShards() {
int i = 0;
for (SyncedFlushService.SyncedFlushResponse result : shardResponses.values()) {
if (result.success()) {
i++;
}
}
return i;
}
/**
* @return an array of shard failures
*/
public Map<ShardRouting, SyncedFlushService.SyncedFlushResponse> failedShards() {
Map<ShardRouting, SyncedFlushService.SyncedFlushResponse> failures = new HashMap<>();
for (Map.Entry<ShardRouting, SyncedFlushService.SyncedFlushResponse> result : shardResponses.entrySet()) {
if (result.getValue().success() == false) {
failures.put(result.getKey(), result.getValue());
}
}
return failures;
}
/**
* @return Individual responses for each shard copy with a detailed failure message if the copy failed to perform the synced flush.
* Empty if synced flush failed before step three.
*/
public Map<ShardRouting, SyncedFlushService.SyncedFlushResponse> shardResponses() {
return shardResponses;
}
public ShardId shardId() {
return shardId;
}
}

View File

@ -16,17 +16,16 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices;
package org.elasticsearch.indices.flush;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ImmutableShardRouting;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
@ -44,10 +43,16 @@ import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndexClosedException;
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
@ -77,10 +82,10 @@ public class SyncedFlushService extends AbstractComponent {
public void onShardInactive(final IndexShard indexShard) {
// we only want to call sync flush once, so only trigger it when we are on a primary
if (indexShard.routingEntry().primary()) {
attemptSyncedFlush(indexShard.shardId(), new ActionListener<SyncedFlushResult>() {
attemptSyncedFlush(indexShard.shardId(), new ActionListener<ShardsSyncedFlushResult>() {
@Override
public void onResponse(SyncedFlushResult syncedFlushResult) {
logger.debug("{} sync flush on inactive shard returned successfully for sync_id: {}", syncedFlushResult.getShardId(), syncedFlushResult.syncId());
public void onResponse(ShardsSyncedFlushResult syncedFlushResult) {
logger.trace("{} sync flush on inactive shard returned successfully for sync_id: {}", syncedFlushResult.getShardId(), syncedFlushResult.syncId());
}
@Override
@ -93,6 +98,56 @@ public class SyncedFlushService extends AbstractComponent {
});
}
/**
* a utility method to perform a synced flush for all shards of multiple indices. see {@link #attemptSyncedFlush(ShardId, ActionListener)}
* for more details.
*/
public void attemptSyncedFlush(final String[] aliasesOrIndices, IndicesOptions indicesOptions, final ActionListener<IndicesSyncedFlushResult> listener) {
final ClusterState state = clusterService.state();
final String[] concreteIndices = state.metaData().concreteIndices(indicesOptions, aliasesOrIndices);
final Map<String, List<ShardsSyncedFlushResult>> results = ConcurrentCollections.newConcurrentMap();
int totalNumberOfShards = 0;
int numberOfShards = 0;
for (String index : concreteIndices) {
final IndexMetaData indexMetaData = state.metaData().index(index);
totalNumberOfShards += indexMetaData.totalNumberOfShards();
numberOfShards += indexMetaData.getNumberOfShards();
results.put(index, Collections.synchronizedList(new ArrayList<ShardsSyncedFlushResult>()));
}
if (numberOfShards == 0) {
listener.onResponse(new IndicesSyncedFlushResult(results));
return;
}
final int finalTotalNumberOfShards = totalNumberOfShards;
final CountDown countDown = new CountDown(numberOfShards);
for (final String index : concreteIndices) {
final int indexNumberOfShards = state.metaData().index(index).getNumberOfShards();
for (int shard = 0; shard < indexNumberOfShards; shard++) {
final ShardId shardId = new ShardId(index, shard);
attemptSyncedFlush(shardId, new ActionListener<ShardsSyncedFlushResult>() {
@Override
public void onResponse(ShardsSyncedFlushResult syncedFlushResult) {
results.get(index).add(syncedFlushResult);
if (countDown.countDown()) {
listener.onResponse(new IndicesSyncedFlushResult(results));
}
}
@Override
public void onFailure(Throwable e) {
logger.debug("{} unexpected error while executing synced flush", shardId);
results.get(index).add(new ShardsSyncedFlushResult(shardId, finalTotalNumberOfShards, e.getMessage()));
if (countDown.countDown()) {
listener.onResponse(new IndicesSyncedFlushResult(results));
}
}
});
}
}
}
/*
* Tries to flush all copies of a shard and write a sync id to it.
* After a synced flush two shard copies may only contain the same sync id if they contain the same documents.
@ -119,28 +174,36 @@ public class SyncedFlushService extends AbstractComponent {
*
* Synced flush is a best effort operation. The sync id may be written on all, some or none of the copies.
**/
public void attemptSyncedFlush(final ShardId shardId, final ActionListener<SyncedFlushResult> actionListener) {
public void attemptSyncedFlush(final ShardId shardId, final ActionListener<ShardsSyncedFlushResult> actionListener) {
try {
final ClusterState state = clusterService.state();
final IndexShardRoutingTable shardRoutingTable = getActiveShardRoutings(shardId, state);
final IndexShardRoutingTable shardRoutingTable = getShardRoutingTable(shardId, state);
final List<ShardRouting> activeShards = shardRoutingTable.activeShards();
final int totalShards = shardRoutingTable.getSize();
if (activeShards.size() == 0) {
actionListener.onResponse(new ShardsSyncedFlushResult(shardId, totalShards, "no active shards"));
return;
}
final ActionListener<Map<String, Engine.CommitId>> commitIdsListener = new ActionListener<Map<String, Engine.CommitId>>() {
@Override
public void onResponse(final Map<String, Engine.CommitId> commitIds) {
if (commitIds.isEmpty()) {
actionListener.onResponse(new SyncedFlushResult(shardId, "all shards failed to commit on pre-sync"));
actionListener.onResponse(new ShardsSyncedFlushResult(shardId, totalShards, "all shards failed to commit on pre-sync"));
return;
}
final ActionListener<InFlightOpsResponse> inflightOpsListener = new ActionListener<InFlightOpsResponse>() {
@Override
public void onResponse(InFlightOpsResponse response) {
final int inflight = response.opCount();
assert inflight >= -1;
if (inflight != 1) { // 1 means that there are no write operations are in flight (>1) and the shard is not closed (0).
actionListener.onResponse(new SyncedFlushResult(shardId, "operation counter on primary is non zero [" + inflight + "]"));
assert inflight >= 0;
if (inflight != 0) {
actionListener.onResponse(new ShardsSyncedFlushResult(shardId, totalShards, "[" + inflight + "] ongoing operations on primary"));
} else {
// 3. now send the sync request to all the shards
String syncId = Strings.base64UUID();
sendSyncRequests(syncId, activeShards, state, commitIds, shardId, actionListener);
sendSyncRequests(syncId, activeShards, state, commitIds, shardId, totalShards, actionListener);
}
}
@ -166,7 +229,7 @@ public class SyncedFlushService extends AbstractComponent {
}
}
final IndexShardRoutingTable getActiveShardRoutings(ShardId shardId, ClusterState state) {
final IndexShardRoutingTable getShardRoutingTable(ShardId shardId, ClusterState state) {
final IndexRoutingTable indexRoutingTable = state.routingTable().index(shardId.index().name());
if (indexRoutingTable == null) {
IndexMetaData index = state.getMetaData().index(shardId.index().getName());
@ -183,7 +246,7 @@ public class SyncedFlushService extends AbstractComponent {
}
/**
* returns the number of inflight operations on primary. -1 upon error.
* returns the number of in flight operations on primary. -1 upon error.
*/
protected void getInflightOpsCount(final ShardId shardId, ClusterState state, IndexShardRoutingTable shardRoutingTable, final ActionListener<InFlightOpsResponse> listener) {
try {
@ -209,7 +272,7 @@ public class SyncedFlushService extends AbstractComponent {
@Override
public void handleException(TransportException exp) {
logger.debug("{} unexpected error while retrieving inflight op count", shardId);
logger.debug("{} unexpected error while retrieving in flight op count", shardId);
listener.onFailure(exp);
}
@ -224,7 +287,8 @@ public class SyncedFlushService extends AbstractComponent {
}
void sendSyncRequests(final String syncId, final List<ShardRouting> shards, ClusterState state, Map<String, Engine.CommitId> expectedCommitIds, final ShardId shardId, final ActionListener<SyncedFlushResult> listener) {
void sendSyncRequests(final String syncId, final List<ShardRouting> shards, ClusterState state, Map<String, Engine.CommitId> expectedCommitIds,
final ShardId shardId, final int totalShards, final ActionListener<ShardsSyncedFlushResult> listener) {
final CountDown countDown = new CountDown(shards.size());
final Map<ShardRouting, SyncedFlushResponse> results = ConcurrentCollections.newConcurrentMap();
for (final ShardRouting shard : shards) {
@ -232,14 +296,14 @@ public class SyncedFlushService extends AbstractComponent {
if (node == null) {
logger.trace("{} is assigned to an unknown node. skipping for sync id [{}]. shard routing {}", shardId, syncId, shard);
results.put(shard, new SyncedFlushResponse("unknown node"));
contDownAndSendResponseIfDone(syncId, shards, shardId, listener, countDown, results);
contDownAndSendResponseIfDone(syncId, shards, shardId, totalShards, listener, countDown, results);
continue;
}
final Engine.CommitId expectedCommitId = expectedCommitIds.get(shard.currentNodeId());
if (expectedCommitId == null) {
logger.trace("{} can't resolve expected commit id for {}, skipping for sync id [{}]. shard routing {}", shardId, syncId, shard);
results.put(shard, new SyncedFlushResponse("no commit id from pre-sync flush"));
contDownAndSendResponseIfDone(syncId, shards, shardId, listener, countDown, results);
contDownAndSendResponseIfDone(syncId, shards, shardId, totalShards, listener, countDown, results);
continue;
}
logger.trace("{} sending synced flush request to {}. sync id [{}].", shardId, shard, syncId);
@ -255,14 +319,14 @@ public class SyncedFlushService extends AbstractComponent {
SyncedFlushResponse existing = results.put(shard, response);
assert existing == null : "got two answers for node [" + node + "]";
// count after the assert so we won't decrement twice in handleException
contDownAndSendResponseIfDone(syncId, shards, shardId, listener, countDown, results);
contDownAndSendResponseIfDone(syncId, shards, shardId, totalShards, listener, countDown, results);
}
@Override
public void handleException(TransportException exp) {
logger.trace("{} error while performing synced flush on [{}], skipping", exp, shardId, shard);
results.put(shard, new SyncedFlushResponse(exp.getMessage()));
contDownAndSendResponseIfDone(syncId, shards, shardId, listener, countDown, results);
contDownAndSendResponseIfDone(syncId, shards, shardId, totalShards, listener, countDown, results);
}
@Override
@ -274,10 +338,12 @@ public class SyncedFlushService extends AbstractComponent {
}
private void contDownAndSendResponseIfDone(String syncId, List<ShardRouting> shards, ShardId shardId, ActionListener<SyncedFlushResult> listener, CountDown countDown, Map<ShardRouting, SyncedFlushResponse> results) {
private void contDownAndSendResponseIfDone(String syncId, List<ShardRouting> shards, ShardId shardId, int totalShards,
ActionListener<ShardsSyncedFlushResult> listener, CountDown countDown, Map<ShardRouting,
SyncedFlushResponse> results) {
if (countDown.countDown()) {
assert results.size() == shards.size();
listener.onResponse(new SyncedFlushResult(shardId, syncId, results));
listener.onResponse(new ShardsSyncedFlushResult(shardId, syncId, totalShards, results));
}
}
@ -292,8 +358,8 @@ public class SyncedFlushService extends AbstractComponent {
final DiscoveryNode node = state.nodes().get(shard.currentNodeId());
if (node == null) {
logger.trace("{} shard routing {} refers to an unknown node. skipping.", shardId, shard);
if(countDown.countDown()) {
listener.onResponse(commitIds);
if (countDown.countDown()) {
listener.onResponse(commitIds);
}
continue;
}
@ -308,7 +374,7 @@ public class SyncedFlushService extends AbstractComponent {
Engine.CommitId existing = commitIds.putIfAbsent(node.id(), response.commitId());
assert existing == null : "got two answers for node [" + node + "]";
// count after the assert so we won't decrement twice in handleException
if(countDown.countDown()) {
if (countDown.countDown()) {
listener.onResponse(commitIds);
}
}
@ -316,7 +382,7 @@ public class SyncedFlushService extends AbstractComponent {
@Override
public void handleException(TransportException exp) {
logger.trace("{} error while performing pre synced flush on [{}], skipping", shardId, exp, shard);
if(countDown.countDown()) {
if (countDown.countDown()) {
listener.onResponse(commitIds);
}
}
@ -343,7 +409,7 @@ public class SyncedFlushService extends AbstractComponent {
IndexShard indexShard = indexService.shardSafe(request.shardId().id());
logger.trace("{} performing sync flush. sync id [{}], expected commit id {}", request.shardId(), request.syncId(), request.expectedCommitId());
Engine.SyncedFlushResult result = indexShard.syncFlush(request.syncId(), request.expectedCommitId());
logger.trace("{} sync flush done. sync id [{}], result [{}]", request.shardId(), request.syncId(), result);
logger.trace("{} sync flush done. sync id [{}], result [{}]", request.shardId(), request.syncId(), result);
switch (result) {
case SUCCESS:
return new SyncedFlushResponse();
@ -367,124 +433,6 @@ public class SyncedFlushService extends AbstractComponent {
return new InFlightOpsResponse(opCount);
}
/**
* Result for all copies of a shard
*/
public static class SyncedFlushResult extends TransportResponse {
private String failureReason;
private Map<ShardRouting, SyncedFlushResponse> shardResponses;
private String syncId;
private ShardId shardId;
public SyncedFlushResult() {
}
public ShardId getShardId() {
return shardId;
}
/**
* failure constructor
*/
public SyncedFlushResult(ShardId shardId, String failureReason) {
this.syncId = null;
this.failureReason = failureReason;
this.shardResponses = ImmutableMap.of();
this.shardId = shardId;
}
/**
* success constructor
*/
public SyncedFlushResult(ShardId shardId, String syncId, Map<ShardRouting, SyncedFlushResponse> shardResponses) {
this.failureReason = null;
ImmutableMap.Builder<ShardRouting, SyncedFlushResponse> builder = ImmutableMap.builder();
this.shardResponses = builder.putAll(shardResponses).build();
this.syncId = syncId;
this.shardId = shardId;
}
/**
* @return true if one or more shard copies was successful, false if all failed before step three of synced flush
*/
public boolean success() {
return syncId != null;
}
/**
* @return the reason for the failure if synced flush failed before step three of synced flush
*/
public String failureReason() {
return failureReason;
}
public String syncId() {
return syncId;
}
/**
* @return total number of shards for which a sync attempt was made
*/
public int totalShards() {
return shardResponses.size();
}
/**
* @return total number of successful shards
*/
public int successfulShards() {
int i = 0;
for (SyncedFlushResponse result : shardResponses.values()) {
if (result.success()) {
i++;
}
}
return i;
}
/**
* @return Individual responses for each shard copy with a detailed failure message if the copy failed to perform the synced flush.
* Empty if synced flush failed before step three.
*/
public Map<ShardRouting, SyncedFlushResponse> shardResponses() {
return shardResponses;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalString(failureReason);
out.writeOptionalString(syncId);
out.writeVInt(shardResponses.size());
for (Map.Entry<ShardRouting, SyncedFlushResponse> result : shardResponses.entrySet()) {
result.getKey().writeTo(out);
result.getValue().writeTo(out);
}
shardId.writeTo(out);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
failureReason = in.readOptionalString();
syncId = in.readOptionalString();
int size = in.readVInt();
ImmutableMap.Builder<ShardRouting, SyncedFlushResponse> builder = ImmutableMap.builder();
for (int i = 0; i < size; i++) {
ImmutableShardRouting shardRouting = ImmutableShardRouting.readShardRoutingEntry(in);
SyncedFlushResponse syncedFlushRsponse = new SyncedFlushResponse();
syncedFlushRsponse.readFrom(in);
builder.put(shardRouting, syncedFlushRsponse);
}
shardResponses = builder.build();
shardId = ShardId.readShardId(in);
}
public ShardId shardId() {
return shardId;
}
}
final static class PreSyncedFlushRequest extends TransportRequest {
private ShardId shardId;

View File

@ -21,7 +21,6 @@ package org.elasticsearch.indices.recovery;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.lucene.index.IndexFormatTooOldException;
@ -199,6 +198,8 @@ public class RecoverySourceHandler {
}
// we shortcut recovery here because we have nothing to copy. but we must still start the engine on the target.
// so we don't return here
logger.trace("[{}][{}] skipping [phase1] to {} - identical sync id [{}] found on both source and target", indexName, shardId,
request.targetNode(), recoverySourceSyncId);
} else {
final Store.RecoveryDiff diff = recoverySourceMetadata.recoveryDiff(request.metadataSnapshot());
for (StoreFileMetaData md : diff.identical) {

View File

@ -235,6 +235,9 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesAction<T
metadataSnapshot.writeTo(out);
}
/**
* @return commit sync id if exists, else null
*/
public String syncId() {
return metadataSnapshot.getSyncId();
}

View File

@ -23,9 +23,6 @@ import com.google.common.collect.Lists;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.Multibinder;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.action.admin.indices.seal.RestSealIndicesAction;
import org.elasticsearch.rest.action.admin.indices.upgrade.RestUpgradeAction;
import org.elasticsearch.rest.action.admin.cluster.repositories.verify.RestVerifyRepositoryAction;
import org.elasticsearch.rest.action.admin.cluster.health.RestClusterHealthAction;
import org.elasticsearch.rest.action.admin.cluster.node.hotthreads.RestNodesHotThreadsAction;
import org.elasticsearch.rest.action.admin.cluster.node.info.RestNodesInfoAction;
@ -33,6 +30,7 @@ import org.elasticsearch.rest.action.admin.cluster.node.stats.RestNodesStatsActi
import org.elasticsearch.rest.action.admin.cluster.repositories.delete.RestDeleteRepositoryAction;
import org.elasticsearch.rest.action.admin.cluster.repositories.get.RestGetRepositoriesAction;
import org.elasticsearch.rest.action.admin.cluster.repositories.put.RestPutRepositoryAction;
import org.elasticsearch.rest.action.admin.cluster.repositories.verify.RestVerifyRepositoryAction;
import org.elasticsearch.rest.action.admin.cluster.reroute.RestClusterRerouteAction;
import org.elasticsearch.rest.action.admin.cluster.settings.RestClusterGetSettingsAction;
import org.elasticsearch.rest.action.admin.cluster.settings.RestClusterUpdateSettingsAction;
@ -59,6 +57,7 @@ import org.elasticsearch.rest.action.admin.indices.delete.RestDeleteIndexAction;
import org.elasticsearch.rest.action.admin.indices.exists.indices.RestIndicesExistsAction;
import org.elasticsearch.rest.action.admin.indices.exists.types.RestTypesExistsAction;
import org.elasticsearch.rest.action.admin.indices.flush.RestFlushAction;
import org.elasticsearch.rest.action.admin.indices.flush.RestSyncedFlushAction;
import org.elasticsearch.rest.action.admin.indices.get.RestGetIndicesAction;
import org.elasticsearch.rest.action.admin.indices.mapping.get.RestGetFieldMappingAction;
import org.elasticsearch.rest.action.admin.indices.mapping.get.RestGetMappingAction;
@ -75,6 +74,7 @@ import org.elasticsearch.rest.action.admin.indices.template.delete.RestDeleteInd
import org.elasticsearch.rest.action.admin.indices.template.get.RestGetIndexTemplateAction;
import org.elasticsearch.rest.action.admin.indices.template.head.RestHeadIndexTemplateAction;
import org.elasticsearch.rest.action.admin.indices.template.put.RestPutIndexTemplateAction;
import org.elasticsearch.rest.action.admin.indices.upgrade.RestUpgradeAction;
import org.elasticsearch.rest.action.admin.indices.validate.query.RestValidateQueryAction;
import org.elasticsearch.rest.action.admin.indices.warmer.delete.RestDeleteWarmerAction;
import org.elasticsearch.rest.action.admin.indices.warmer.get.RestGetWarmerAction;
@ -183,7 +183,7 @@ public class RestActionModule extends AbstractModule {
bind(RestRefreshAction.class).asEagerSingleton();
bind(RestFlushAction.class).asEagerSingleton();
bind(RestSealIndicesAction.class).asEagerSingleton();
bind(RestSyncedFlushAction.class).asEagerSingleton();
bind(RestOptimizeAction.class).asEagerSingleton();
bind(RestUpgradeAction.class).asEagerSingleton();
bind(RestClearIndicesCacheAction.class).asEagerSingleton();

View File

@ -17,17 +17,16 @@
* under the License.
*/
package org.elasticsearch.rest.action.admin.indices.seal;
package org.elasticsearch.rest.action.admin.indices.flush;
import org.elasticsearch.action.admin.indices.seal.SealIndicesAction;
import org.elasticsearch.action.admin.indices.seal.SealIndicesRequest;
import org.elasticsearch.action.admin.indices.seal.SealIndicesResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.indices.flush.IndicesSyncedFlushResult;
import org.elasticsearch.indices.flush.SyncedFlushService;
import org.elasticsearch.rest.*;
import org.elasticsearch.rest.action.support.RestBuilderListener;
@ -37,29 +36,33 @@ import static org.elasticsearch.rest.RestRequest.Method.POST;
/**
*
*/
public class RestSealIndicesAction extends BaseRestHandler {
public class RestSyncedFlushAction extends BaseRestHandler {
private final SyncedFlushService syncedFlushService;
@Inject
public RestSealIndicesAction(Settings settings, RestController controller, Client client) {
public RestSyncedFlushAction(Settings settings, RestController controller, Client client, SyncedFlushService syncedFlushService) {
super(settings, controller, client);
controller.registerHandler(POST, "/_seal", this);
controller.registerHandler(POST, "/{index}/_seal", this);
this.syncedFlushService = syncedFlushService;
controller.registerHandler(POST, "/_flush/synced", this);
controller.registerHandler(POST, "/{index}/_flush/synced", this);
controller.registerHandler(GET, "/_seal", this);
controller.registerHandler(GET, "/{index}/_seal", this);
controller.registerHandler(GET, "/_flush/synced", this);
controller.registerHandler(GET, "/{index}/_flush/synced", this);
}
@Override
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
String[] indices = Strings.splitStringByCommaToArray(request.param("index"));
SealIndicesRequest sealIndicesRequest = new SealIndicesRequest(indices);
client.admin().indices().execute(SealIndicesAction.INSTANCE, sealIndicesRequest, new RestBuilderListener<SealIndicesResponse>(channel) {
IndicesOptions indicesOptions = IndicesOptions.fromRequest(request, IndicesOptions.lenientExpandOpen());
syncedFlushService.attemptSyncedFlush(indices, indicesOptions, new RestBuilderListener<IndicesSyncedFlushResult>(channel) {
@Override
public RestResponse buildResponse(SealIndicesResponse response, XContentBuilder builder) throws Exception {
public RestResponse buildResponse(IndicesSyncedFlushResult results, XContentBuilder builder) throws Exception {
builder.startObject();
builder = response.toXContent(builder, ToXContent.EMPTY_PARAMS);
results.toXContent(builder, request);
builder.endObject();
return new BytesRestResponse(response.status(), builder);
return new BytesRestResponse(results.restStatus(), builder);
}
});
}

View File

@ -1,116 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.seal;
import org.elasticsearch.cluster.routing.ImmutableShardRouting;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.SyncedFlushService;
import org.elasticsearch.test.ElasticsearchTestCase;
import java.io.IOException;
import java.util.*;
import static org.elasticsearch.test.XContentTestUtils.convertToMap;
import static org.hamcrest.Matchers.equalTo;
public class SealIndicesTests extends ElasticsearchTestCase {
public void testSealIndicesResponseStreaming() throws IOException {
Set<SyncedFlushService.SyncedFlushResult> shardResults = new HashSet<>();
// add one result where one shard failed and one succeeded
SyncedFlushService.SyncedFlushResult syncedFlushResult = createSyncedFlushResult(0, "test");
shardResults.add(syncedFlushResult);
// add one result where all failed
syncedFlushResult = new SyncedFlushService.SyncedFlushResult(new ShardId("test", 1), "all failed :(");
shardResults.add(syncedFlushResult);
SealIndicesResponse sealIndicesResponse = new SealIndicesResponse(shardResults);
BytesStreamOutput out = new BytesStreamOutput();
sealIndicesResponse.writeTo(out);
out.close();
StreamInput in = StreamInput.wrap(out.bytes());
SealIndicesResponse readResponse = new SealIndicesResponse();
readResponse.readFrom(in);
Map<String, Object> asMap = convertToMap(readResponse);
assertResponse(asMap);
}
public void testXContentResponse() throws IOException {
Set<SyncedFlushService.SyncedFlushResult> shardResults = new HashSet<>();
// add one result where one shard failed and one succeeded
SyncedFlushService.SyncedFlushResult syncedFlushResult = createSyncedFlushResult(0, "test");
shardResults.add(syncedFlushResult);
// add one result where all failed
syncedFlushResult = new SyncedFlushService.SyncedFlushResult(new ShardId("test", 1), "all failed :(");
shardResults.add(syncedFlushResult);
SealIndicesResponse sealIndicesResponse = new SealIndicesResponse(shardResults);
Map<String, Object> asMap = convertToMap(sealIndicesResponse);
assertResponse(asMap);
}
protected void assertResponse(Map<String, Object> asMap) {
assertNotNull(asMap.get("test"));
assertThat((Integer) (((HashMap) ((ArrayList) asMap.get("test")).get(0)).get("shard_id")), equalTo(0));
assertThat((String) (((HashMap) ((ArrayList) asMap.get("test")).get(0)).get("message")), equalTo("failed on some copies"));
HashMap<String, String> shardResponses = (HashMap<String, String>) ((HashMap) ((ArrayList) asMap.get("test")).get(0)).get("responses");
assertThat(shardResponses.get("node_1"), equalTo("failed for some reason"));
assertThat(shardResponses.get("node_2"), equalTo("success"));
HashMap<String, Object> failedShard = (HashMap<String, Object>) (((ArrayList) asMap.get("test")).get(1));
assertThat((Integer) (failedShard.get("shard_id")), equalTo(1));
assertThat((String) (failedShard.get("message")), equalTo("all failed :("));
}
public void testXContentResponseSortsShards() throws IOException {
Set<SyncedFlushService.SyncedFlushResult> shardResults = new HashSet<>();
// add one result where one shard failed and one succeeded
SyncedFlushService.SyncedFlushResult syncedFlushResult;
for (int i = 100000; i >= 0; i--) {
if (randomBoolean()) {
syncedFlushResult = createSyncedFlushResult(i, "test");
shardResults.add(syncedFlushResult);
} else {
syncedFlushResult = new SyncedFlushService.SyncedFlushResult(new ShardId("test", i), "all failed :(");
shardResults.add(syncedFlushResult);
}
}
SealIndicesResponse sealIndicesResponse = new SealIndicesResponse(shardResults);
Map<String, Object> asMap = convertToMap(sealIndicesResponse);
assertNotNull(asMap.get("test"));
for (int i = 0; i < 100000; i++) {
assertThat((Integer) (((HashMap) ((ArrayList) asMap.get("test")).get(i)).get("shard_id")), equalTo(i));
}
}
protected SyncedFlushService.SyncedFlushResult createSyncedFlushResult(int shardId, String index) {
Map<ShardRouting, SyncedFlushService.SyncedFlushResponse> responses = new HashMap<>();
ImmutableShardRouting shardRouting = new ImmutableShardRouting(index, shardId, "node_1", false, ShardRoutingState.RELOCATING, 2);
SyncedFlushService.SyncedFlushResponse syncedFlushResponse = new SyncedFlushService.SyncedFlushResponse("failed for some reason");
responses.put(shardRouting, syncedFlushResponse);
shardRouting = new ImmutableShardRouting(index, shardId, "node_2", false, ShardRoutingState.RELOCATING, 2);
syncedFlushResponse = new SyncedFlushService.SyncedFlushResponse();
responses.put(shardRouting, syncedFlushResponse);
return new SyncedFlushService.SyncedFlushResult(new ShardId(index, shardId), "some_sync_id", responses);
}
}

View File

@ -32,13 +32,11 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.SyncedFlushService;
import org.elasticsearch.indices.flush.SyncedFlushUtil;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.elasticsearch.test.InternalTestCluster.RestartCallback;
import org.elasticsearch.indices.SyncedFlushUtil;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.store.MockFSDirectoryService;
import org.junit.Test;
@ -397,11 +395,7 @@ public class RecoveryFromGatewayTests extends ElasticsearchIntegrationTest {
ensureGreen();
} else {
logger.info("--> trying to sync flush");
int numShards = Integer.parseInt(client().admin().indices().prepareGetSettings("test").get().getSetting("test", "index.number_of_shards"));
SyncedFlushService syncedFlushService = internalCluster().getInstance(SyncedFlushService.class);
for (int i = 0; i < numShards; i++) {
assertTrue(SyncedFlushUtil.attemptSyncedFlush(syncedFlushService, new ShardId("test", i)).success());
}
assertEquals(SyncedFlushUtil.attemptSyncedFlush(internalCluster(), "test").failedShards(), 0);
assertSyncIdsNotNull();
}

View File

@ -18,8 +18,8 @@
*/
package org.elasticsearch.index.shard;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.ShardRouting;
@ -254,13 +254,14 @@ public class IndexShardTests extends ElasticsearchSingleNodeTest {
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService indexService = indicesService.indexServiceSafe("test");
IndexShard indexShard = indexService.shard(0);
assertEquals(0, indexShard.getOperationsCount());
indexShard.incrementOperationCounter();
assertEquals(1, indexShard.getOperationsCount());
indexShard.incrementOperationCounter();
assertEquals(2, indexShard.getOperationsCount());
indexShard.incrementOperationCounter();
assertEquals(3, indexShard.getOperationsCount());
indexShard.decrementOperationCounter();
indexShard.decrementOperationCounter();
assertEquals(1, indexShard.getOperationsCount());
assertEquals(0, indexShard.getOperationsCount());
}
@Test

View File

@ -1,49 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices;
import org.elasticsearch.action.admin.indices.seal.SealIndicesResponse;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test;
import static java.lang.Thread.sleep;
import static org.hamcrest.Matchers.equalTo;
@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.TEST, numDataNodes = 0)
public class SealTests extends ElasticsearchIntegrationTest {
@Test
public void testUnallocatedShardsDoesNotHang() throws InterruptedException {
Settings.Builder settingsBuilder = Settings.builder()
.put("node.data", false)
.put("node.master", true)
.put("path.data", createTempDir().toString());
internalCluster().startNode(settingsBuilder.build());
// create an index but because no data nodes are available no shards will be allocated
createIndex("test");
// this should not hang but instead immediately return with empty result set
SealIndicesResponse sealIndicesResponse = client().admin().indices().prepareSealIndices("test").get();
// just to make sure the test actually tests the right thing
int numShards = client().admin().indices().prepareGetSettings("test").get().getIndexToSettings().get("test").getAsInt(IndexMetaData.SETTING_NUMBER_OF_SHARDS, -1);
assertThat(sealIndicesResponse.results().size(), equalTo(numShards));
assertThat(sealIndicesResponse.results().iterator().next().failureReason(), equalTo("no active primary available"));
}
}

View File

@ -16,13 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices;
package org.elasticsearch.indices.flush;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.admin.indices.seal.SealIndicesResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.ShardRouting;
@ -36,6 +35,7 @@ import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
@ -43,7 +43,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static java.lang.Thread.sleep;
import static org.hamcrest.Matchers.emptyIterable;
import static org.hamcrest.Matchers.equalTo;
@ -97,8 +96,16 @@ public class FlushTest extends ElasticsearchIntegrationTest {
assertNull(shardStats.getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID));
}
SyncedFlushService.SyncedFlushResult result = SyncedFlushUtil.attemptSyncedFlush(internalCluster().getInstance(SyncedFlushService.class), new ShardId("test", 0));
assertTrue(result.success());
ShardsSyncedFlushResult result;
if (randomBoolean()) {
logger.info("--> sync flushing shard 0");
result = SyncedFlushUtil.attemptSyncedFlush(internalCluster(), new ShardId("test", 0));
} else {
logger.info("--> sync flushing index [test]");
IndicesSyncedFlushResult indicesResult = SyncedFlushUtil.attemptSyncedFlush(internalCluster(), "test");
result = indicesResult.getShardsResultPerIndex().get("test").get(0);
}
assertFalse(result.failed());
assertThat(result.totalShards(), equalTo(indexStats.getShards().length));
assertThat(result.successfulShards(), equalTo(indexStats.getShards().length));
@ -140,26 +147,7 @@ public class FlushTest extends ElasticsearchIntegrationTest {
}
@TestLogging("indices:TRACE")
public void testSyncedFlushWithApi() throws ExecutionException, InterruptedException, IOException {
createIndex("test");
ensureGreen();
IndexStats indexStats = client().admin().indices().prepareStats("test").get().getIndex("test");
for (ShardStats shardStats : indexStats.getShards()) {
assertNull(shardStats.getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID));
}
logger.info("--> trying sync flush");
SealIndicesResponse sealIndicesResponse = client().admin().indices().prepareSealIndices("test").get();
logger.info("--> sync flush done");
indexStats = client().admin().indices().prepareStats("test").get().getIndex("test");
for (ShardStats shardStats : indexStats.getShards()) {
assertNotNull(shardStats.getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID));
}
}
@TestLogging("indices:TRACE")
public void testSyncedFlushWithApiAndConcurrentIndexing() throws Exception {
public void testSyncedFlushWithConcurrentIndexing() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(3);
createIndex("test");
@ -186,14 +174,12 @@ public class FlushTest extends ElasticsearchIntegrationTest {
assertNull(shardStats.getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID));
}
logger.info("--> trying sync flush");
SealIndicesResponse sealIndicesResponse = client().admin().indices().prepareSealIndices("test").get();
IndicesSyncedFlushResult syncedFlushResult = SyncedFlushUtil.attemptSyncedFlush(internalCluster(), "test");
logger.info("--> sync flush done");
stop.set(true);
indexingThread.join();
indexStats = client().admin().indices().prepareStats("test").get().getIndex("test");
for (ShardStats shardStats : indexStats.getShards()) {
assertFlushResponseEqualsShardStats(shardStats, sealIndicesResponse);
}
assertFlushResponseEqualsShardStats(indexStats.getShards(), syncedFlushResult.getShardsResultPerIndex().get("test"));
refresh();
assertThat(client().prepareCount().get().getCount(), equalTo((long) numDocs.get()));
logger.info("indexed {} docs", client().prepareCount().get().getCount());
@ -203,22 +189,38 @@ public class FlushTest extends ElasticsearchIntegrationTest {
assertThat(client().prepareCount().get().getCount(), equalTo((long) numDocs.get()));
}
private void assertFlushResponseEqualsShardStats(ShardStats shardStats, SealIndicesResponse sealIndicesResponse) {
private void assertFlushResponseEqualsShardStats(ShardStats[] shardsStats, List<ShardsSyncedFlushResult> syncedFlushResults) {
for (SyncedFlushService.SyncedFlushResult shardResult : sealIndicesResponse.results()) {
if (shardStats.getShardRouting().getId() == shardResult.shardId().getId()) {
for (Map.Entry<ShardRouting, SyncedFlushService.SyncedFlushResponse> singleResponse : shardResult.shardResponses().entrySet()) {
if (singleResponse.getKey().currentNodeId().equals(shardStats.getShardRouting().currentNodeId())) {
if (singleResponse.getValue().success()) {
assertNotNull(shardStats.getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID));
logger.info("sync flushed {} on node {}", singleResponse.getKey().shardId(), singleResponse.getKey().currentNodeId());
} else {
assertNull(shardStats.getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID));
logger.info("sync flush failed for {} on node {}", singleResponse.getKey().shardId(), singleResponse.getKey().currentNodeId());
for (final ShardStats shardStats : shardsStats) {
for (final ShardsSyncedFlushResult shardResult : syncedFlushResults) {
if (shardStats.getShardRouting().getId() == shardResult.shardId().getId()) {
for (Map.Entry<ShardRouting, SyncedFlushService.SyncedFlushResponse> singleResponse : shardResult.shardResponses().entrySet()) {
if (singleResponse.getKey().currentNodeId().equals(shardStats.getShardRouting().currentNodeId())) {
if (singleResponse.getValue().success()) {
logger.info("{} sync flushed on node {}", singleResponse.getKey().shardId(), singleResponse.getKey().currentNodeId());
assertNotNull(shardStats.getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID));
} else {
logger.info("{} sync flush failed for on node {}", singleResponse.getKey().shardId(), singleResponse.getKey().currentNodeId());
assertNull(shardStats.getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID));
}
}
}
}
}
}
}
@Test
public void testUnallocatedShardsDoesNotHang() throws InterruptedException {
// create an index but disallow allocation
prepareCreate("test").setSettings(Settings.builder().put("index.routing.allocation.include._name", "nonexistent")).get();
// this should not hang but instead immediately return with empty result set
List<ShardsSyncedFlushResult> shardsResult = SyncedFlushUtil.attemptSyncedFlush(internalCluster(), "test").getShardsResultPerIndex().get("test");
// just to make sure the test actually tests the right thing
int numShards = client().admin().indices().prepareGetSettings("test").get().getIndexToSettings().get("test").getAsInt(IndexMetaData.SETTING_NUMBER_OF_SHARDS, -1);
assertThat(shardsResult.size(), equalTo(numShards));
assertThat(shardsResult.get(0).failureReason(), equalTo("no active shards"));
}
}

View File

@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices;
package org.elasticsearch.indices.flush;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
@ -27,17 +27,17 @@ import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.test.ElasticsearchSingleNodeTest;
import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
import java.util.List;
import java.util.Map;
/**
*/
public class SycnedFlushSingleNodeTest extends ElasticsearchSingleNodeTest {
public class SyncedFlushSingleNodeTest extends ElasticsearchSingleNodeTest {
public void testModificationPreventsSealing() throws InterruptedException {
public void testModificationPreventsFlushing() throws InterruptedException {
createIndex("test");
client().prepareIndex("test", "test", "1").setSource("{}").get();
IndexService test = getInstanceFromNode(IndicesService.class).indexService("test");
@ -46,18 +46,18 @@ public class SycnedFlushSingleNodeTest extends ElasticsearchSingleNodeTest {
SyncedFlushService flushService = getInstanceFromNode(SyncedFlushService.class);
final ShardId shardId = shard.shardId();
final ClusterState state = getInstanceFromNode(ClusterService.class).state();
final IndexShardRoutingTable shardRoutingTable = flushService.getActiveShardRoutings(shardId, state);
final IndexShardRoutingTable shardRoutingTable = flushService.getShardRoutingTable(shardId, state);
final List<ShardRouting> activeShards = shardRoutingTable.activeShards();
assertEquals("exactly one active shard", 1, activeShards.size());
Map<String, Engine.CommitId> commitIds = SyncedFlushUtil.sendPreSyncRequests(flushService, activeShards, state, shardId);
assertEquals("exactly one commit id", 1, commitIds.size());
client().prepareIndex("test", "test", "2").setSource("{}").get();
String syncId = Strings.base64UUID();
SyncedFlushUtil.LatchedListener<SyncedFlushService.SyncedFlushResult> listener = new SyncedFlushUtil.LatchedListener();
flushService.sendSyncRequests(syncId, activeShards, state, commitIds, shardId,listener);
SyncedFlushUtil.LatchedListener<ShardsSyncedFlushResult> listener = new SyncedFlushUtil.LatchedListener<>();
flushService.sendSyncRequests(syncId, activeShards, state, commitIds, shardId, shardRoutingTable.size(), listener);
listener.latch.await();
assertNull(listener.error);
SyncedFlushService.SyncedFlushResult syncedFlushResult = listener.result;
ShardsSyncedFlushResult syncedFlushResult = listener.result;
assertNotNull(syncedFlushResult);
assertEquals(0, syncedFlushResult.successfulShards());
assertEquals(1, syncedFlushResult.totalShards());
@ -66,9 +66,9 @@ public class SycnedFlushSingleNodeTest extends ElasticsearchSingleNodeTest {
assertFalse(syncedFlushResult.shardResponses().get(activeShards.get(0)).success());
assertEquals("pending operations", syncedFlushResult.shardResponses().get(activeShards.get(0)).failureReason());
SyncedFlushUtil.sendPreSyncRequests(flushService, activeShards, state, shardId); // pull another commit and make sure we can't seal with the old one
SyncedFlushUtil.sendPreSyncRequests(flushService, activeShards, state, shardId); // pull another commit and make sure we can't sync-flush with the old one
listener = new SyncedFlushUtil.LatchedListener();
flushService.sendSyncRequests(syncId, activeShards, state, commitIds, shardId,listener);
flushService.sendSyncRequests(syncId, activeShards, state, commitIds, shardId, shardRoutingTable.size(), listener);
listener.latch.await();
assertNull(listener.error);
syncedFlushResult = listener.result;
@ -79,7 +79,6 @@ public class SycnedFlushSingleNodeTest extends ElasticsearchSingleNodeTest {
assertNotNull(syncedFlushResult.shardResponses().get(activeShards.get(0)));
assertFalse(syncedFlushResult.shardResponses().get(activeShards.get(0)).success());
assertEquals("commit has changed", syncedFlushResult.shardResponses().get(activeShards.get(0)).failureReason());
ElasticsearchAssertions.assertVersionSerializable(syncedFlushResult);
}
public void testSingleShardSuccess() throws InterruptedException {
@ -90,17 +89,16 @@ public class SycnedFlushSingleNodeTest extends ElasticsearchSingleNodeTest {
SyncedFlushService flushService = getInstanceFromNode(SyncedFlushService.class);
final ShardId shardId = shard.shardId();
SyncedFlushUtil.LatchedListener<SyncedFlushService.SyncedFlushResult> listener = new SyncedFlushUtil.LatchedListener();
SyncedFlushUtil.LatchedListener<ShardsSyncedFlushResult> listener = new SyncedFlushUtil.LatchedListener();
flushService.attemptSyncedFlush(shardId, listener);
listener.latch.await();
assertNull(listener.error);
SyncedFlushService.SyncedFlushResult syncedFlushResult = listener.result;
ShardsSyncedFlushResult syncedFlushResult = listener.result;
assertNotNull(syncedFlushResult);
assertEquals(1, syncedFlushResult.successfulShards());
assertEquals(1, syncedFlushResult.totalShards());
SyncedFlushService.SyncedFlushResponse response = syncedFlushResult.shardResponses().values().iterator().next();
assertTrue(response.success());
ElasticsearchAssertions.assertVersionSerializable(syncedFlushResult);
}
public void testSyncFailsIfOperationIsInFlight() throws InterruptedException {
@ -113,16 +111,15 @@ public class SycnedFlushSingleNodeTest extends ElasticsearchSingleNodeTest {
final ShardId shardId = shard.shardId();
shard.incrementOperationCounter();
try {
SyncedFlushUtil.LatchedListener<SyncedFlushService.SyncedFlushResult> listener = new SyncedFlushUtil.LatchedListener();
SyncedFlushUtil.LatchedListener<ShardsSyncedFlushResult> listener = new SyncedFlushUtil.LatchedListener<>();
flushService.attemptSyncedFlush(shardId, listener);
listener.latch.await();
assertNull(listener.error);
SyncedFlushService.SyncedFlushResult syncedFlushResult = listener.result;
ShardsSyncedFlushResult syncedFlushResult = listener.result;
assertNotNull(syncedFlushResult);
assertEquals(0, syncedFlushResult.successfulShards());
assertEquals(0, syncedFlushResult.totalShards());
assertEquals("operation counter on primary is non zero [2]", syncedFlushResult.failureReason());
ElasticsearchAssertions.assertVersionSerializable(syncedFlushResult);
assertNotEquals(0, syncedFlushResult.totalShards());
assertEquals("[1] ongoing operations on primary", syncedFlushResult.failureReason());
} finally {
shard.decrementOperationCounter();
}
@ -168,7 +165,7 @@ public class SycnedFlushSingleNodeTest extends ElasticsearchSingleNodeTest {
SyncedFlushService flushService = getInstanceFromNode(SyncedFlushService.class);
final ShardId shardId = shard.shardId();
final ClusterState state = getInstanceFromNode(ClusterService.class).state();
final IndexShardRoutingTable shardRoutingTable = flushService.getActiveShardRoutings(shardId, state);
final IndexShardRoutingTable shardRoutingTable = flushService.getShardRoutingTable(shardId, state);
final List<ShardRouting> activeShards = shardRoutingTable.activeShards();
assertEquals("exactly one active shard", 1, activeShards.size());
Map<String, Engine.CommitId> commitIds = SyncedFlushUtil.sendPreSyncRequests(flushService, activeShards, state, shardId);
@ -178,11 +175,11 @@ public class SycnedFlushSingleNodeTest extends ElasticsearchSingleNodeTest {
}
client().admin().indices().prepareFlush("test").setForce(true).get();
String syncId = Strings.base64UUID();
final SyncedFlushUtil.LatchedListener<SyncedFlushService.SyncedFlushResult> listener = new SyncedFlushUtil.LatchedListener();
flushService.sendSyncRequests(syncId, activeShards, state, commitIds, shardId, listener);
final SyncedFlushUtil.LatchedListener<ShardsSyncedFlushResult> listener = new SyncedFlushUtil.LatchedListener();
flushService.sendSyncRequests(syncId, activeShards, state, commitIds, shardId, shardRoutingTable.size(), listener);
listener.latch.await();
assertNull(listener.error);
SyncedFlushService.SyncedFlushResult syncedFlushResult = listener.result;
ShardsSyncedFlushResult syncedFlushResult = listener.result;
assertNotNull(syncedFlushResult);
assertEquals(0, syncedFlushResult.successfulShards());
assertEquals(1, syncedFlushResult.totalShards());
@ -190,7 +187,6 @@ public class SycnedFlushSingleNodeTest extends ElasticsearchSingleNodeTest {
assertNotNull(syncedFlushResult.shardResponses().get(activeShards.get(0)));
assertFalse(syncedFlushResult.shardResponses().get(activeShards.get(0)).success());
assertEquals("commit has changed", syncedFlushResult.shardResponses().get(activeShards.get(0)).failureReason());
ElasticsearchAssertions.assertVersionSerializable(syncedFlushResult);
}
public void testFailWhenCommitIsMissing() throws InterruptedException {
@ -202,18 +198,18 @@ public class SycnedFlushSingleNodeTest extends ElasticsearchSingleNodeTest {
SyncedFlushService flushService = getInstanceFromNode(SyncedFlushService.class);
final ShardId shardId = shard.shardId();
final ClusterState state = getInstanceFromNode(ClusterService.class).state();
final IndexShardRoutingTable shardRoutingTable = flushService.getActiveShardRoutings(shardId, state);
final IndexShardRoutingTable shardRoutingTable = flushService.getShardRoutingTable(shardId, state);
final List<ShardRouting> activeShards = shardRoutingTable.activeShards();
assertEquals("exactly one active shard", 1, activeShards.size());
Map<String, Engine.CommitId> commitIds = SyncedFlushUtil.sendPreSyncRequests(flushService, activeShards, state, shardId);
assertEquals("exactly one commit id", 1, commitIds.size());
commitIds.clear(); // wipe it...
String syncId = Strings.base64UUID();
SyncedFlushUtil.LatchedListener<SyncedFlushService.SyncedFlushResult> listener = new SyncedFlushUtil.LatchedListener();
flushService.sendSyncRequests(syncId, activeShards, state, commitIds, shardId, listener);
SyncedFlushUtil.LatchedListener<ShardsSyncedFlushResult> listener = new SyncedFlushUtil.LatchedListener();
flushService.sendSyncRequests(syncId, activeShards, state, commitIds, shardId, shardRoutingTable.size(), listener);
listener.latch.await();
assertNull(listener.error);
SyncedFlushService.SyncedFlushResult syncedFlushResult = listener.result;
ShardsSyncedFlushResult syncedFlushResult = listener.result;
assertNotNull(syncedFlushResult);
assertEquals(0, syncedFlushResult.successfulShards());
assertEquals(1, syncedFlushResult.totalShards());
@ -221,7 +217,6 @@ public class SycnedFlushSingleNodeTest extends ElasticsearchSingleNodeTest {
assertNotNull(syncedFlushResult.shardResponses().get(activeShards.get(0)));
assertFalse(syncedFlushResult.shardResponses().get(activeShards.get(0)).success());
assertEquals("no commit id from pre-sync flush", syncedFlushResult.shardResponses().get(activeShards.get(0)).failureReason());
ElasticsearchAssertions.assertVersionSerializable(syncedFlushResult);
}

View File

@ -0,0 +1,136 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.flush;
import com.carrotsearch.hppc.ObjectIntHashMap;
import com.carrotsearch.hppc.ObjectIntMap;
import org.elasticsearch.cluster.routing.ImmutableShardRouting;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.flush.IndicesSyncedFlushResult.ShardCounts;
import org.elasticsearch.indices.flush.SyncedFlushService.SyncedFlushResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ElasticsearchTestCase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.elasticsearch.test.XContentTestUtils.convertToMap;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
public class SyncedFlushUnitTests extends ElasticsearchTestCase {
private static class TestPlan {
public ShardCounts totalCounts;
public Map<String, ShardCounts> countsPerIndex = new HashMap<>();
public ObjectIntMap<String> expectedFailuresPerIndex = new ObjectIntHashMap<>();
public IndicesSyncedFlushResult result;
}
public void testIndicesSyncedFlushResult() throws IOException {
final TestPlan testPlan = createTestPlan();
assertThat(testPlan.result.totalShards(), equalTo(testPlan.totalCounts.total));
assertThat(testPlan.result.successfulShards(), equalTo(testPlan.totalCounts.successful));
assertThat(testPlan.result.failedShards(), equalTo(testPlan.totalCounts.failed));
assertThat(testPlan.result.restStatus(), equalTo(testPlan.totalCounts.failed > 0 ? RestStatus.CONFLICT : RestStatus.OK));
Map<String, Object> asMap = convertToMap(testPlan.result);
assertShardCount("_shards header", (Map<String, Object>) asMap.get("_shards"), testPlan.totalCounts);
assertThat("unexpected number of indices", asMap.size(), equalTo(1 + testPlan.countsPerIndex.size())); // +1 for the shards header
for (String index : testPlan.countsPerIndex.keySet()) {
Map<String, Object> indexMap = (Map<String, Object>) asMap.get(index);
assertShardCount(index, indexMap, testPlan.countsPerIndex.get(index));
List<Map<String, Object>> failureList = (List<Map<String, Object>>) indexMap.get("failures");
final int expectedFailures = testPlan.expectedFailuresPerIndex.get(index);
if (expectedFailures == 0) {
assertNull(index + " has unexpected failures", failureList);
} else {
assertNotNull(index + " should have failures", failureList);
assertThat(failureList, hasSize(expectedFailures));
}
}
}
private void assertShardCount(String name, Map<String, Object> header, ShardCounts expectedCounts) {
assertThat(name + " has unexpected total count", (Integer) header.get("total"), equalTo(expectedCounts.total));
assertThat(name + " has unexpected successful count", (Integer) header.get("successful"), equalTo(expectedCounts.successful));
assertThat(name + " has unexpected failed count", (Integer) header.get("failed"), equalTo(expectedCounts.failed));
}
protected TestPlan createTestPlan() {
final TestPlan testPlan = new TestPlan();
final Map<String, List<ShardsSyncedFlushResult>> indicesResults = new HashMap<>();
final int indexCount = randomIntBetween(1, 10);
int totalShards = 0;
int totalSuccesful = 0;
int totalFailed = 0;
for (int i = 0; i < indexCount; i++) {
final String index = "index_" + i;
int shards = randomIntBetween(1, 4);
int replicas = randomIntBetween(0, 2);
int successful = 0;
int failed = 0;
int failures = 0;
List<ShardsSyncedFlushResult> shardsResults = new ArrayList<>();
for (int shard = 0; shard < shards; shard++) {
final ShardId shardId = new ShardId(index, shard);
if (randomInt(5) < 2) {
// total shard failure
failed += replicas + 1;
failures++;
shardsResults.add(new ShardsSyncedFlushResult(shardId, replicas + 1, "simulated total failure"));
} else {
Map<ShardRouting, SyncedFlushResponse> shardResponses = new HashMap<>();
for (int copy = 0; copy < replicas + 1; copy++) {
final ShardRouting shardRouting = new ImmutableShardRouting(index, shard, "node_" + shardId + "_" + copy, null,
copy == 0, ShardRoutingState.STARTED, 0);
if (randomInt(5) < 2) {
// shard copy failure
failed++;
failures++;
shardResponses.put(shardRouting, new SyncedFlushResponse("copy failure " + shardId));
} else {
successful++;
shardResponses.put(shardRouting, new SyncedFlushResponse());
}
}
shardsResults.add(new ShardsSyncedFlushResult(shardId, "_sync_id_" + shard, replicas + 1, shardResponses));
}
}
indicesResults.put(index, shardsResults);
testPlan.countsPerIndex.put(index, new ShardCounts(shards * (replicas + 1), successful, failed));
testPlan.expectedFailuresPerIndex.put(index, failures);
totalFailed += failed;
totalShards += shards * (replicas + 1);
totalSuccesful += successful;
}
testPlan.result = new IndicesSyncedFlushResult(indicesResults);
testPlan.totalCounts = new ShardCounts(totalShards, totalSuccesful, totalFailed);
return testPlan;
}
}

View File

@ -16,16 +16,16 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices;
package org.elasticsearch.indices.flush;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.SyncedFlushService;
import org.elasticsearch.test.InternalTestCluster;
import java.util.List;
import java.util.Map;
@ -38,11 +38,31 @@ public class SyncedFlushUtil {
}
/**
* Blocking single index version of {@link SyncedFlushService#attemptSyncedFlush(String[], IndicesOptions, ActionListener)}
*/
public static IndicesSyncedFlushResult attemptSyncedFlush(InternalTestCluster cluster, String index) {
SyncedFlushService service = cluster.getInstance(SyncedFlushService.class);
LatchedListener<IndicesSyncedFlushResult> listener = new LatchedListener();
service.attemptSyncedFlush(new String[]{index}, IndicesOptions.lenientExpandOpen(), listener);
try {
listener.latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
if (listener.error != null) {
throw ExceptionsHelper.convertToElastic(listener.error);
}
return listener.result;
}
/**
* Blocking version of {@link SyncedFlushService#attemptSyncedFlush(ShardId, ActionListener)}
*/
public static SyncedFlushService.SyncedFlushResult attemptSyncedFlush(SyncedFlushService service, ShardId shardId) {
LatchedListener<SyncedFlushService.SyncedFlushResult> listener = new LatchedListener();
public static ShardsSyncedFlushResult attemptSyncedFlush(InternalTestCluster cluster, ShardId shardId) {
SyncedFlushService service = cluster.getInstance(SyncedFlushService.class);
LatchedListener<ShardsSyncedFlushResult> listener = new LatchedListener();
service.attemptSyncedFlush(shardId, listener);
try {
listener.latch.await();

View File

@ -18,14 +18,15 @@
*/
package org.elasticsearch.test;
import com.carrotsearch.randomizedtesting.*;
import com.carrotsearch.randomizedtesting.RandomizedContext;
import com.carrotsearch.randomizedtesting.RandomizedTest;
import com.carrotsearch.randomizedtesting.Randomness;
import com.carrotsearch.randomizedtesting.annotations.TestGroup;
import com.carrotsearch.randomizedtesting.generators.RandomInts;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.impl.client.HttpClients;
import org.apache.lucene.store.StoreRateLimiting;
@ -49,7 +50,6 @@ import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.admin.indices.optimize.OptimizeResponse;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse;
import org.elasticsearch.action.admin.indices.seal.SealIndicesResponse;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequestBuilder;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
@ -101,21 +101,18 @@ import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.FieldMapper.Loading;
import org.elasticsearch.index.mapper.internal.SizeFieldMapper;
import org.elasticsearch.index.mapper.internal.TimestampFieldMapper;
import org.elasticsearch.index.merge.policy.AbstractMergePolicyProvider;
import org.elasticsearch.index.merge.policy.LogByteSizeMergePolicyProvider;
import org.elasticsearch.index.merge.policy.LogDocMergePolicyProvider;
import org.elasticsearch.index.merge.policy.MergePolicyModule;
import org.elasticsearch.index.merge.policy.MergePolicyProvider;
import org.elasticsearch.index.merge.policy.TieredMergePolicyProvider;
import org.elasticsearch.index.merge.policy.*;
import org.elasticsearch.index.merge.scheduler.ConcurrentMergeSchedulerProvider;
import org.elasticsearch.index.merge.scheduler.MergeSchedulerModule;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.index.translog.TranslogService;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogWriter;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.flush.IndicesSyncedFlushResult;
import org.elasticsearch.indices.flush.SyncedFlushService;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.node.Node;
@ -128,45 +125,28 @@ import org.elasticsearch.test.rest.client.http.HttpRequestBuilder;
import org.elasticsearch.transport.netty.NettyTransport;
import org.hamcrest.Matchers;
import org.joda.time.DateTimeZone;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.*;
import java.io.IOException;
import java.io.InputStream;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.annotation.*;
import java.net.InetSocketAddress;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.test.XContentTestUtils.convertToMap;
import static org.elasticsearch.test.XContentTestUtils.mapsEqualIgnoringArrayOrder;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoTimeout;
import static org.hamcrest.Matchers.emptyIterable;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*;
import static org.hamcrest.Matchers.*;
/**
* {@link ElasticsearchIntegrationTest} is an abstract base class to run integration
@ -244,7 +224,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
/**
* Annotation for third-party integration tests.
* <p>
* <p/>
* These are tests the require a third-party service in order to run. They
* may require the user to manually configure an external process (such as rabbitmq),
* or may additionally require some external configuration (e.g. AWS credentials)
@ -408,56 +388,56 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
}
mappings.startArray("dynamic_templates")
.startObject()
.startObject("template-strings")
.field("match_mapping_type", "string")
.startObject("template-strings")
.field("match_mapping_type", "string")
.startObject("mapping")
.startObject("fielddata")
.field(FieldDataType.FORMAT_KEY, randomFrom("paged_bytes", "fst"))
.field(Loading.KEY, randomLoadingValues())
.startObject("fielddata")
.field(FieldDataType.FORMAT_KEY, randomFrom("paged_bytes", "fst"))
.field(Loading.KEY, randomLoadingValues())
.endObject()
.endObject()
.endObject()
.endObject()
.startObject()
.startObject("template-longs")
.field("match_mapping_type", "long")
.startObject("mapping")
.field("doc_values", randomBoolean())
.startObject("fielddata")
.field(Loading.KEY, randomFrom(Loading.LAZY, Loading.EAGER))
.startObject("template-longs")
.field("match_mapping_type", "long")
.startObject("mapping")
.field("doc_values", randomBoolean())
.startObject("fielddata")
.field(Loading.KEY, randomFrom(Loading.LAZY, Loading.EAGER))
.endObject()
.endObject()
.endObject()
.endObject()
.startObject()
.startObject("template-doubles")
.field("match_mapping_type", "double")
.startObject("mapping")
.field("doc_values", randomBoolean())
.startObject("fielddata")
.field(Loading.KEY, randomFrom(Loading.LAZY, Loading.EAGER))
.startObject("template-doubles")
.field("match_mapping_type", "double")
.startObject("mapping")
.field("doc_values", randomBoolean())
.startObject("fielddata")
.field(Loading.KEY, randomFrom(Loading.LAZY, Loading.EAGER))
.endObject()
.endObject()
.endObject()
.endObject()
.startObject()
.startObject("template-geo_points")
.field("match_mapping_type", "geo_point")
.startObject("mapping")
.field("doc_values", randomBoolean())
.startObject("fielddata")
.field(Loading.KEY, randomFrom(Loading.LAZY, Loading.EAGER))
.startObject("template-geo_points")
.field("match_mapping_type", "geo_point")
.startObject("mapping")
.field("doc_values", randomBoolean())
.startObject("fielddata")
.field(Loading.KEY, randomFrom(Loading.LAZY, Loading.EAGER))
.endObject()
.endObject()
.endObject()
.endObject()
.startObject()
.startObject("template-booleans")
.field("match_mapping_type", "boolean")
.startObject("mapping")
.startObject("fielddata")
.field(FieldDataType.FORMAT_KEY, randomFrom("array", "doc_values"))
.field(Loading.KEY, randomFrom(Loading.LAZY, Loading.EAGER))
.startObject("template-booleans")
.field("match_mapping_type", "boolean")
.startObject("mapping")
.startObject("fielddata")
.field(FieldDataType.FORMAT_KEY, randomFrom("array", "doc_values"))
.field(Loading.KEY, randomFrom(Loading.LAZY, Loading.EAGER))
.endObject()
.endObject()
.endObject()
@ -512,7 +492,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
}
if (random.nextBoolean()) {
builder.put(TranslogConfig.INDEX_TRANSLOG_FS_TYPE, RandomPicks.randomFrom(random, TranslogWriter.Type.values()).name());
builder.put(TranslogConfig.INDEX_TRANSLOG_FS_TYPE, RandomPicks.randomFrom(random, TranslogWriter.Type.values()).name());
}
if (random.nextBoolean()) {
@ -652,9 +632,9 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
if (currentClusterScope != Scope.TEST) {
MetaData metaData = client().admin().cluster().prepareState().execute().actionGet().getState().getMetaData();
assertThat("test leaves persistent cluster metadata behind: " + metaData.persistentSettings().getAsMap(), metaData
.persistentSettings().getAsMap().size(), equalTo(0));
.persistentSettings().getAsMap().size(), equalTo(0));
assertThat("test leaves transient cluster metadata behind: " + metaData.transientSettings().getAsMap(), metaData
.transientSettings().getAsMap().size(), equalTo(0));
.transientSettings().getAsMap().size(), equalTo(0));
}
ensureClusterSizeConsistency();
ensureClusterStateConsistency();
@ -870,11 +850,11 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
public void run() {
for (Client client : clients()) {
ClusterHealthResponse clusterHealth = client.admin().cluster().prepareHealth().setLocal(true).get();
assertThat("client " + client + " still has in flight fetch", clusterHealth.getNumberOfInFlightFetch(), equalTo(0));
assertThat("client " + client + " still has in flight fetch", clusterHealth.getNumberOfInFlightFetch(), equalTo(0));
PendingClusterTasksResponse pendingTasks = client.admin().cluster().preparePendingClusterTasks().setLocal(true).get();
assertThat("client " + client + " still has pending tasks " + pendingTasks.prettyPrint(), pendingTasks, Matchers.emptyIterable());
clusterHealth = client.admin().cluster().prepareHealth().setLocal(true).get();
assertThat("client " + client + " still has in flight fetch", clusterHealth.getNumberOfInFlightFetch(), equalTo(0));
assertThat("client " + client + " still has in flight fetch", clusterHealth.getNumberOfInFlightFetch(), equalTo(0));
}
}
});
@ -961,7 +941,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
* It is useful to ensure that all action on the cluster have finished and all shards that were currently relocating
* are now allocated and started.
*/
public ClusterHealthStatus ensureGreen(String... indices) {
public ClusterHealthStatus ensureGreen(String... indices) {
return ensureGreen(TimeValue.timeValueSeconds(30), indices);
}
@ -1248,11 +1228,11 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
/**
* Syntactic sugar for:
*
* <p/>
* <pre>
* return client().prepareIndex(index, type, id).setSource(source).execute().actionGet();
* </pre>
*
* <p/>
* where source is a String.
*/
protected final IndexResponse index(String index, String type, String id, String source) {
@ -1375,7 +1355,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
* @param forceRefresh if <tt>true</tt> all involved indices are refreshed once the documents are indexed.
* @param dummyDocuments if <tt>true</tt> some empty dummy documents may be randomly inserted into the document list and deleted once
* all documents are indexed. This is useful to produce deleted documents on the server side.
* @param maybeFlush if <tt>true</tt> this method may randomly execute full flushes after index operations.
* @param maybeFlush if <tt>true</tt> this method may randomly execute full flushes after index operations.
* @param builders the documents to index.
*/
public void indexRandom(boolean forceRefresh, boolean dummyDocuments, boolean maybeFlush, List<IndexRequestBuilder> builders) throws InterruptedException, ExecutionException {
@ -1509,8 +1489,8 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
client().admin().indices().prepareFlush(indices).setIndicesOptions(IndicesOptions.lenientExpandOpen()).execute(
new LatchedActionListener<FlushResponse>(newLatch(inFlightAsyncOperations)));
} else {
client().admin().indices().prepareSealIndices(indices).execute(
new LatchedActionListener<SealIndicesResponse>(newLatch(inFlightAsyncOperations)));
internalCluster().getInstance(SyncedFlushService.class).attemptSyncedFlush(indices, IndicesOptions.lenientExpandOpen(),
new LatchedActionListener<IndicesSyncedFlushResult>(newLatch(inFlightAsyncOperations)));
}
} else if (rarely()) {
client().admin().indices().prepareOptimize(indices).setIndicesOptions(IndicesOptions.lenientExpandOpen()).setMaxNumSegments(between(1, 10)).setFlush(maybeFlush && randomBoolean()).execute(
@ -1669,7 +1649,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
private int getMinNumDataNodes() {
ClusterScope annotation = getAnnotation(this.getClass());
return annotation == null || annotation.minNumDataNodes() == -1 ? InternalTestCluster.DEFAULT_MIN_NUM_DATA_NODES : annotation.minNumDataNodes();
return annotation == null || annotation.minNumDataNodes() == -1 ? InternalTestCluster.DEFAULT_MIN_NUM_DATA_NODES : annotation.minNumDataNodes();
}
private int getMaxNumDataNodes() {
@ -1702,7 +1682,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK, "1b")
.put("script.indexed", "on")
.put("script.inline", "on")
// wait short time for other active shards before actually deleting, default 30s not needed in tests
// wait short time for other active shards before actually deleting, default 30s not needed in tests
.put(IndicesStore.INDICES_STORE_DELETE_SHARD_TIMEOUT, new TimeValue(1, TimeUnit.SECONDS))
.build();
}
@ -1886,7 +1866,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) {
for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
for (ShardRouting shardRouting : indexShardRoutingTable) {
if (shardRouting.currentNodeId() != null && index.equals(shardRouting.getIndex())) {
if (shardRouting.currentNodeId() != null && index.equals(shardRouting.getIndex())) {
String name = clusterState.nodes().get(shardRouting.currentNodeId()).name();
nodes.add(name);
assertThat("Allocated on new node: " + name, Regex.simpleMatch(pattern, name), is(true));

View File

@ -78,7 +78,6 @@ import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineClosedException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardModule;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.IndexStoreModule;
import org.elasticsearch.index.translog.TranslogConfig;
@ -975,7 +974,7 @@ public final class InternalTestCluster extends TestCluster {
@Override
public void beforeIndexDeletion() {
// Check that the operations counter on index shard has reached 1.
// Check that the operations counter on index shard has reached 0.
// The assumption here is that after a test there are no ongoing write operations.
// test that have ongoing write operations after the test (for example because ttl is used
// and not all docs have been purged after the test) and inherit from
@ -1018,10 +1017,7 @@ public final class InternalTestCluster extends TestCluster {
IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name);
for (IndexService indexService : indexServices) {
for (IndexShard indexShard : indexService) {
assertThat("index shard counter on shard " + indexShard.shardId() + " on node " + nodeAndClient.name + " not 0 or 1 ", indexShard.getOperationsCount(), anyOf(equalTo(1), equalTo(0)));
if (indexShard.getOperationsCount() == 0) {
assertThat(indexShard.state(), equalTo(IndexShardState.CLOSED));
}
assertThat("index shard counter on shard " + indexShard.shardId() + " on node " + nodeAndClient.name + " not 0", indexShard.getOperationsCount(), equalTo(0));
}
}
}