diff --git a/docs/reference/indices.asciidoc b/docs/reference/indices.asciidoc index fe1b20b05e9..06a24821440 100644 --- a/docs/reference/indices.asciidoc +++ b/docs/reference/indices.asciidoc @@ -59,7 +59,6 @@ and warmers. * <> * <> * <> -* <> * <> -- diff --git a/docs/reference/indices/flush.asciidoc b/docs/reference/indices/flush.asciidoc index b2b67474623..118a68b7128 100644 --- a/docs/reference/indices/flush.asciidoc +++ b/docs/reference/indices/flush.asciidoc @@ -10,8 +10,9 @@ trigger flush operations as required in order to clear memory. [source,js] -------------------------------------------------- -$ curl -XPOST 'http://localhost:9200/twitter/_flush' +POST /twitter/_flush -------------------------------------------------- +// AUTOSENSE [float] [[flush-parameters]] @@ -39,7 +40,198 @@ or even on `_all` the indices. [source,js] -------------------------------------------------- -$ curl -XPOST 'http://localhost:9200/kimchy,elasticsearch/_flush' +POST /kimchy,elasticsearch/_flush -$ curl -XPOST 'http://localhost:9200/_flush' +POST /_flush -------------------------------------------------- +// AUTOSENSE + +[[indices-synced-flush]] +=== Synced Flush + +Elasticsearch tracks the indexing activity of each shard. Shards that have not +received any indexing operations for 30 minutes are automatically marked as inactive. This presents +an opportunity for Elasticsearch to reduce shard resources and also perform +a special kind of flush, called `synced flush`. A synced flush performs a normal flush, then adds +a generated unique marker (sync_id) to all shards. + +Since the sync id marker was added when there were no ongoing indexing operations, it can +be used as a quick way to check if the two shards' lucene indices are identical. This quick sync id +comparison (if present) is used during recovery or restarts to skip the first and +most costly phase of the process. In that case, no segment files need to be copied and +the transaction log replay phase of the recovery can start immediately. Note that since the sync id +marker was applied together with a flush, it is very likely that the transaction log will be empty, +speeding up recoveries even more. + +This is particularly useful for use cases having lots of indices which are +never or very rarely updated, such as time based data. This use case typically generates lots of indices whose +recovery without the synced flush marker would take a long time. + +To check whether a shard has a marker or not, look for the `commit` section of shard stats returned by +the <> API: + +[source,bash] +-------------------------------------------------- +GET /twitter/_stats/commit?level=shards +-------------------------------------------------- +// AUTOSENSE + + +which returns something similar to: + +[source,js] +-------------------------------------------------- +{ + ... + "indices": { + "twitter": { + "primaries": {}, + "total": {}, + "shards": { + "0": [ + { + "routing": { + ... + }, + "commit": { + "id": "te7zF7C4UsirqvL6jp/vUg==", + "generation": 2, + "user_data": { + "sync_id": "AU2VU0meX-VX2aNbEUsD" <1>, + ... + }, + "num_docs": 0 + } + } + ... + ], + ... + } + } + } +} +-------------------------------------------------- +<1> the `sync id` marker + +[float] +=== Synced Flush API + +The Synced Flush API allows an administrator to initiate a synced flush manually. This can be particularly useful for +a planned (rolling) cluster restart where you can stop indexing and don't want to wait the default 30 minutes for +idle indices to be sync-flushed automatically. + +While handy, there are a couple of caveats for this API: + +1. Synced flush is a best effort operation. Any ongoing indexing operations will cause +the synced flush to fail on that shard. This means that some shards may be synced flushed while others aren't. See below for more. +2. The `sync_id` marker is removed as soon as the shard is flushed again. That is because a flush replaces the low level +lucene commit point where the marker is stored. Uncommitted operations in the transaction log do not remove the marker. +In practice, one should consider any indexing operation on an index as removing the marker as a flush can be triggered by Elasticsearch +at any time. + + +NOTE: It is harmless to request a synced flush while there is ongoing indexing. Shards that are idle will succeed and shards + that are not will fail. Any shards that succeeded will have faster recovery times. + + +[source,bash] +-------------------------------------------------- +POST /twitter/_flush/synced +-------------------------------------------------- +// AUTOSENSE + +The response contains details about how many shards were successfully sync-flushed and information about any failure. + +Here is what it looks like when all shards of a two shards and one replica index successfully +sync-flushed: + +[source,js] +-------------------------------------------------- +{ + "_shards": { + "total": 4, + "successful": 4, + "failed": 0 + }, + "twitter": { + "total": 4, + "successful": 4, + "failed": 0 + } +} +-------------------------------------------------- + + +Here is what it looks like when one shard group failed due to pending operations: + +[source,js] +-------------------------------------------------- +{ + "_shards": { + "total": 4, + "successful": 2, + "failed": 2 + }, + "twitter": { + "total": 4, + "successful": 2, + "failed": 2, + "failures": [ + { + "shard": 1, + "reason": "[2] ongoing operations on primary" + } + ] + } +} +-------------------------------------------------- + +NOTE: The above error is shown when the synced flush failes due to concurrent indexing operations. The HTTP +status code in that case will be `409 CONFLICT`. + +Sometimes the failures are specific to a shard copy. The copies that failed will not be eligible for +fast recovery but those that succeeded still will be. This case is reported as follows: + +[source,js] +-------------------------------------------------- +{ + "_shards": { + "total": 4, + "successful": 1, + "failed": 1 + }, + "twitter": { + "total": 4, + "successful": 3, + "failed": 1, + "failures": [ + { + "shard": 1, + "reason": "unexpected error", + "routing": { + "state": "STARTED", + "primary": false, + "node": "SZNr2J_ORxKTLUCydGX4zA", + "relocating_node": null, + "shard": 1, + "index": "twitter" + } + } + ] + } +} +-------------------------------------------------- + + +NOTE: When a shard copy fails to sync-flush, the HTTP status code returned will be `409 CONFLICT`. + +The synced flush API can be applied to more than one index with a single call, +or even on `_all` the indices. + +[source,js] +-------------------------------------------------- +POST /kimchy,elasticsearch/_flush/synced + +POST /_flush/synced +-------------------------------------------------- +// AUTOSENSE \ No newline at end of file diff --git a/docs/reference/indices/seal.asciidoc b/docs/reference/indices/seal.asciidoc deleted file mode 100644 index 86ad42a40da..00000000000 --- a/docs/reference/indices/seal.asciidoc +++ /dev/null @@ -1,91 +0,0 @@ -[[indices-seal]] -== Seal - -The seal API flushes and adds a "seal" marker to the shards of one or more -indices. The seal is used during recovery or restarts to skip the first and -most costly phase of the process if all copies of the shard have the same seal. -No segment files need to be copied and the transaction log replay phase of the -recovery can start immediately which makes recovery much faster. - -There are two important points about seals: -1. They are best effort in that if there are any outstanding write operations -while the seal operation is being performed then the shards which those writes -target won't be sealed but all others will be. See below for more. -2. The seal breaks as soon as the shard issues a new lucene commit. Uncommitted -operations in the transaction log do not break the seal. That is because a seal -marks a point in time snapshot of the segments, a low level lucene commit. -Practically that means that every write operation on the index will remove the -seal. - -[source,bash] --------------------------------------------------- -$ curl -XPOST 'http://localhost:9200/twitter/_seal' --------------------------------------------------- - -The response contains details about which shards wrote the seal and the reason -in case they failed to write the seal. - -Here is what it looks like when all copies single shard index successfully -wrote the seal: - -[source,js] --------------------------------------------------- -{ - "twitter": [ - { - "shard_id": 0, - "responses": { - "5wjOIntuRqy9F_7JRrrLwA": "success", - "M2iCBe-nS5yaInE8volfSg": "success" - }, - "message": "success" - } -} --------------------------------------------------- - - -Here is what it looks like when one copy fails: - -[source,js] --------------------------------------------------- -{ - "twitter": [ - { - "shard_id": 0, - "responses": { - "M2iCBe-nS5yaInE8volfSg": "pending operations", - "5wjOIntuRqy9F_7JRrrLwA": "success" - }, - "message": "failed on some copies" - } -} --------------------------------------------------- - - -Sometimes the failures can be shard wide and they'll look like this: - -[source,js] --------------------------------------------------- -{ - "twitter": [ - { - "shard_id": 0, - "message": "operation counter on primary is non zero [2]" - } -} --------------------------------------------------- - - -[float] -[[seal-multi-index]] -=== Multi Index - -The seal API can be applied to more than one index with a single call, -or even on `_all` the indices. - -[source,js] --------------------------------------------------- -curl -XPOST 'http://localhost:9200/kimchy,elasticsearch/_seal' - -curl -XPOST 'http://localhost:9200/_seal' --------------------------------------------------- diff --git a/docs/reference/setup/upgrade.asciidoc b/docs/reference/setup/upgrade.asciidoc index c477f5ec9c7..9f46fbaf059 100644 --- a/docs/reference/setup/upgrade.asciidoc +++ b/docs/reference/setup/upgrade.asciidoc @@ -85,13 +85,27 @@ This syntax applies to Elasticsearch 1.0 and later: [source,sh] -------------------------------------------------- - curl -XPUT localhost:9200/_cluster/settings -d '{ - "transient" : { - "cluster.routing.allocation.enable" : "none" - } - }' +curl -XPUT localhost:9200/_cluster/settings -d '{ + "transient" : { + "cluster.routing.allocation.enable" : "none" + } +}' -------------------------------------------------- +* There is no problem continuing to index while doing the upgrade. However, you can speed the process considerably +by *temporarily* stopping non-essential indexing and issuing a manual <>. +A synced flush is special kind of flush which can seriously speed up recovery of shards. Elasticsearch automatically +uses it when an index has been inactive for a while (default is `30m`) but you can manuallky trigger it using the following command: + +[source,sh] +-------------------------------------------------- +curl -XPOST localhost:9200/_all/_flush/synced +-------------------------------------------------- + +Note that a synced flush call is a best effort operation. It will fail there are any pending indexing operations. It is safe to issue +it multiple times if needed. + + * Shut down a single node within the cluster. * Confirm that all shards are correctly reallocated to the remaining running nodes. @@ -110,11 +124,11 @@ This syntax applies to Elasticsearch 1.0 and later: [source,sh] -------------------------------------------------- - curl -XPUT localhost:9200/_cluster/settings -d '{ - "transient" : { - "cluster.routing.allocation.enable" : "all" - } - }' +curl -XPUT localhost:9200/_cluster/settings -d '{ + "transient" : { + "cluster.routing.allocation.enable" : "all" + } +}' -------------------------------------------------- * Observe that all shards are properly allocated on all nodes. Balancing may take some time. @@ -150,11 +164,11 @@ This syntax is from versions prior to 1.0: [source,sh] -------------------------------------------------- - curl -XPUT localhost:9200/_cluster/settings -d '{ - "persistent" : { - "cluster.routing.allocation.disable_allocation" : true - } - }' +curl -XPUT localhost:9200/_cluster/settings -d '{ + "persistent" : { + "cluster.routing.allocation.disable_allocation" : true + } +}' -------------------------------------------------- * Stop all Elasticsearch services on all nodes in the cluster. @@ -169,12 +183,12 @@ This syntax is from versions prior to 1.0: This syntax is from release 1.0 and later: [source,sh] ------------------------------------------------------ - curl -XPUT localhost:9200/_cluster/settings -d '{ - "persistent" : { - "cluster.routing.allocation.disable_allocation": false, - "cluster.routing.allocation.enable" : "all" - } - }' +curl -XPUT localhost:9200/_cluster/settings -d '{ + "persistent" : { + "cluster.routing.allocation.disable_allocation": false, + "cluster.routing.allocation.enable" : "all" + } +}' ------------------------------------------------------ The cluster upgrade can be streamlined by installing the software before stopping cluster services. If this is done, testing must be performed to ensure that no production data or configuration files are overwritten prior to restart. diff --git a/rest-api-spec/api/indices.flush_synced.json b/rest-api-spec/api/indices.flush_synced.json new file mode 100644 index 00000000000..28cad291e6a --- /dev/null +++ b/rest-api-spec/api/indices.flush_synced.json @@ -0,0 +1,39 @@ +{ + "indices.flush.synced": { + "documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/master/indices-flush.html", + "methods": ["POST", "GET"], + "url": { + "path": "/_flush/synced", + "paths": [ + "/_flush/synced", + "/{index}/_flush/synced" + ], + "parts": { + "index": { + "type" : "list", + "description" : "A comma-separated list of index names; use `_all` or empty string for all indices" + }, + "ignore_unavailable": { + "type": "boolean", + "description": "Whether specified concrete indices should be ignored when unavailable (missing or closed)" + }, + "allow_no_indices": { + "type": "boolean", + "description": "Whether to ignore if a wildcard indices expression resolves into no concrete indices. (This includes `_all` string or when no indices have been specified)" + }, + "expand_wildcards": { + "type": "enum", + "options": [ + "open", + "closed", + "none", + "all" + ], + "default": "open", + "description": "Whether to expand wildcard expression to concrete indices that are open, closed or both." + } + } + }, + "body": null + } +} diff --git a/rest-api-spec/api/indices.seal.json b/rest-api-spec/api/indices.seal.json deleted file mode 100644 index 3377d6e88a1..00000000000 --- a/rest-api-spec/api/indices.seal.json +++ /dev/null @@ -1,17 +0,0 @@ -{ - "indices.seal": { - "documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/master/indices-seal.html", - "methods": ["POST", "GET"], - "url": { - "path": "/_seal", - "paths": ["/_seal", "/{index}/_seal"], - "parts": { - "index": { - "type" : "list", - "description" : "A comma-separated list of index names; use `_all` or empty string for all indices" - } - } - }, - "body": null - } -} diff --git a/rest-api-spec/test/indices.seal/10_basic.yaml b/rest-api-spec/test/indices.flush/10_basic.yaml similarity index 74% rename from rest-api-spec/test/indices.seal/10_basic.yaml rename to rest-api-spec/test/indices.flush/10_basic.yaml index 5277adc528d..f85458da69e 100644 --- a/rest-api-spec/test/indices.seal/10_basic.yaml +++ b/rest-api-spec/test/indices.flush/10_basic.yaml @@ -1,5 +1,5 @@ --- -"Index seal rest test": +"Index synced flush rest test": - do: indices.create: index: testing @@ -8,8 +8,11 @@ cluster.health: wait_for_status: yellow - do: - indices.seal: + indices.flush.synced: index: testing + + - is_false: _shards.failed + - do: indices.stats: {level: shards} diff --git a/src/main/java/org/elasticsearch/action/ActionModule.java b/src/main/java/org/elasticsearch/action/ActionModule.java index 7bb66260a58..ee48de4fc23 100644 --- a/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/src/main/java/org/elasticsearch/action/ActionModule.java @@ -103,8 +103,6 @@ import org.elasticsearch.action.admin.indices.settings.put.TransportUpdateSettin import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsAction; import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction; import org.elasticsearch.action.admin.indices.stats.TransportIndicesStatsAction; -import org.elasticsearch.action.admin.indices.seal.SealIndicesAction; -import org.elasticsearch.action.admin.indices.seal.TransportSealIndicesAction; import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateAction; import org.elasticsearch.action.admin.indices.template.delete.TransportDeleteIndexTemplateAction; import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesAction; @@ -260,7 +258,6 @@ public class ActionModule extends AbstractModule { registerAction(ValidateQueryAction.INSTANCE, TransportValidateQueryAction.class); registerAction(RefreshAction.INSTANCE, TransportRefreshAction.class); registerAction(FlushAction.INSTANCE, TransportFlushAction.class); - registerAction(SealIndicesAction.INSTANCE, TransportSealIndicesAction.class); registerAction(OptimizeAction.INSTANCE, TransportOptimizeAction.class); registerAction(UpgradeAction.INSTANCE, TransportUpgradeAction.class); registerAction(UpgradeStatusAction.INSTANCE, TransportUpgradeStatusAction.class); diff --git a/src/main/java/org/elasticsearch/action/admin/indices/seal/SealIndicesAction.java b/src/main/java/org/elasticsearch/action/admin/indices/seal/SealIndicesAction.java deleted file mode 100644 index fbb01b05abe..00000000000 --- a/src/main/java/org/elasticsearch/action/admin/indices/seal/SealIndicesAction.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.admin.indices.seal; - -import org.elasticsearch.action.Action; -import org.elasticsearch.client.ElasticsearchClient; - -/** - */ -public class SealIndicesAction extends Action { - - public static final SealIndicesAction INSTANCE = new SealIndicesAction(); - public static final String NAME = "indices:admin/seal"; - - private SealIndicesAction() { - super(NAME); - } - - @Override - public SealIndicesResponse newResponse() { - return new SealIndicesResponse(); - } - - @Override - public SealIndicesRequestBuilder newRequestBuilder(ElasticsearchClient client) { - return new SealIndicesRequestBuilder(client, this); - } -} diff --git a/src/main/java/org/elasticsearch/action/admin/indices/seal/SealIndicesRequest.java b/src/main/java/org/elasticsearch/action/admin/indices/seal/SealIndicesRequest.java deleted file mode 100644 index 2e8e3ac0cf8..00000000000 --- a/src/main/java/org/elasticsearch/action/admin/indices/seal/SealIndicesRequest.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.admin.indices.seal; - -import org.elasticsearch.action.support.broadcast.BroadcastRequest; - -import java.util.Arrays; - -/** - * A request to seal one or more indices. - */ -public class SealIndicesRequest extends BroadcastRequest { - - SealIndicesRequest() { - } - - /** - * Constructs a seal request against one or more indices. If nothing is provided, all indices will - * be sealed. - */ - public SealIndicesRequest(String... indices) { - super(indices); - } - - @Override - public String toString() { - return "SealIndicesRequest{" + - "indices=" + Arrays.toString(indices) + - ", indicesOptions=" + indicesOptions() + - '}'; - } -} diff --git a/src/main/java/org/elasticsearch/action/admin/indices/seal/SealIndicesRequestBuilder.java b/src/main/java/org/elasticsearch/action/admin/indices/seal/SealIndicesRequestBuilder.java deleted file mode 100644 index a424ab3fc3b..00000000000 --- a/src/main/java/org/elasticsearch/action/admin/indices/seal/SealIndicesRequestBuilder.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.admin.indices.seal; - -import org.elasticsearch.action.ActionRequestBuilder; -import org.elasticsearch.client.ElasticsearchClient; - -/** - * - */ -public class SealIndicesRequestBuilder extends ActionRequestBuilder { - - public SealIndicesRequestBuilder(ElasticsearchClient client, SealIndicesAction action) { - super(client, action, new SealIndicesRequest()); - } - - public SealIndicesRequestBuilder indices(String ... indices) { - request.indices(indices); - return this; - } -} diff --git a/src/main/java/org/elasticsearch/action/admin/indices/seal/SealIndicesResponse.java b/src/main/java/org/elasticsearch/action/admin/indices/seal/SealIndicesResponse.java deleted file mode 100644 index 1dfd47795a5..00000000000 --- a/src/main/java/org/elasticsearch/action/admin/indices/seal/SealIndicesResponse.java +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.admin.indices.seal; - -import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.xcontent.ToXContent; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.indices.SyncedFlushService; -import org.elasticsearch.rest.RestStatus; - -import java.io.IOException; -import java.util.*; - -/** - * A response to a seal action on several indices. - */ -public class SealIndicesResponse extends ActionResponse implements ToXContent { - - final private Set results; - - private RestStatus restStatus; - - SealIndicesResponse() { - results = new HashSet<>(); - } - - SealIndicesResponse(Set results) { - this.results = results; - if (allShardsFailed()) { - restStatus = RestStatus.CONFLICT; - } else if (someShardsFailed()) { - restStatus = RestStatus.PARTIAL_CONTENT; - } else { - restStatus = RestStatus.OK; - } - } - - public RestStatus status() { - return restStatus; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - int size = in.readVInt(); - results.clear(); - for (int i = 0; i < size; i++) { - SyncedFlushService.SyncedFlushResult syncedFlushResult = new SyncedFlushService.SyncedFlushResult(); - syncedFlushResult.readFrom(in); - results.add(syncedFlushResult); - } - restStatus = RestStatus.readFrom(in); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeVInt(results.size()); - for (SyncedFlushService.SyncedFlushResult syncedFlushResult : results) { - syncedFlushResult.writeTo(out); - } - RestStatus.writeTo(out, restStatus); - } - - public Set results() { - return results; - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - Map> allResults = new HashMap<>(); - - // first, sort everything by index and shard id - for (SyncedFlushService.SyncedFlushResult result : results) { - String indexName = result.getShardId().index().name(); - int shardId = result.getShardId().getId(); - - if (allResults.get(indexName) == null) { - // no results yet for this index - allResults.put(indexName, new TreeMap()); - } - if (result.shardResponses().size() > 0) { - Map shardResponses = new HashMap<>(); - for (Map.Entry shardResponse : result.shardResponses().entrySet()) { - shardResponses.put(shardResponse.getKey(), shardResponse.getValue()); - } - allResults.get(indexName).put(shardId, shardResponses); - } else { - allResults.get(indexName).put(shardId, result.failureReason()); - } - } - for (Map.Entry> result : allResults.entrySet()) { - builder.startArray(result.getKey()); - for (Map.Entry shardResponse : result.getValue().entrySet()) { - builder.startObject(); - builder.field("shard_id", shardResponse.getKey()); - if (shardResponse.getValue() instanceof Map) { - builder.startObject("responses"); - Map results = (Map) shardResponse.getValue(); - boolean success = true; - for (Map.Entry shardCopy : results.entrySet()) { - builder.field(shardCopy.getKey().currentNodeId(), shardCopy.getValue().success() ? "success" : shardCopy.getValue().failureReason()); - if (shardCopy.getValue().success() == false) { - success = false; - } - } - builder.endObject(); - builder.field("message", success ? "success" : "failed on some copies"); - - } else { - builder.field("message", shardResponse.getValue()); // must be a string - } - builder.endObject(); - } - builder.endArray(); - } - return builder; - } - - public boolean allShardsFailed() { - for (SyncedFlushService.SyncedFlushResult result : results) { - if (result.success()) { - return false; - } - if (result.shardResponses().size() > 0) { - for (Map.Entry shardResponse : result.shardResponses().entrySet()) { - if (shardResponse.getValue().success()) { - return false; - } - } - } - } - return true; - } - - public boolean someShardsFailed() { - for (SyncedFlushService.SyncedFlushResult result : results) { - if (result.success() == false) { - return true; - } - if (result.shardResponses().size() > 0) { - for (Map.Entry shardResponse : result.shardResponses().entrySet()) { - if (shardResponse.getValue().success() == false) { - return true; - } - } - } - } - return false; - } -} diff --git a/src/main/java/org/elasticsearch/action/admin/indices/seal/TransportSealIndicesAction.java b/src/main/java/org/elasticsearch/action/admin/indices/seal/TransportSealIndicesAction.java deleted file mode 100644 index 71a91de04b4..00000000000 --- a/src/main/java/org/elasticsearch/action/admin/indices/seal/TransportSealIndicesAction.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.admin.indices.seal; - -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.HandledTransportAction; -import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.routing.GroupShardsIterator; -import org.elasticsearch.cluster.routing.ShardIterator; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.common.util.concurrent.CountDown; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.indices.SyncedFlushService; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.*; - -import java.util.Set; - -/** - */ -public class TransportSealIndicesAction extends HandledTransportAction { - - - final private SyncedFlushService syncedFlushService; - final private ClusterService clusterService; - - @Inject - public TransportSealIndicesAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, SyncedFlushService syncedFlushService, ClusterService clusterService) { - super(settings, SealIndicesAction.NAME, threadPool, transportService, actionFilters, SealIndicesRequest.class); - this.syncedFlushService = syncedFlushService; - this.clusterService = clusterService; - } - - @Override - protected void doExecute(final SealIndicesRequest request, final ActionListener listener) { - ClusterState state = clusterService.state(); - String[] concreteIndices = state.metaData().concreteIndices(request.indicesOptions(), request.indices()); - GroupShardsIterator primaries = state.routingTable().activePrimaryShardsGrouped(concreteIndices, true); - final Set results = ConcurrentCollections.newConcurrentSet(); - - final CountDown countDown = new CountDown(primaries.size()); - - for (final ShardIterator shard : primaries) { - if (shard.size() == 0) { - results.add(new SyncedFlushService.SyncedFlushResult(shard.shardId(), "no active primary available")); - if (countDown.countDown()) { - listener.onResponse(new SealIndicesResponse(results)); - } - } else { - final ShardId shardId = shard.shardId(); - syncedFlushService.attemptSyncedFlush(shardId, new ActionListener() { - @Override - public void onResponse(SyncedFlushService.SyncedFlushResult syncedFlushResult) { - results.add(syncedFlushResult); - if (countDown.countDown()) { - listener.onResponse(new SealIndicesResponse(results)); - } - } - - @Override - public void onFailure(Throwable e) { - logger.debug("{} unexpected error while executing synced flush", shardId); - results.add(new SyncedFlushService.SyncedFlushResult(shardId, e.getMessage())); - if (countDown.countDown()) { - listener.onResponse(new SealIndicesResponse(results)); - } - } - }); - } - } - - } -} diff --git a/src/main/java/org/elasticsearch/client/IndicesAdminClient.java b/src/main/java/org/elasticsearch/client/IndicesAdminClient.java index ae16d7b36d2..05bcc56711f 100644 --- a/src/main/java/org/elasticsearch/client/IndicesAdminClient.java +++ b/src/main/java/org/elasticsearch/client/IndicesAdminClient.java @@ -84,9 +84,6 @@ import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRespons import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequestBuilder; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; -import org.elasticsearch.action.admin.indices.seal.SealIndicesRequest; -import org.elasticsearch.action.admin.indices.seal.SealIndicesRequestBuilder; -import org.elasticsearch.action.admin.indices.seal.SealIndicesResponse; import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest; import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequestBuilder; import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateResponse; @@ -123,7 +120,6 @@ import org.elasticsearch.common.Nullable; */ public interface IndicesAdminClient extends ElasticsearchClient { - /** * Indices Exists. * @@ -368,27 +364,6 @@ public interface IndicesAdminClient extends ElasticsearchClient { */ FlushRequestBuilder prepareFlush(String... indices); - /** - * Explicitly sync flush one or more indices - * - * @param request The seal indices request - * @return A result future - */ - ActionFuture sealIndices(SealIndicesRequest request); - - /** - * Explicitly sync flush one or more indices - * - * @param request The seal indices request - * @param listener A listener to be notified with a result - */ - void sealIndices(SealIndicesRequest request, ActionListener listener); - - /** - * Explicitly seal one or more indices - */ - SealIndicesRequestBuilder prepareSealIndices(String... indices); - /** * Explicitly optimize one or more indices into a the number of segments. * diff --git a/src/main/java/org/elasticsearch/client/support/AbstractClient.java b/src/main/java/org/elasticsearch/client/support/AbstractClient.java index 625a469470d..761acd011d1 100644 --- a/src/main/java/org/elasticsearch/client/support/AbstractClient.java +++ b/src/main/java/org/elasticsearch/client/support/AbstractClient.java @@ -180,10 +180,6 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction; import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequestBuilder; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; -import org.elasticsearch.action.admin.indices.seal.SealIndicesAction; -import org.elasticsearch.action.admin.indices.seal.SealIndicesRequest; -import org.elasticsearch.action.admin.indices.seal.SealIndicesRequestBuilder; -import org.elasticsearch.action.admin.indices.seal.SealIndicesResponse; import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateAction; import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest; import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequestBuilder; @@ -1348,21 +1344,6 @@ public abstract class AbstractClient extends AbstractComponent implements Client return new FlushRequestBuilder(this, FlushAction.INSTANCE).setIndices(indices); } - @Override - public ActionFuture sealIndices(SealIndicesRequest request) { - return execute(SealIndicesAction.INSTANCE, request); - } - - @Override - public void sealIndices(SealIndicesRequest request, ActionListener listener) { - execute(SealIndicesAction.INSTANCE, request, listener); - } - - @Override - public SealIndicesRequestBuilder prepareSealIndices(String... indices) { - return new SealIndicesRequestBuilder(this, SealIndicesAction.INSTANCE).indices(indices); - } - @Override public void getMappings(GetMappingsRequest request, ActionListener listener) { execute(GetMappingsAction.INSTANCE, request, listener); diff --git a/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/src/main/java/org/elasticsearch/index/shard/IndexShard.java index e6222003651..07b4bf1f940 100644 --- a/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -21,7 +21,6 @@ package org.elasticsearch.index.shard; import com.google.common.base.Charsets; import com.google.common.base.Preconditions; - import org.apache.lucene.codecs.PostingsFormat; import org.apache.lucene.index.CheckIndex; import org.apache.lucene.search.Query; @@ -1397,7 +1396,7 @@ public class IndexShard extends AbstractIndexShardComponent { } public int getOperationsCount() { - return indexShardOperationCounter.refCount(); + return Math.max(0, indexShardOperationCounter.refCount() - 1); // refCount is incremented on creation and decremented on close } /** diff --git a/src/main/java/org/elasticsearch/indices/IndicesModule.java b/src/main/java/org/elasticsearch/indices/IndicesModule.java index 4cfb9980e87..50bbfa61858 100644 --- a/src/main/java/org/elasticsearch/indices/IndicesModule.java +++ b/src/main/java/org/elasticsearch/indices/IndicesModule.java @@ -32,6 +32,7 @@ import org.elasticsearch.indices.cache.query.IndicesQueryCache; import org.elasticsearch.indices.cluster.IndicesClusterStateService; import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache; import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCacheListener; +import org.elasticsearch.indices.flush.SyncedFlushService; import org.elasticsearch.indices.memory.IndexingMemoryController; import org.elasticsearch.indices.query.IndicesQueriesModule; import org.elasticsearch.indices.recovery.RecoverySettings; diff --git a/src/main/java/org/elasticsearch/indices/flush/IndicesSyncedFlushResult.java b/src/main/java/org/elasticsearch/indices/flush/IndicesSyncedFlushResult.java new file mode 100644 index 00000000000..f625f04484a --- /dev/null +++ b/src/main/java/org/elasticsearch/indices/flush/IndicesSyncedFlushResult.java @@ -0,0 +1,153 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.indices.flush; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentBuilderString; +import org.elasticsearch.rest.RestStatus; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +/** + * The result of performing a sync flush operation on all shards of multiple indices + */ +public class IndicesSyncedFlushResult implements ToXContent { + + final Map> shardsResultPerIndex; + final ShardCounts shardCounts; + + + public IndicesSyncedFlushResult(Map> shardsResultPerIndex) { + this.shardsResultPerIndex = ImmutableMap.copyOf(shardsResultPerIndex); + this.shardCounts = calculateShardCounts(Iterables.concat(shardsResultPerIndex.values())); + } + + /** total number shards, including replicas, both assigned and unassigned */ + public int totalShards() { + return shardCounts.total; + } + + /** total number of shards for which the operation failed */ + public int failedShards() { + return shardCounts.failed; + } + + /** total number of shards which were successfully sync-flushed */ + public int successfulShards() { + return shardCounts.successful; + } + + public RestStatus restStatus() { + return failedShards() == 0 ? RestStatus.OK : RestStatus.CONFLICT; + } + + public Map> getShardsResultPerIndex() { + return shardsResultPerIndex; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(Fields._SHARDS); + shardCounts.toXContent(builder, params); + builder.endObject(); + for (Map.Entry> indexEntry : shardsResultPerIndex.entrySet()) { + List indexResult = indexEntry.getValue(); + builder.startObject(indexEntry.getKey()); + ShardCounts indexShardCounts = calculateShardCounts(indexResult); + indexShardCounts.toXContent(builder, params); + if (indexShardCounts.failed > 0) { + builder.startArray(Fields.FAILURES); + for (ShardsSyncedFlushResult shardResults : indexResult) { + if (shardResults.failed()) { + builder.startObject(); + builder.field(Fields.SHARD, shardResults.shardId().id()); + builder.field(Fields.REASON, shardResults.failureReason()); + builder.endObject(); + continue; + } + Map failedShards = shardResults.failedShards(); + for (Map.Entry shardEntry : failedShards.entrySet()) { + builder.startObject(); + builder.field(Fields.SHARD, shardResults.shardId().id()); + builder.field(Fields.REASON, shardEntry.getValue().failureReason()); + builder.field(Fields.ROUTING, shardEntry.getKey()); + builder.endObject(); + } + } + builder.endArray(); + } + builder.endObject(); + } + return builder; + } + + static ShardCounts calculateShardCounts(Iterable results) { + int total = 0, successful = 0, failed = 0; + for (ShardsSyncedFlushResult result : results) { + total += result.totalShards(); + successful += result.successfulShards(); + if (result.failed()) { + // treat all shard copies as failed + failed += result.totalShards(); + } else { + // some shards may have failed during the sync phase + failed += result.failedShards().size(); + } + } + return new ShardCounts(total, successful, failed); + } + + static final class ShardCounts implements ToXContent { + + public final int total; + public final int successful; + public final int failed; + + ShardCounts(int total, int successful, int failed) { + this.total = total; + this.successful = successful; + this.failed = failed; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field(Fields.TOTAL, total); + builder.field(Fields.SUCCESSFUL, successful); + builder.field(Fields.FAILED, failed); + return builder; + } + } + + static final class Fields { + static final XContentBuilderString _SHARDS = new XContentBuilderString("_shards"); + static final XContentBuilderString TOTAL = new XContentBuilderString("total"); + static final XContentBuilderString SUCCESSFUL = new XContentBuilderString("successful"); + static final XContentBuilderString FAILED = new XContentBuilderString("failed"); + static final XContentBuilderString FAILURES = new XContentBuilderString("failures"); + static final XContentBuilderString SHARD = new XContentBuilderString("shard"); + static final XContentBuilderString ROUTING = new XContentBuilderString("routing"); + static final XContentBuilderString REASON = new XContentBuilderString("reason"); + } +} diff --git a/src/main/java/org/elasticsearch/indices/flush/ShardsSyncedFlushResult.java b/src/main/java/org/elasticsearch/indices/flush/ShardsSyncedFlushResult.java new file mode 100644 index 00000000000..1388373ff36 --- /dev/null +++ b/src/main/java/org/elasticsearch/indices/flush/ShardsSyncedFlushResult.java @@ -0,0 +1,132 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.indices.flush; + +import com.google.common.collect.ImmutableMap; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.index.shard.ShardId; + +import java.util.HashMap; +import java.util.Map; + +/** + * Result for all copies of a shard + */ +public class ShardsSyncedFlushResult { + private String failureReason; + private Map shardResponses; + private String syncId; + private ShardId shardId; + // some shards may be unassigned, so we need this as state + private int totalShards; + + public ShardsSyncedFlushResult() { + } + + public ShardId getShardId() { + return shardId; + } + + /** + * failure constructor + */ + public ShardsSyncedFlushResult(ShardId shardId, int totalShards, String failureReason) { + this.syncId = null; + this.failureReason = failureReason; + this.shardResponses = ImmutableMap.of(); + this.shardId = shardId; + this.totalShards = totalShards; + } + + /** + * success constructor + */ + public ShardsSyncedFlushResult(ShardId shardId, String syncId, int totalShards, Map shardResponses) { + this.failureReason = null; + ImmutableMap.Builder builder = ImmutableMap.builder(); + this.shardResponses = builder.putAll(shardResponses).build(); + this.syncId = syncId; + this.totalShards = totalShards; + this.shardId = shardId; + } + + /** + * @return true if the operation failed before reaching step three of synced flush. {@link #failureReason()} can be used for + * more details + */ + public boolean failed() { + return failureReason != null; + } + + /** + * @return the reason for the failure if synced flush failed before step three of synced flush + */ + public String failureReason() { + return failureReason; + } + + public String syncId() { + return syncId; + } + + /** + * @return total number of shards for which a sync attempt was made + */ + public int totalShards() { + return totalShards; + } + + /** + * @return total number of successful shards + */ + public int successfulShards() { + int i = 0; + for (SyncedFlushService.SyncedFlushResponse result : shardResponses.values()) { + if (result.success()) { + i++; + } + } + return i; + } + + /** + * @return an array of shard failures + */ + public Map failedShards() { + Map failures = new HashMap<>(); + for (Map.Entry result : shardResponses.entrySet()) { + if (result.getValue().success() == false) { + failures.put(result.getKey(), result.getValue()); + } + } + return failures; + } + + /** + * @return Individual responses for each shard copy with a detailed failure message if the copy failed to perform the synced flush. + * Empty if synced flush failed before step three. + */ + public Map shardResponses() { + return shardResponses; + } + + public ShardId shardId() { + return shardId; + } +} diff --git a/src/main/java/org/elasticsearch/indices/SyncedFlushService.java b/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java similarity index 81% rename from src/main/java/org/elasticsearch/indices/SyncedFlushService.java rename to src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java index aa42a408a18..537392a9c98 100644 --- a/src/main/java/org/elasticsearch/indices/SyncedFlushService.java +++ b/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java @@ -16,17 +16,16 @@ * specific language governing permissions and limitations * under the License. */ -package org.elasticsearch.indices; +package org.elasticsearch.indices.flush; -import com.google.common.collect.ImmutableMap; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.flush.FlushRequest; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.ImmutableShardRouting; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; @@ -44,10 +43,16 @@ import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardException; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.IndexClosedException; +import org.elasticsearch.indices.IndexMissingException; +import org.elasticsearch.indices.IndicesLifecycle; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentMap; @@ -77,10 +82,10 @@ public class SyncedFlushService extends AbstractComponent { public void onShardInactive(final IndexShard indexShard) { // we only want to call sync flush once, so only trigger it when we are on a primary if (indexShard.routingEntry().primary()) { - attemptSyncedFlush(indexShard.shardId(), new ActionListener() { + attemptSyncedFlush(indexShard.shardId(), new ActionListener() { @Override - public void onResponse(SyncedFlushResult syncedFlushResult) { - logger.debug("{} sync flush on inactive shard returned successfully for sync_id: {}", syncedFlushResult.getShardId(), syncedFlushResult.syncId()); + public void onResponse(ShardsSyncedFlushResult syncedFlushResult) { + logger.trace("{} sync flush on inactive shard returned successfully for sync_id: {}", syncedFlushResult.getShardId(), syncedFlushResult.syncId()); } @Override @@ -93,6 +98,56 @@ public class SyncedFlushService extends AbstractComponent { }); } + /** + * a utility method to perform a synced flush for all shards of multiple indices. see {@link #attemptSyncedFlush(ShardId, ActionListener)} + * for more details. + */ + public void attemptSyncedFlush(final String[] aliasesOrIndices, IndicesOptions indicesOptions, final ActionListener listener) { + final ClusterState state = clusterService.state(); + final String[] concreteIndices = state.metaData().concreteIndices(indicesOptions, aliasesOrIndices); + final Map> results = ConcurrentCollections.newConcurrentMap(); + int totalNumberOfShards = 0; + int numberOfShards = 0; + for (String index : concreteIndices) { + final IndexMetaData indexMetaData = state.metaData().index(index); + totalNumberOfShards += indexMetaData.totalNumberOfShards(); + numberOfShards += indexMetaData.getNumberOfShards(); + results.put(index, Collections.synchronizedList(new ArrayList())); + + } + if (numberOfShards == 0) { + listener.onResponse(new IndicesSyncedFlushResult(results)); + return; + } + final int finalTotalNumberOfShards = totalNumberOfShards; + final CountDown countDown = new CountDown(numberOfShards); + + for (final String index : concreteIndices) { + final int indexNumberOfShards = state.metaData().index(index).getNumberOfShards(); + for (int shard = 0; shard < indexNumberOfShards; shard++) { + final ShardId shardId = new ShardId(index, shard); + attemptSyncedFlush(shardId, new ActionListener() { + @Override + public void onResponse(ShardsSyncedFlushResult syncedFlushResult) { + results.get(index).add(syncedFlushResult); + if (countDown.countDown()) { + listener.onResponse(new IndicesSyncedFlushResult(results)); + } + } + + @Override + public void onFailure(Throwable e) { + logger.debug("{} unexpected error while executing synced flush", shardId); + results.get(index).add(new ShardsSyncedFlushResult(shardId, finalTotalNumberOfShards, e.getMessage())); + if (countDown.countDown()) { + listener.onResponse(new IndicesSyncedFlushResult(results)); + } + } + }); + } + } + } + /* * Tries to flush all copies of a shard and write a sync id to it. * After a synced flush two shard copies may only contain the same sync id if they contain the same documents. @@ -119,28 +174,36 @@ public class SyncedFlushService extends AbstractComponent { * * Synced flush is a best effort operation. The sync id may be written on all, some or none of the copies. **/ - public void attemptSyncedFlush(final ShardId shardId, final ActionListener actionListener) { + public void attemptSyncedFlush(final ShardId shardId, final ActionListener actionListener) { try { final ClusterState state = clusterService.state(); - final IndexShardRoutingTable shardRoutingTable = getActiveShardRoutings(shardId, state); + final IndexShardRoutingTable shardRoutingTable = getShardRoutingTable(shardId, state); final List activeShards = shardRoutingTable.activeShards(); + final int totalShards = shardRoutingTable.getSize(); + + if (activeShards.size() == 0) { + actionListener.onResponse(new ShardsSyncedFlushResult(shardId, totalShards, "no active shards")); + return; + } + final ActionListener> commitIdsListener = new ActionListener>() { @Override public void onResponse(final Map commitIds) { if (commitIds.isEmpty()) { - actionListener.onResponse(new SyncedFlushResult(shardId, "all shards failed to commit on pre-sync")); + actionListener.onResponse(new ShardsSyncedFlushResult(shardId, totalShards, "all shards failed to commit on pre-sync")); + return; } final ActionListener inflightOpsListener = new ActionListener() { @Override public void onResponse(InFlightOpsResponse response) { final int inflight = response.opCount(); - assert inflight >= -1; - if (inflight != 1) { // 1 means that there are no write operations are in flight (>1) and the shard is not closed (0). - actionListener.onResponse(new SyncedFlushResult(shardId, "operation counter on primary is non zero [" + inflight + "]")); + assert inflight >= 0; + if (inflight != 0) { + actionListener.onResponse(new ShardsSyncedFlushResult(shardId, totalShards, "[" + inflight + "] ongoing operations on primary")); } else { // 3. now send the sync request to all the shards String syncId = Strings.base64UUID(); - sendSyncRequests(syncId, activeShards, state, commitIds, shardId, actionListener); + sendSyncRequests(syncId, activeShards, state, commitIds, shardId, totalShards, actionListener); } } @@ -166,7 +229,7 @@ public class SyncedFlushService extends AbstractComponent { } } - final IndexShardRoutingTable getActiveShardRoutings(ShardId shardId, ClusterState state) { + final IndexShardRoutingTable getShardRoutingTable(ShardId shardId, ClusterState state) { final IndexRoutingTable indexRoutingTable = state.routingTable().index(shardId.index().name()); if (indexRoutingTable == null) { IndexMetaData index = state.getMetaData().index(shardId.index().getName()); @@ -183,7 +246,7 @@ public class SyncedFlushService extends AbstractComponent { } /** - * returns the number of inflight operations on primary. -1 upon error. + * returns the number of in flight operations on primary. -1 upon error. */ protected void getInflightOpsCount(final ShardId shardId, ClusterState state, IndexShardRoutingTable shardRoutingTable, final ActionListener listener) { try { @@ -209,7 +272,7 @@ public class SyncedFlushService extends AbstractComponent { @Override public void handleException(TransportException exp) { - logger.debug("{} unexpected error while retrieving inflight op count", shardId); + logger.debug("{} unexpected error while retrieving in flight op count", shardId); listener.onFailure(exp); } @@ -224,7 +287,8 @@ public class SyncedFlushService extends AbstractComponent { } - void sendSyncRequests(final String syncId, final List shards, ClusterState state, Map expectedCommitIds, final ShardId shardId, final ActionListener listener) { + void sendSyncRequests(final String syncId, final List shards, ClusterState state, Map expectedCommitIds, + final ShardId shardId, final int totalShards, final ActionListener listener) { final CountDown countDown = new CountDown(shards.size()); final Map results = ConcurrentCollections.newConcurrentMap(); for (final ShardRouting shard : shards) { @@ -232,14 +296,14 @@ public class SyncedFlushService extends AbstractComponent { if (node == null) { logger.trace("{} is assigned to an unknown node. skipping for sync id [{}]. shard routing {}", shardId, syncId, shard); results.put(shard, new SyncedFlushResponse("unknown node")); - contDownAndSendResponseIfDone(syncId, shards, shardId, listener, countDown, results); + contDownAndSendResponseIfDone(syncId, shards, shardId, totalShards, listener, countDown, results); continue; } final Engine.CommitId expectedCommitId = expectedCommitIds.get(shard.currentNodeId()); if (expectedCommitId == null) { logger.trace("{} can't resolve expected commit id for {}, skipping for sync id [{}]. shard routing {}", shardId, syncId, shard); results.put(shard, new SyncedFlushResponse("no commit id from pre-sync flush")); - contDownAndSendResponseIfDone(syncId, shards, shardId, listener, countDown, results); + contDownAndSendResponseIfDone(syncId, shards, shardId, totalShards, listener, countDown, results); continue; } logger.trace("{} sending synced flush request to {}. sync id [{}].", shardId, shard, syncId); @@ -255,14 +319,14 @@ public class SyncedFlushService extends AbstractComponent { SyncedFlushResponse existing = results.put(shard, response); assert existing == null : "got two answers for node [" + node + "]"; // count after the assert so we won't decrement twice in handleException - contDownAndSendResponseIfDone(syncId, shards, shardId, listener, countDown, results); + contDownAndSendResponseIfDone(syncId, shards, shardId, totalShards, listener, countDown, results); } @Override public void handleException(TransportException exp) { logger.trace("{} error while performing synced flush on [{}], skipping", exp, shardId, shard); results.put(shard, new SyncedFlushResponse(exp.getMessage())); - contDownAndSendResponseIfDone(syncId, shards, shardId, listener, countDown, results); + contDownAndSendResponseIfDone(syncId, shards, shardId, totalShards, listener, countDown, results); } @Override @@ -274,10 +338,12 @@ public class SyncedFlushService extends AbstractComponent { } - private void contDownAndSendResponseIfDone(String syncId, List shards, ShardId shardId, ActionListener listener, CountDown countDown, Map results) { + private void contDownAndSendResponseIfDone(String syncId, List shards, ShardId shardId, int totalShards, + ActionListener listener, CountDown countDown, Map results) { if (countDown.countDown()) { assert results.size() == shards.size(); - listener.onResponse(new SyncedFlushResult(shardId, syncId, results)); + listener.onResponse(new ShardsSyncedFlushResult(shardId, syncId, totalShards, results)); } } @@ -292,8 +358,8 @@ public class SyncedFlushService extends AbstractComponent { final DiscoveryNode node = state.nodes().get(shard.currentNodeId()); if (node == null) { logger.trace("{} shard routing {} refers to an unknown node. skipping.", shardId, shard); - if(countDown.countDown()) { - listener.onResponse(commitIds); + if (countDown.countDown()) { + listener.onResponse(commitIds); } continue; } @@ -308,7 +374,7 @@ public class SyncedFlushService extends AbstractComponent { Engine.CommitId existing = commitIds.putIfAbsent(node.id(), response.commitId()); assert existing == null : "got two answers for node [" + node + "]"; // count after the assert so we won't decrement twice in handleException - if(countDown.countDown()) { + if (countDown.countDown()) { listener.onResponse(commitIds); } } @@ -316,7 +382,7 @@ public class SyncedFlushService extends AbstractComponent { @Override public void handleException(TransportException exp) { logger.trace("{} error while performing pre synced flush on [{}], skipping", shardId, exp, shard); - if(countDown.countDown()) { + if (countDown.countDown()) { listener.onResponse(commitIds); } } @@ -343,7 +409,7 @@ public class SyncedFlushService extends AbstractComponent { IndexShard indexShard = indexService.shardSafe(request.shardId().id()); logger.trace("{} performing sync flush. sync id [{}], expected commit id {}", request.shardId(), request.syncId(), request.expectedCommitId()); Engine.SyncedFlushResult result = indexShard.syncFlush(request.syncId(), request.expectedCommitId()); - logger.trace("{} sync flush done. sync id [{}], result [{}]", request.shardId(), request.syncId(), result); + logger.trace("{} sync flush done. sync id [{}], result [{}]", request.shardId(), request.syncId(), result); switch (result) { case SUCCESS: return new SyncedFlushResponse(); @@ -367,124 +433,6 @@ public class SyncedFlushService extends AbstractComponent { return new InFlightOpsResponse(opCount); } - /** - * Result for all copies of a shard - */ - public static class SyncedFlushResult extends TransportResponse { - private String failureReason; - private Map shardResponses; - private String syncId; - private ShardId shardId; - - public SyncedFlushResult() { - } - - public ShardId getShardId() { - return shardId; - } - - /** - * failure constructor - */ - public SyncedFlushResult(ShardId shardId, String failureReason) { - this.syncId = null; - this.failureReason = failureReason; - this.shardResponses = ImmutableMap.of(); - this.shardId = shardId; - } - - /** - * success constructor - */ - public SyncedFlushResult(ShardId shardId, String syncId, Map shardResponses) { - this.failureReason = null; - ImmutableMap.Builder builder = ImmutableMap.builder(); - this.shardResponses = builder.putAll(shardResponses).build(); - this.syncId = syncId; - this.shardId = shardId; - } - - /** - * @return true if one or more shard copies was successful, false if all failed before step three of synced flush - */ - public boolean success() { - return syncId != null; - } - - /** - * @return the reason for the failure if synced flush failed before step three of synced flush - */ - public String failureReason() { - return failureReason; - } - - public String syncId() { - return syncId; - } - - /** - * @return total number of shards for which a sync attempt was made - */ - public int totalShards() { - return shardResponses.size(); - } - - /** - * @return total number of successful shards - */ - public int successfulShards() { - int i = 0; - for (SyncedFlushResponse result : shardResponses.values()) { - if (result.success()) { - i++; - } - } - return i; - } - - /** - * @return Individual responses for each shard copy with a detailed failure message if the copy failed to perform the synced flush. - * Empty if synced flush failed before step three. - */ - public Map shardResponses() { - return shardResponses; - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeOptionalString(failureReason); - out.writeOptionalString(syncId); - out.writeVInt(shardResponses.size()); - for (Map.Entry result : shardResponses.entrySet()) { - result.getKey().writeTo(out); - result.getValue().writeTo(out); - } - shardId.writeTo(out); - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - failureReason = in.readOptionalString(); - syncId = in.readOptionalString(); - int size = in.readVInt(); - ImmutableMap.Builder builder = ImmutableMap.builder(); - for (int i = 0; i < size; i++) { - ImmutableShardRouting shardRouting = ImmutableShardRouting.readShardRoutingEntry(in); - SyncedFlushResponse syncedFlushRsponse = new SyncedFlushResponse(); - syncedFlushRsponse.readFrom(in); - builder.put(shardRouting, syncedFlushRsponse); - } - shardResponses = builder.build(); - shardId = ShardId.readShardId(in); - } - - public ShardId shardId() { - return shardId; - } - } - final static class PreSyncedFlushRequest extends TransportRequest { private ShardId shardId; diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 2e2497fe7cb..572b784093e 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -21,7 +21,6 @@ package org.elasticsearch.indices.recovery; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; - import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.IndexFormatTooNewException; import org.apache.lucene.index.IndexFormatTooOldException; @@ -199,6 +198,8 @@ public class RecoverySourceHandler { } // we shortcut recovery here because we have nothing to copy. but we must still start the engine on the target. // so we don't return here + logger.trace("[{}][{}] skipping [phase1] to {} - identical sync id [{}] found on both source and target", indexName, shardId, + request.targetNode(), recoverySourceSyncId); } else { final Store.RecoveryDiff diff = recoverySourceMetadata.recoveryDiff(request.metadataSnapshot()); for (StoreFileMetaData md : diff.identical) { diff --git a/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java b/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java index 3490f6d2d8c..c574594040e 100644 --- a/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java +++ b/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java @@ -235,6 +235,9 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesAction(channel) { + IndicesOptions indicesOptions = IndicesOptions.fromRequest(request, IndicesOptions.lenientExpandOpen()); + + syncedFlushService.attemptSyncedFlush(indices, indicesOptions, new RestBuilderListener(channel) { @Override - public RestResponse buildResponse(SealIndicesResponse response, XContentBuilder builder) throws Exception { + public RestResponse buildResponse(IndicesSyncedFlushResult results, XContentBuilder builder) throws Exception { builder.startObject(); - builder = response.toXContent(builder, ToXContent.EMPTY_PARAMS); + results.toXContent(builder, request); builder.endObject(); - return new BytesRestResponse(response.status(), builder); + return new BytesRestResponse(results.restStatus(), builder); } }); } diff --git a/src/test/java/org/elasticsearch/action/admin/indices/seal/SealIndicesTests.java b/src/test/java/org/elasticsearch/action/admin/indices/seal/SealIndicesTests.java deleted file mode 100644 index 13c376eb250..00000000000 --- a/src/test/java/org/elasticsearch/action/admin/indices/seal/SealIndicesTests.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.admin.indices.seal; - -import org.elasticsearch.cluster.routing.ImmutableShardRouting; -import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.cluster.routing.ShardRoutingState; -import org.elasticsearch.common.io.stream.BytesStreamOutput; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.indices.SyncedFlushService; -import org.elasticsearch.test.ElasticsearchTestCase; - -import java.io.IOException; -import java.util.*; - -import static org.elasticsearch.test.XContentTestUtils.convertToMap; -import static org.hamcrest.Matchers.equalTo; - -public class SealIndicesTests extends ElasticsearchTestCase { - - public void testSealIndicesResponseStreaming() throws IOException { - - Set shardResults = new HashSet<>(); - // add one result where one shard failed and one succeeded - SyncedFlushService.SyncedFlushResult syncedFlushResult = createSyncedFlushResult(0, "test"); - shardResults.add(syncedFlushResult); - // add one result where all failed - syncedFlushResult = new SyncedFlushService.SyncedFlushResult(new ShardId("test", 1), "all failed :("); - shardResults.add(syncedFlushResult); - SealIndicesResponse sealIndicesResponse = new SealIndicesResponse(shardResults); - BytesStreamOutput out = new BytesStreamOutput(); - sealIndicesResponse.writeTo(out); - out.close(); - StreamInput in = StreamInput.wrap(out.bytes()); - SealIndicesResponse readResponse = new SealIndicesResponse(); - readResponse.readFrom(in); - Map asMap = convertToMap(readResponse); - assertResponse(asMap); - } - - public void testXContentResponse() throws IOException { - - Set shardResults = new HashSet<>(); - // add one result where one shard failed and one succeeded - SyncedFlushService.SyncedFlushResult syncedFlushResult = createSyncedFlushResult(0, "test"); - shardResults.add(syncedFlushResult); - // add one result where all failed - syncedFlushResult = new SyncedFlushService.SyncedFlushResult(new ShardId("test", 1), "all failed :("); - shardResults.add(syncedFlushResult); - SealIndicesResponse sealIndicesResponse = new SealIndicesResponse(shardResults); - Map asMap = convertToMap(sealIndicesResponse); - assertResponse(asMap); - } - - protected void assertResponse(Map asMap) { - assertNotNull(asMap.get("test")); - assertThat((Integer) (((HashMap) ((ArrayList) asMap.get("test")).get(0)).get("shard_id")), equalTo(0)); - assertThat((String) (((HashMap) ((ArrayList) asMap.get("test")).get(0)).get("message")), equalTo("failed on some copies")); - HashMap shardResponses = (HashMap) ((HashMap) ((ArrayList) asMap.get("test")).get(0)).get("responses"); - assertThat(shardResponses.get("node_1"), equalTo("failed for some reason")); - assertThat(shardResponses.get("node_2"), equalTo("success")); - HashMap failedShard = (HashMap) (((ArrayList) asMap.get("test")).get(1)); - assertThat((Integer) (failedShard.get("shard_id")), equalTo(1)); - assertThat((String) (failedShard.get("message")), equalTo("all failed :(")); - } - - public void testXContentResponseSortsShards() throws IOException { - Set shardResults = new HashSet<>(); - // add one result where one shard failed and one succeeded - SyncedFlushService.SyncedFlushResult syncedFlushResult; - for (int i = 100000; i >= 0; i--) { - if (randomBoolean()) { - syncedFlushResult = createSyncedFlushResult(i, "test"); - shardResults.add(syncedFlushResult); - } else { - syncedFlushResult = new SyncedFlushService.SyncedFlushResult(new ShardId("test", i), "all failed :("); - shardResults.add(syncedFlushResult); - } - } - SealIndicesResponse sealIndicesResponse = new SealIndicesResponse(shardResults); - Map asMap = convertToMap(sealIndicesResponse); - assertNotNull(asMap.get("test")); - for (int i = 0; i < 100000; i++) { - assertThat((Integer) (((HashMap) ((ArrayList) asMap.get("test")).get(i)).get("shard_id")), equalTo(i)); - } - } - - protected SyncedFlushService.SyncedFlushResult createSyncedFlushResult(int shardId, String index) { - Map responses = new HashMap<>(); - ImmutableShardRouting shardRouting = new ImmutableShardRouting(index, shardId, "node_1", false, ShardRoutingState.RELOCATING, 2); - SyncedFlushService.SyncedFlushResponse syncedFlushResponse = new SyncedFlushService.SyncedFlushResponse("failed for some reason"); - responses.put(shardRouting, syncedFlushResponse); - shardRouting = new ImmutableShardRouting(index, shardId, "node_2", false, ShardRoutingState.RELOCATING, 2); - syncedFlushResponse = new SyncedFlushService.SyncedFlushResponse(); - responses.put(shardRouting, syncedFlushResponse); - return new SyncedFlushService.SyncedFlushResult(new ShardId(index, shardId), "some_sync_id", responses); - } -} diff --git a/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayTests.java b/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayTests.java index 75adcc27c1e..a35397833a0 100644 --- a/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayTests.java +++ b/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayTests.java @@ -32,13 +32,11 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.indices.SyncedFlushService; +import org.elasticsearch.indices.flush.SyncedFlushUtil; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; import org.elasticsearch.test.InternalTestCluster.RestartCallback; -import org.elasticsearch.indices.SyncedFlushUtil; import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.store.MockFSDirectoryService; import org.junit.Test; @@ -397,11 +395,7 @@ public class RecoveryFromGatewayTests extends ElasticsearchIntegrationTest { ensureGreen(); } else { logger.info("--> trying to sync flush"); - int numShards = Integer.parseInt(client().admin().indices().prepareGetSettings("test").get().getSetting("test", "index.number_of_shards")); - SyncedFlushService syncedFlushService = internalCluster().getInstance(SyncedFlushService.class); - for (int i = 0; i < numShards; i++) { - assertTrue(SyncedFlushUtil.attemptSyncedFlush(syncedFlushService, new ShardId("test", i)).success()); - } + assertEquals(SyncedFlushUtil.attemptSyncedFlush(internalCluster(), "test").failedShards(), 0); assertSyncIdsNotNull(); } diff --git a/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index a9762e28e42..97e988f819f 100644 --- a/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -18,8 +18,8 @@ */ package org.elasticsearch.index.shard; -import org.elasticsearch.action.admin.indices.stats.IndexStats; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.admin.indices.stats.IndexStats; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.MutableShardRouting; import org.elasticsearch.cluster.routing.ShardRouting; @@ -254,13 +254,14 @@ public class IndexShardTests extends ElasticsearchSingleNodeTest { IndicesService indicesService = getInstanceFromNode(IndicesService.class); IndexService indexService = indicesService.indexServiceSafe("test"); IndexShard indexShard = indexService.shard(0); + assertEquals(0, indexShard.getOperationsCount()); + indexShard.incrementOperationCounter(); + assertEquals(1, indexShard.getOperationsCount()); indexShard.incrementOperationCounter(); assertEquals(2, indexShard.getOperationsCount()); - indexShard.incrementOperationCounter(); - assertEquals(3, indexShard.getOperationsCount()); indexShard.decrementOperationCounter(); indexShard.decrementOperationCounter(); - assertEquals(1, indexShard.getOperationsCount()); + assertEquals(0, indexShard.getOperationsCount()); } @Test diff --git a/src/test/java/org/elasticsearch/indices/SealTests.java b/src/test/java/org/elasticsearch/indices/SealTests.java deleted file mode 100644 index 281de05c387..00000000000 --- a/src/test/java/org/elasticsearch/indices/SealTests.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.indices; - -import org.elasticsearch.action.admin.indices.seal.SealIndicesResponse; -import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.test.ElasticsearchIntegrationTest; -import org.junit.Test; - -import static java.lang.Thread.sleep; -import static org.hamcrest.Matchers.equalTo; - -@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.TEST, numDataNodes = 0) -public class SealTests extends ElasticsearchIntegrationTest { - - @Test - public void testUnallocatedShardsDoesNotHang() throws InterruptedException { - Settings.Builder settingsBuilder = Settings.builder() - .put("node.data", false) - .put("node.master", true) - .put("path.data", createTempDir().toString()); - internalCluster().startNode(settingsBuilder.build()); - // create an index but because no data nodes are available no shards will be allocated - createIndex("test"); - // this should not hang but instead immediately return with empty result set - SealIndicesResponse sealIndicesResponse = client().admin().indices().prepareSealIndices("test").get(); - // just to make sure the test actually tests the right thing - int numShards = client().admin().indices().prepareGetSettings("test").get().getIndexToSettings().get("test").getAsInt(IndexMetaData.SETTING_NUMBER_OF_SHARDS, -1); - assertThat(sealIndicesResponse.results().size(), equalTo(numShards)); - assertThat(sealIndicesResponse.results().iterator().next().failureReason(), equalTo("no active primary available")); - } -} diff --git a/src/test/java/org/elasticsearch/indices/FlushTest.java b/src/test/java/org/elasticsearch/indices/flush/FlushTest.java similarity index 73% rename from src/test/java/org/elasticsearch/indices/FlushTest.java rename to src/test/java/org/elasticsearch/indices/flush/FlushTest.java index 1fdb42f051b..5ce34ca4fa7 100644 --- a/src/test/java/org/elasticsearch/indices/FlushTest.java +++ b/src/test/java/org/elasticsearch/indices/flush/FlushTest.java @@ -16,13 +16,12 @@ * specific language governing permissions and limitations * under the License. */ -package org.elasticsearch.indices; +package org.elasticsearch.indices.flush; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.flush.FlushResponse; import org.elasticsearch.action.admin.indices.stats.IndexStats; import org.elasticsearch.action.admin.indices.stats.ShardStats; -import org.elasticsearch.action.admin.indices.seal.SealIndicesResponse; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.ShardRouting; @@ -36,6 +35,7 @@ import org.junit.Test; import java.io.IOException; import java.util.Arrays; +import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; @@ -43,7 +43,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import static java.lang.Thread.sleep; import static org.hamcrest.Matchers.emptyIterable; import static org.hamcrest.Matchers.equalTo; @@ -97,8 +96,16 @@ public class FlushTest extends ElasticsearchIntegrationTest { assertNull(shardStats.getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID)); } - SyncedFlushService.SyncedFlushResult result = SyncedFlushUtil.attemptSyncedFlush(internalCluster().getInstance(SyncedFlushService.class), new ShardId("test", 0)); - assertTrue(result.success()); + ShardsSyncedFlushResult result; + if (randomBoolean()) { + logger.info("--> sync flushing shard 0"); + result = SyncedFlushUtil.attemptSyncedFlush(internalCluster(), new ShardId("test", 0)); + } else { + logger.info("--> sync flushing index [test]"); + IndicesSyncedFlushResult indicesResult = SyncedFlushUtil.attemptSyncedFlush(internalCluster(), "test"); + result = indicesResult.getShardsResultPerIndex().get("test").get(0); + } + assertFalse(result.failed()); assertThat(result.totalShards(), equalTo(indexStats.getShards().length)); assertThat(result.successfulShards(), equalTo(indexStats.getShards().length)); @@ -140,26 +147,7 @@ public class FlushTest extends ElasticsearchIntegrationTest { } @TestLogging("indices:TRACE") - public void testSyncedFlushWithApi() throws ExecutionException, InterruptedException, IOException { - - createIndex("test"); - ensureGreen(); - - IndexStats indexStats = client().admin().indices().prepareStats("test").get().getIndex("test"); - for (ShardStats shardStats : indexStats.getShards()) { - assertNull(shardStats.getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID)); - } - logger.info("--> trying sync flush"); - SealIndicesResponse sealIndicesResponse = client().admin().indices().prepareSealIndices("test").get(); - logger.info("--> sync flush done"); - indexStats = client().admin().indices().prepareStats("test").get().getIndex("test"); - for (ShardStats shardStats : indexStats.getShards()) { - assertNotNull(shardStats.getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID)); - } - } - - @TestLogging("indices:TRACE") - public void testSyncedFlushWithApiAndConcurrentIndexing() throws Exception { + public void testSyncedFlushWithConcurrentIndexing() throws Exception { internalCluster().ensureAtLeastNumDataNodes(3); createIndex("test"); @@ -186,14 +174,12 @@ public class FlushTest extends ElasticsearchIntegrationTest { assertNull(shardStats.getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID)); } logger.info("--> trying sync flush"); - SealIndicesResponse sealIndicesResponse = client().admin().indices().prepareSealIndices("test").get(); + IndicesSyncedFlushResult syncedFlushResult = SyncedFlushUtil.attemptSyncedFlush(internalCluster(), "test"); logger.info("--> sync flush done"); stop.set(true); indexingThread.join(); indexStats = client().admin().indices().prepareStats("test").get().getIndex("test"); - for (ShardStats shardStats : indexStats.getShards()) { - assertFlushResponseEqualsShardStats(shardStats, sealIndicesResponse); - } + assertFlushResponseEqualsShardStats(indexStats.getShards(), syncedFlushResult.getShardsResultPerIndex().get("test")); refresh(); assertThat(client().prepareCount().get().getCount(), equalTo((long) numDocs.get())); logger.info("indexed {} docs", client().prepareCount().get().getCount()); @@ -203,22 +189,38 @@ public class FlushTest extends ElasticsearchIntegrationTest { assertThat(client().prepareCount().get().getCount(), equalTo((long) numDocs.get())); } - private void assertFlushResponseEqualsShardStats(ShardStats shardStats, SealIndicesResponse sealIndicesResponse) { + private void assertFlushResponseEqualsShardStats(ShardStats[] shardsStats, List syncedFlushResults) { - for (SyncedFlushService.SyncedFlushResult shardResult : sealIndicesResponse.results()) { - if (shardStats.getShardRouting().getId() == shardResult.shardId().getId()) { - for (Map.Entry singleResponse : shardResult.shardResponses().entrySet()) { - if (singleResponse.getKey().currentNodeId().equals(shardStats.getShardRouting().currentNodeId())) { - if (singleResponse.getValue().success()) { - assertNotNull(shardStats.getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID)); - logger.info("sync flushed {} on node {}", singleResponse.getKey().shardId(), singleResponse.getKey().currentNodeId()); - } else { - assertNull(shardStats.getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID)); - logger.info("sync flush failed for {} on node {}", singleResponse.getKey().shardId(), singleResponse.getKey().currentNodeId()); + for (final ShardStats shardStats : shardsStats) { + for (final ShardsSyncedFlushResult shardResult : syncedFlushResults) { + if (shardStats.getShardRouting().getId() == shardResult.shardId().getId()) { + for (Map.Entry singleResponse : shardResult.shardResponses().entrySet()) { + if (singleResponse.getKey().currentNodeId().equals(shardStats.getShardRouting().currentNodeId())) { + if (singleResponse.getValue().success()) { + logger.info("{} sync flushed on node {}", singleResponse.getKey().shardId(), singleResponse.getKey().currentNodeId()); + assertNotNull(shardStats.getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID)); + } else { + logger.info("{} sync flush failed for on node {}", singleResponse.getKey().shardId(), singleResponse.getKey().currentNodeId()); + assertNull(shardStats.getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID)); + } } } } } } } + + @Test + public void testUnallocatedShardsDoesNotHang() throws InterruptedException { + // create an index but disallow allocation + prepareCreate("test").setSettings(Settings.builder().put("index.routing.allocation.include._name", "nonexistent")).get(); + + // this should not hang but instead immediately return with empty result set + List shardsResult = SyncedFlushUtil.attemptSyncedFlush(internalCluster(), "test").getShardsResultPerIndex().get("test"); + // just to make sure the test actually tests the right thing + int numShards = client().admin().indices().prepareGetSettings("test").get().getIndexToSettings().get("test").getAsInt(IndexMetaData.SETTING_NUMBER_OF_SHARDS, -1); + assertThat(shardsResult.size(), equalTo(numShards)); + assertThat(shardsResult.get(0).failureReason(), equalTo("no active shards")); + } + } diff --git a/src/test/java/org/elasticsearch/indices/SycnedFlushSingleNodeTest.java b/src/test/java/org/elasticsearch/indices/flush/SyncedFlushSingleNodeTest.java similarity index 83% rename from src/test/java/org/elasticsearch/indices/SycnedFlushSingleNodeTest.java rename to src/test/java/org/elasticsearch/indices/flush/SyncedFlushSingleNodeTest.java index 348c5753c83..5d65c1acb7d 100644 --- a/src/test/java/org/elasticsearch/indices/SycnedFlushSingleNodeTest.java +++ b/src/test/java/org/elasticsearch/indices/flush/SyncedFlushSingleNodeTest.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.elasticsearch.indices; +package org.elasticsearch.indices.flush; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; @@ -27,17 +27,17 @@ import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.test.ElasticsearchSingleNodeTest; -import org.elasticsearch.test.hamcrest.ElasticsearchAssertions; import java.util.List; import java.util.Map; /** */ -public class SycnedFlushSingleNodeTest extends ElasticsearchSingleNodeTest { +public class SyncedFlushSingleNodeTest extends ElasticsearchSingleNodeTest { - public void testModificationPreventsSealing() throws InterruptedException { + public void testModificationPreventsFlushing() throws InterruptedException { createIndex("test"); client().prepareIndex("test", "test", "1").setSource("{}").get(); IndexService test = getInstanceFromNode(IndicesService.class).indexService("test"); @@ -46,18 +46,18 @@ public class SycnedFlushSingleNodeTest extends ElasticsearchSingleNodeTest { SyncedFlushService flushService = getInstanceFromNode(SyncedFlushService.class); final ShardId shardId = shard.shardId(); final ClusterState state = getInstanceFromNode(ClusterService.class).state(); - final IndexShardRoutingTable shardRoutingTable = flushService.getActiveShardRoutings(shardId, state); + final IndexShardRoutingTable shardRoutingTable = flushService.getShardRoutingTable(shardId, state); final List activeShards = shardRoutingTable.activeShards(); assertEquals("exactly one active shard", 1, activeShards.size()); Map commitIds = SyncedFlushUtil.sendPreSyncRequests(flushService, activeShards, state, shardId); assertEquals("exactly one commit id", 1, commitIds.size()); client().prepareIndex("test", "test", "2").setSource("{}").get(); String syncId = Strings.base64UUID(); - SyncedFlushUtil.LatchedListener listener = new SyncedFlushUtil.LatchedListener(); - flushService.sendSyncRequests(syncId, activeShards, state, commitIds, shardId,listener); + SyncedFlushUtil.LatchedListener listener = new SyncedFlushUtil.LatchedListener<>(); + flushService.sendSyncRequests(syncId, activeShards, state, commitIds, shardId, shardRoutingTable.size(), listener); listener.latch.await(); assertNull(listener.error); - SyncedFlushService.SyncedFlushResult syncedFlushResult = listener.result; + ShardsSyncedFlushResult syncedFlushResult = listener.result; assertNotNull(syncedFlushResult); assertEquals(0, syncedFlushResult.successfulShards()); assertEquals(1, syncedFlushResult.totalShards()); @@ -66,9 +66,9 @@ public class SycnedFlushSingleNodeTest extends ElasticsearchSingleNodeTest { assertFalse(syncedFlushResult.shardResponses().get(activeShards.get(0)).success()); assertEquals("pending operations", syncedFlushResult.shardResponses().get(activeShards.get(0)).failureReason()); - SyncedFlushUtil.sendPreSyncRequests(flushService, activeShards, state, shardId); // pull another commit and make sure we can't seal with the old one + SyncedFlushUtil.sendPreSyncRequests(flushService, activeShards, state, shardId); // pull another commit and make sure we can't sync-flush with the old one listener = new SyncedFlushUtil.LatchedListener(); - flushService.sendSyncRequests(syncId, activeShards, state, commitIds, shardId,listener); + flushService.sendSyncRequests(syncId, activeShards, state, commitIds, shardId, shardRoutingTable.size(), listener); listener.latch.await(); assertNull(listener.error); syncedFlushResult = listener.result; @@ -79,7 +79,6 @@ public class SycnedFlushSingleNodeTest extends ElasticsearchSingleNodeTest { assertNotNull(syncedFlushResult.shardResponses().get(activeShards.get(0))); assertFalse(syncedFlushResult.shardResponses().get(activeShards.get(0)).success()); assertEquals("commit has changed", syncedFlushResult.shardResponses().get(activeShards.get(0)).failureReason()); - ElasticsearchAssertions.assertVersionSerializable(syncedFlushResult); } public void testSingleShardSuccess() throws InterruptedException { @@ -90,17 +89,16 @@ public class SycnedFlushSingleNodeTest extends ElasticsearchSingleNodeTest { SyncedFlushService flushService = getInstanceFromNode(SyncedFlushService.class); final ShardId shardId = shard.shardId(); - SyncedFlushUtil.LatchedListener listener = new SyncedFlushUtil.LatchedListener(); + SyncedFlushUtil.LatchedListener listener = new SyncedFlushUtil.LatchedListener(); flushService.attemptSyncedFlush(shardId, listener); listener.latch.await(); assertNull(listener.error); - SyncedFlushService.SyncedFlushResult syncedFlushResult = listener.result; + ShardsSyncedFlushResult syncedFlushResult = listener.result; assertNotNull(syncedFlushResult); assertEquals(1, syncedFlushResult.successfulShards()); assertEquals(1, syncedFlushResult.totalShards()); SyncedFlushService.SyncedFlushResponse response = syncedFlushResult.shardResponses().values().iterator().next(); assertTrue(response.success()); - ElasticsearchAssertions.assertVersionSerializable(syncedFlushResult); } public void testSyncFailsIfOperationIsInFlight() throws InterruptedException { @@ -113,16 +111,15 @@ public class SycnedFlushSingleNodeTest extends ElasticsearchSingleNodeTest { final ShardId shardId = shard.shardId(); shard.incrementOperationCounter(); try { - SyncedFlushUtil.LatchedListener listener = new SyncedFlushUtil.LatchedListener(); + SyncedFlushUtil.LatchedListener listener = new SyncedFlushUtil.LatchedListener<>(); flushService.attemptSyncedFlush(shardId, listener); listener.latch.await(); assertNull(listener.error); - SyncedFlushService.SyncedFlushResult syncedFlushResult = listener.result; + ShardsSyncedFlushResult syncedFlushResult = listener.result; assertNotNull(syncedFlushResult); assertEquals(0, syncedFlushResult.successfulShards()); - assertEquals(0, syncedFlushResult.totalShards()); - assertEquals("operation counter on primary is non zero [2]", syncedFlushResult.failureReason()); - ElasticsearchAssertions.assertVersionSerializable(syncedFlushResult); + assertNotEquals(0, syncedFlushResult.totalShards()); + assertEquals("[1] ongoing operations on primary", syncedFlushResult.failureReason()); } finally { shard.decrementOperationCounter(); } @@ -168,7 +165,7 @@ public class SycnedFlushSingleNodeTest extends ElasticsearchSingleNodeTest { SyncedFlushService flushService = getInstanceFromNode(SyncedFlushService.class); final ShardId shardId = shard.shardId(); final ClusterState state = getInstanceFromNode(ClusterService.class).state(); - final IndexShardRoutingTable shardRoutingTable = flushService.getActiveShardRoutings(shardId, state); + final IndexShardRoutingTable shardRoutingTable = flushService.getShardRoutingTable(shardId, state); final List activeShards = shardRoutingTable.activeShards(); assertEquals("exactly one active shard", 1, activeShards.size()); Map commitIds = SyncedFlushUtil.sendPreSyncRequests(flushService, activeShards, state, shardId); @@ -178,11 +175,11 @@ public class SycnedFlushSingleNodeTest extends ElasticsearchSingleNodeTest { } client().admin().indices().prepareFlush("test").setForce(true).get(); String syncId = Strings.base64UUID(); - final SyncedFlushUtil.LatchedListener listener = new SyncedFlushUtil.LatchedListener(); - flushService.sendSyncRequests(syncId, activeShards, state, commitIds, shardId, listener); + final SyncedFlushUtil.LatchedListener listener = new SyncedFlushUtil.LatchedListener(); + flushService.sendSyncRequests(syncId, activeShards, state, commitIds, shardId, shardRoutingTable.size(), listener); listener.latch.await(); assertNull(listener.error); - SyncedFlushService.SyncedFlushResult syncedFlushResult = listener.result; + ShardsSyncedFlushResult syncedFlushResult = listener.result; assertNotNull(syncedFlushResult); assertEquals(0, syncedFlushResult.successfulShards()); assertEquals(1, syncedFlushResult.totalShards()); @@ -190,7 +187,6 @@ public class SycnedFlushSingleNodeTest extends ElasticsearchSingleNodeTest { assertNotNull(syncedFlushResult.shardResponses().get(activeShards.get(0))); assertFalse(syncedFlushResult.shardResponses().get(activeShards.get(0)).success()); assertEquals("commit has changed", syncedFlushResult.shardResponses().get(activeShards.get(0)).failureReason()); - ElasticsearchAssertions.assertVersionSerializable(syncedFlushResult); } public void testFailWhenCommitIsMissing() throws InterruptedException { @@ -202,18 +198,18 @@ public class SycnedFlushSingleNodeTest extends ElasticsearchSingleNodeTest { SyncedFlushService flushService = getInstanceFromNode(SyncedFlushService.class); final ShardId shardId = shard.shardId(); final ClusterState state = getInstanceFromNode(ClusterService.class).state(); - final IndexShardRoutingTable shardRoutingTable = flushService.getActiveShardRoutings(shardId, state); + final IndexShardRoutingTable shardRoutingTable = flushService.getShardRoutingTable(shardId, state); final List activeShards = shardRoutingTable.activeShards(); assertEquals("exactly one active shard", 1, activeShards.size()); Map commitIds = SyncedFlushUtil.sendPreSyncRequests(flushService, activeShards, state, shardId); assertEquals("exactly one commit id", 1, commitIds.size()); commitIds.clear(); // wipe it... String syncId = Strings.base64UUID(); - SyncedFlushUtil.LatchedListener listener = new SyncedFlushUtil.LatchedListener(); - flushService.sendSyncRequests(syncId, activeShards, state, commitIds, shardId, listener); + SyncedFlushUtil.LatchedListener listener = new SyncedFlushUtil.LatchedListener(); + flushService.sendSyncRequests(syncId, activeShards, state, commitIds, shardId, shardRoutingTable.size(), listener); listener.latch.await(); assertNull(listener.error); - SyncedFlushService.SyncedFlushResult syncedFlushResult = listener.result; + ShardsSyncedFlushResult syncedFlushResult = listener.result; assertNotNull(syncedFlushResult); assertEquals(0, syncedFlushResult.successfulShards()); assertEquals(1, syncedFlushResult.totalShards()); @@ -221,7 +217,6 @@ public class SycnedFlushSingleNodeTest extends ElasticsearchSingleNodeTest { assertNotNull(syncedFlushResult.shardResponses().get(activeShards.get(0))); assertFalse(syncedFlushResult.shardResponses().get(activeShards.get(0)).success()); assertEquals("no commit id from pre-sync flush", syncedFlushResult.shardResponses().get(activeShards.get(0)).failureReason()); - ElasticsearchAssertions.assertVersionSerializable(syncedFlushResult); } diff --git a/src/test/java/org/elasticsearch/indices/flush/SyncedFlushUnitTests.java b/src/test/java/org/elasticsearch/indices/flush/SyncedFlushUnitTests.java new file mode 100644 index 00000000000..426ec36d608 --- /dev/null +++ b/src/test/java/org/elasticsearch/indices/flush/SyncedFlushUnitTests.java @@ -0,0 +1,136 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indices.flush; + +import com.carrotsearch.hppc.ObjectIntHashMap; +import com.carrotsearch.hppc.ObjectIntMap; +import org.elasticsearch.cluster.routing.ImmutableShardRouting; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.flush.IndicesSyncedFlushResult.ShardCounts; +import org.elasticsearch.indices.flush.SyncedFlushService.SyncedFlushResponse; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.test.ElasticsearchTestCase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.test.XContentTestUtils.convertToMap; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; + +public class SyncedFlushUnitTests extends ElasticsearchTestCase { + + + private static class TestPlan { + public ShardCounts totalCounts; + public Map countsPerIndex = new HashMap<>(); + public ObjectIntMap expectedFailuresPerIndex = new ObjectIntHashMap<>(); + + public IndicesSyncedFlushResult result; + + } + + public void testIndicesSyncedFlushResult() throws IOException { + final TestPlan testPlan = createTestPlan(); + assertThat(testPlan.result.totalShards(), equalTo(testPlan.totalCounts.total)); + assertThat(testPlan.result.successfulShards(), equalTo(testPlan.totalCounts.successful)); + assertThat(testPlan.result.failedShards(), equalTo(testPlan.totalCounts.failed)); + assertThat(testPlan.result.restStatus(), equalTo(testPlan.totalCounts.failed > 0 ? RestStatus.CONFLICT : RestStatus.OK)); + Map asMap = convertToMap(testPlan.result); + assertShardCount("_shards header", (Map) asMap.get("_shards"), testPlan.totalCounts); + + assertThat("unexpected number of indices", asMap.size(), equalTo(1 + testPlan.countsPerIndex.size())); // +1 for the shards header + for (String index : testPlan.countsPerIndex.keySet()) { + Map indexMap = (Map) asMap.get(index); + assertShardCount(index, indexMap, testPlan.countsPerIndex.get(index)); + List> failureList = (List>) indexMap.get("failures"); + final int expectedFailures = testPlan.expectedFailuresPerIndex.get(index); + if (expectedFailures == 0) { + assertNull(index + " has unexpected failures", failureList); + } else { + assertNotNull(index + " should have failures", failureList); + assertThat(failureList, hasSize(expectedFailures)); + } + } + } + + private void assertShardCount(String name, Map header, ShardCounts expectedCounts) { + assertThat(name + " has unexpected total count", (Integer) header.get("total"), equalTo(expectedCounts.total)); + assertThat(name + " has unexpected successful count", (Integer) header.get("successful"), equalTo(expectedCounts.successful)); + assertThat(name + " has unexpected failed count", (Integer) header.get("failed"), equalTo(expectedCounts.failed)); + } + + protected TestPlan createTestPlan() { + final TestPlan testPlan = new TestPlan(); + final Map> indicesResults = new HashMap<>(); + final int indexCount = randomIntBetween(1, 10); + int totalShards = 0; + int totalSuccesful = 0; + int totalFailed = 0; + for (int i = 0; i < indexCount; i++) { + final String index = "index_" + i; + int shards = randomIntBetween(1, 4); + int replicas = randomIntBetween(0, 2); + int successful = 0; + int failed = 0; + int failures = 0; + List shardsResults = new ArrayList<>(); + for (int shard = 0; shard < shards; shard++) { + final ShardId shardId = new ShardId(index, shard); + if (randomInt(5) < 2) { + // total shard failure + failed += replicas + 1; + failures++; + shardsResults.add(new ShardsSyncedFlushResult(shardId, replicas + 1, "simulated total failure")); + } else { + Map shardResponses = new HashMap<>(); + for (int copy = 0; copy < replicas + 1; copy++) { + final ShardRouting shardRouting = new ImmutableShardRouting(index, shard, "node_" + shardId + "_" + copy, null, + copy == 0, ShardRoutingState.STARTED, 0); + if (randomInt(5) < 2) { + // shard copy failure + failed++; + failures++; + shardResponses.put(shardRouting, new SyncedFlushResponse("copy failure " + shardId)); + } else { + successful++; + shardResponses.put(shardRouting, new SyncedFlushResponse()); + } + } + shardsResults.add(new ShardsSyncedFlushResult(shardId, "_sync_id_" + shard, replicas + 1, shardResponses)); + } + } + indicesResults.put(index, shardsResults); + testPlan.countsPerIndex.put(index, new ShardCounts(shards * (replicas + 1), successful, failed)); + testPlan.expectedFailuresPerIndex.put(index, failures); + totalFailed += failed; + totalShards += shards * (replicas + 1); + totalSuccesful += successful; + } + testPlan.result = new IndicesSyncedFlushResult(indicesResults); + testPlan.totalCounts = new ShardCounts(totalShards, totalSuccesful, totalFailed); + return testPlan; + } +} diff --git a/src/test/java/org/elasticsearch/indices/SyncedFlushUtil.java b/src/test/java/org/elasticsearch/indices/flush/SyncedFlushUtil.java similarity index 70% rename from src/test/java/org/elasticsearch/indices/SyncedFlushUtil.java rename to src/test/java/org/elasticsearch/indices/flush/SyncedFlushUtil.java index e16c85b4b7e..fef6c23231e 100644 --- a/src/test/java/org/elasticsearch/indices/SyncedFlushUtil.java +++ b/src/test/java/org/elasticsearch/indices/flush/SyncedFlushUtil.java @@ -16,16 +16,16 @@ * specific language governing permissions and limitations * under the License. */ -package org.elasticsearch.indices; +package org.elasticsearch.indices.flush; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.LatchedActionListener; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.indices.SyncedFlushService; +import org.elasticsearch.test.InternalTestCluster; import java.util.List; import java.util.Map; @@ -38,11 +38,31 @@ public class SyncedFlushUtil { } + /** + * Blocking single index version of {@link SyncedFlushService#attemptSyncedFlush(String[], IndicesOptions, ActionListener)} + */ + public static IndicesSyncedFlushResult attemptSyncedFlush(InternalTestCluster cluster, String index) { + SyncedFlushService service = cluster.getInstance(SyncedFlushService.class); + LatchedListener listener = new LatchedListener(); + service.attemptSyncedFlush(new String[]{index}, IndicesOptions.lenientExpandOpen(), listener); + try { + listener.latch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + if (listener.error != null) { + throw ExceptionsHelper.convertToElastic(listener.error); + } + return listener.result; + } + + /** * Blocking version of {@link SyncedFlushService#attemptSyncedFlush(ShardId, ActionListener)} */ - public static SyncedFlushService.SyncedFlushResult attemptSyncedFlush(SyncedFlushService service, ShardId shardId) { - LatchedListener listener = new LatchedListener(); + public static ShardsSyncedFlushResult attemptSyncedFlush(InternalTestCluster cluster, ShardId shardId) { + SyncedFlushService service = cluster.getInstance(SyncedFlushService.class); + LatchedListener listener = new LatchedListener(); service.attemptSyncedFlush(shardId, listener); try { listener.latch.await(); diff --git a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java index 0e146d37ae1..81f0e9f39bf 100644 --- a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java +++ b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java @@ -18,14 +18,15 @@ */ package org.elasticsearch.test; -import com.carrotsearch.randomizedtesting.*; +import com.carrotsearch.randomizedtesting.RandomizedContext; +import com.carrotsearch.randomizedtesting.RandomizedTest; +import com.carrotsearch.randomizedtesting.Randomness; import com.carrotsearch.randomizedtesting.annotations.TestGroup; import com.carrotsearch.randomizedtesting.generators.RandomInts; import com.carrotsearch.randomizedtesting.generators.RandomPicks; import com.google.common.base.Joiner; import com.google.common.base.Predicate; import com.google.common.collect.Lists; - import org.apache.commons.lang3.StringUtils; import org.apache.http.impl.client.HttpClients; import org.apache.lucene.store.StoreRateLimiting; @@ -49,7 +50,6 @@ import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; import org.elasticsearch.action.admin.indices.optimize.OptimizeResponse; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse; -import org.elasticsearch.action.admin.indices.seal.SealIndicesResponse; import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequestBuilder; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; @@ -101,21 +101,18 @@ import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.mapper.FieldMapper.Loading; import org.elasticsearch.index.mapper.internal.SizeFieldMapper; import org.elasticsearch.index.mapper.internal.TimestampFieldMapper; -import org.elasticsearch.index.merge.policy.AbstractMergePolicyProvider; -import org.elasticsearch.index.merge.policy.LogByteSizeMergePolicyProvider; -import org.elasticsearch.index.merge.policy.LogDocMergePolicyProvider; -import org.elasticsearch.index.merge.policy.MergePolicyModule; -import org.elasticsearch.index.merge.policy.MergePolicyProvider; -import org.elasticsearch.index.merge.policy.TieredMergePolicyProvider; +import org.elasticsearch.index.merge.policy.*; import org.elasticsearch.index.merge.scheduler.ConcurrentMergeSchedulerProvider; import org.elasticsearch.index.merge.scheduler.MergeSchedulerModule; +import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogConfig; import org.elasticsearch.index.translog.TranslogService; -import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogWriter; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.cache.query.IndicesQueryCache; import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache; +import org.elasticsearch.indices.flush.IndicesSyncedFlushResult; +import org.elasticsearch.indices.flush.SyncedFlushService; import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.indices.store.IndicesStore; import org.elasticsearch.node.Node; @@ -128,45 +125,28 @@ import org.elasticsearch.test.rest.client.http.HttpRequestBuilder; import org.elasticsearch.transport.netty.NettyTransport; import org.hamcrest.Matchers; import org.joda.time.DateTimeZone; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Ignore; +import org.junit.*; import java.io.IOException; import java.io.InputStream; -import java.lang.annotation.ElementType; -import java.lang.annotation.Inherited; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; +import java.lang.annotation.*; import java.net.InetSocketAddress; import java.nio.file.DirectoryStream; import java.nio.file.Files; import java.nio.file.Path; import java.util.*; -import java.util.concurrent.Callable; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; import static org.elasticsearch.common.settings.Settings.settingsBuilder; +import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.elasticsearch.test.XContentTestUtils.convertToMap; import static org.elasticsearch.test.XContentTestUtils.mapsEqualIgnoringArrayOrder; -import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoTimeout; -import static org.hamcrest.Matchers.emptyIterable; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.notNullValue; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*; +import static org.hamcrest.Matchers.*; /** * {@link ElasticsearchIntegrationTest} is an abstract base class to run integration @@ -244,7 +224,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase /** * Annotation for third-party integration tests. - *

+ *

* These are tests the require a third-party service in order to run. They * may require the user to manually configure an external process (such as rabbitmq), * or may additionally require some external configuration (e.g. AWS credentials) @@ -408,56 +388,56 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase } mappings.startArray("dynamic_templates") .startObject() - .startObject("template-strings") - .field("match_mapping_type", "string") + .startObject("template-strings") + .field("match_mapping_type", "string") .startObject("mapping") - .startObject("fielddata") - .field(FieldDataType.FORMAT_KEY, randomFrom("paged_bytes", "fst")) - .field(Loading.KEY, randomLoadingValues()) + .startObject("fielddata") + .field(FieldDataType.FORMAT_KEY, randomFrom("paged_bytes", "fst")) + .field(Loading.KEY, randomLoadingValues()) .endObject() .endObject() .endObject() .endObject() .startObject() - .startObject("template-longs") - .field("match_mapping_type", "long") - .startObject("mapping") - .field("doc_values", randomBoolean()) - .startObject("fielddata") - .field(Loading.KEY, randomFrom(Loading.LAZY, Loading.EAGER)) + .startObject("template-longs") + .field("match_mapping_type", "long") + .startObject("mapping") + .field("doc_values", randomBoolean()) + .startObject("fielddata") + .field(Loading.KEY, randomFrom(Loading.LAZY, Loading.EAGER)) .endObject() .endObject() .endObject() .endObject() .startObject() - .startObject("template-doubles") - .field("match_mapping_type", "double") - .startObject("mapping") - .field("doc_values", randomBoolean()) - .startObject("fielddata") - .field(Loading.KEY, randomFrom(Loading.LAZY, Loading.EAGER)) + .startObject("template-doubles") + .field("match_mapping_type", "double") + .startObject("mapping") + .field("doc_values", randomBoolean()) + .startObject("fielddata") + .field(Loading.KEY, randomFrom(Loading.LAZY, Loading.EAGER)) .endObject() .endObject() .endObject() .endObject() .startObject() - .startObject("template-geo_points") - .field("match_mapping_type", "geo_point") - .startObject("mapping") - .field("doc_values", randomBoolean()) - .startObject("fielddata") - .field(Loading.KEY, randomFrom(Loading.LAZY, Loading.EAGER)) + .startObject("template-geo_points") + .field("match_mapping_type", "geo_point") + .startObject("mapping") + .field("doc_values", randomBoolean()) + .startObject("fielddata") + .field(Loading.KEY, randomFrom(Loading.LAZY, Loading.EAGER)) .endObject() .endObject() .endObject() .endObject() .startObject() - .startObject("template-booleans") - .field("match_mapping_type", "boolean") - .startObject("mapping") - .startObject("fielddata") - .field(FieldDataType.FORMAT_KEY, randomFrom("array", "doc_values")) - .field(Loading.KEY, randomFrom(Loading.LAZY, Loading.EAGER)) + .startObject("template-booleans") + .field("match_mapping_type", "boolean") + .startObject("mapping") + .startObject("fielddata") + .field(FieldDataType.FORMAT_KEY, randomFrom("array", "doc_values")) + .field(Loading.KEY, randomFrom(Loading.LAZY, Loading.EAGER)) .endObject() .endObject() .endObject() @@ -512,7 +492,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase } if (random.nextBoolean()) { - builder.put(TranslogConfig.INDEX_TRANSLOG_FS_TYPE, RandomPicks.randomFrom(random, TranslogWriter.Type.values()).name()); + builder.put(TranslogConfig.INDEX_TRANSLOG_FS_TYPE, RandomPicks.randomFrom(random, TranslogWriter.Type.values()).name()); } if (random.nextBoolean()) { @@ -652,9 +632,9 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase if (currentClusterScope != Scope.TEST) { MetaData metaData = client().admin().cluster().prepareState().execute().actionGet().getState().getMetaData(); assertThat("test leaves persistent cluster metadata behind: " + metaData.persistentSettings().getAsMap(), metaData - .persistentSettings().getAsMap().size(), equalTo(0)); + .persistentSettings().getAsMap().size(), equalTo(0)); assertThat("test leaves transient cluster metadata behind: " + metaData.transientSettings().getAsMap(), metaData - .transientSettings().getAsMap().size(), equalTo(0)); + .transientSettings().getAsMap().size(), equalTo(0)); } ensureClusterSizeConsistency(); ensureClusterStateConsistency(); @@ -870,11 +850,11 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase public void run() { for (Client client : clients()) { ClusterHealthResponse clusterHealth = client.admin().cluster().prepareHealth().setLocal(true).get(); - assertThat("client " + client + " still has in flight fetch", clusterHealth.getNumberOfInFlightFetch(), equalTo(0)); + assertThat("client " + client + " still has in flight fetch", clusterHealth.getNumberOfInFlightFetch(), equalTo(0)); PendingClusterTasksResponse pendingTasks = client.admin().cluster().preparePendingClusterTasks().setLocal(true).get(); assertThat("client " + client + " still has pending tasks " + pendingTasks.prettyPrint(), pendingTasks, Matchers.emptyIterable()); clusterHealth = client.admin().cluster().prepareHealth().setLocal(true).get(); - assertThat("client " + client + " still has in flight fetch", clusterHealth.getNumberOfInFlightFetch(), equalTo(0)); + assertThat("client " + client + " still has in flight fetch", clusterHealth.getNumberOfInFlightFetch(), equalTo(0)); } } }); @@ -961,7 +941,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase * It is useful to ensure that all action on the cluster have finished and all shards that were currently relocating * are now allocated and started. */ - public ClusterHealthStatus ensureGreen(String... indices) { + public ClusterHealthStatus ensureGreen(String... indices) { return ensureGreen(TimeValue.timeValueSeconds(30), indices); } @@ -1248,11 +1228,11 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase /** * Syntactic sugar for: - * + *

*

      *   return client().prepareIndex(index, type, id).setSource(source).execute().actionGet();
      * 
- * + *

* where source is a String. */ protected final IndexResponse index(String index, String type, String id, String source) { @@ -1375,7 +1355,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase * @param forceRefresh if true all involved indices are refreshed once the documents are indexed. * @param dummyDocuments if true some empty dummy documents may be randomly inserted into the document list and deleted once * all documents are indexed. This is useful to produce deleted documents on the server side. - * @param maybeFlush if true this method may randomly execute full flushes after index operations. + * @param maybeFlush if true this method may randomly execute full flushes after index operations. * @param builders the documents to index. */ public void indexRandom(boolean forceRefresh, boolean dummyDocuments, boolean maybeFlush, List builders) throws InterruptedException, ExecutionException { @@ -1509,8 +1489,8 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase client().admin().indices().prepareFlush(indices).setIndicesOptions(IndicesOptions.lenientExpandOpen()).execute( new LatchedActionListener(newLatch(inFlightAsyncOperations))); } else { - client().admin().indices().prepareSealIndices(indices).execute( - new LatchedActionListener(newLatch(inFlightAsyncOperations))); + internalCluster().getInstance(SyncedFlushService.class).attemptSyncedFlush(indices, IndicesOptions.lenientExpandOpen(), + new LatchedActionListener(newLatch(inFlightAsyncOperations))); } } else if (rarely()) { client().admin().indices().prepareOptimize(indices).setIndicesOptions(IndicesOptions.lenientExpandOpen()).setMaxNumSegments(between(1, 10)).setFlush(maybeFlush && randomBoolean()).execute( @@ -1669,7 +1649,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase private int getMinNumDataNodes() { ClusterScope annotation = getAnnotation(this.getClass()); - return annotation == null || annotation.minNumDataNodes() == -1 ? InternalTestCluster.DEFAULT_MIN_NUM_DATA_NODES : annotation.minNumDataNodes(); + return annotation == null || annotation.minNumDataNodes() == -1 ? InternalTestCluster.DEFAULT_MIN_NUM_DATA_NODES : annotation.minNumDataNodes(); } private int getMaxNumDataNodes() { @@ -1702,7 +1682,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK, "1b") .put("script.indexed", "on") .put("script.inline", "on") - // wait short time for other active shards before actually deleting, default 30s not needed in tests + // wait short time for other active shards before actually deleting, default 30s not needed in tests .put(IndicesStore.INDICES_STORE_DELETE_SHARD_TIMEOUT, new TimeValue(1, TimeUnit.SECONDS)) .build(); } @@ -1886,7 +1866,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) { for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) { for (ShardRouting shardRouting : indexShardRoutingTable) { - if (shardRouting.currentNodeId() != null && index.equals(shardRouting.getIndex())) { + if (shardRouting.currentNodeId() != null && index.equals(shardRouting.getIndex())) { String name = clusterState.nodes().get(shardRouting.currentNodeId()).name(); nodes.add(name); assertThat("Allocated on new node: " + name, Regex.simpleMatch(pattern, name), is(true)); diff --git a/src/test/java/org/elasticsearch/test/InternalTestCluster.java b/src/test/java/org/elasticsearch/test/InternalTestCluster.java index 5fd564f6e88..079008ef8cb 100644 --- a/src/test/java/org/elasticsearch/test/InternalTestCluster.java +++ b/src/test/java/org/elasticsearch/test/InternalTestCluster.java @@ -78,7 +78,6 @@ import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineClosedException; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardModule; -import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.IndexStoreModule; import org.elasticsearch.index.translog.TranslogConfig; @@ -975,7 +974,7 @@ public final class InternalTestCluster extends TestCluster { @Override public void beforeIndexDeletion() { - // Check that the operations counter on index shard has reached 1. + // Check that the operations counter on index shard has reached 0. // The assumption here is that after a test there are no ongoing write operations. // test that have ongoing write operations after the test (for example because ttl is used // and not all docs have been purged after the test) and inherit from @@ -1018,10 +1017,7 @@ public final class InternalTestCluster extends TestCluster { IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name); for (IndexService indexService : indexServices) { for (IndexShard indexShard : indexService) { - assertThat("index shard counter on shard " + indexShard.shardId() + " on node " + nodeAndClient.name + " not 0 or 1 ", indexShard.getOperationsCount(), anyOf(equalTo(1), equalTo(0))); - if (indexShard.getOperationsCount() == 0) { - assertThat(indexShard.state(), equalTo(IndexShardState.CLOSED)); - } + assertThat("index shard counter on shard " + indexShard.shardId() + " on node " + nodeAndClient.name + " not 0", indexShard.getOperationsCount(), equalTo(0)); } } }