From 03369e098055bc0b99cc8e8e83a421df7d744973 Mon Sep 17 00:00:00 2001 From: Rory Hunter Date: Tue, 16 Jun 2020 21:50:38 +0100 Subject: [PATCH] Implement dangling indices API (#58176) Backport of #50920. Part of #48366. Implement an API for listing, importing and deleting dangling indices. Co-authored-by: David Turner --- .../client/RestHighLevelClientTests.java | 3 + .../http/DanglingIndicesRestIT.java | 316 ++++++++++++ .../api/dangling_indices.delete.json | 39 ++ .../api/dangling_indices.import.json | 39 ++ .../api/dangling_indices.list.json | 20 + .../indices/recovery/DanglingIndicesIT.java | 465 ++++++++++++++++-- .../elasticsearch/action/ActionModule.java | 22 + .../indices/dangling/DanglingIndexInfo.java | 75 +++ .../delete/DeleteDanglingIndexAction.java | 36 ++ .../delete/DeleteDanglingIndexRequest.java | 74 +++ .../TransportDeleteDanglingIndexAction.java | 240 +++++++++ .../find/FindDanglingIndexAction.java | 35 ++ .../find/FindDanglingIndexRequest.java | 56 +++ .../find/FindDanglingIndexResponse.java | 58 +++ .../find/NodeFindDanglingIndexRequest.java | 52 ++ .../find/NodeFindDanglingIndexResponse.java | 62 +++ .../TransportFindDanglingIndexAction.java | 107 ++++ .../ImportDanglingIndexAction.java | 36 ++ .../ImportDanglingIndexRequest.java | 76 +++ .../TransportImportDanglingIndexAction.java | 163 ++++++ .../list/ListDanglingIndicesAction.java | 35 ++ .../list/ListDanglingIndicesRequest.java | 64 +++ .../list/ListDanglingIndicesResponse.java | 168 +++++++ .../list/NodeListDanglingIndicesRequest.java | 55 +++ .../list/NodeListDanglingIndicesResponse.java | 56 +++ .../TransportListDanglingIndicesAction.java | 115 +++++ .../admin/indices/dangling/package-info.java | 36 ++ .../support/nodes/TransportNodesAction.java | 2 +- .../client/ClusterAdminClient.java | 34 ++ .../client/support/AbstractClient.java | 39 +- .../cluster/ClusterChangedEvent.java | 38 +- .../cluster/metadata/Metadata.java | 5 + .../common/util/CollectionUtils.java | 1 - .../gateway/DanglingIndicesState.java | 117 +++-- .../cluster/IndicesClusterStateService.java | 8 +- .../RestDeleteDanglingIndexAction.java | 68 +++ .../RestImportDanglingIndexAction.java | 67 +++ .../RestListDanglingIndicesAction.java | 52 ++ .../admin/cluster/dangling/package-info.java | 25 + .../ListDanglingIndicesResponseTests.java | 157 ++++++ .../gateway/DanglingIndicesStateTests.java | 97 +++- .../elasticsearch/test/XContentTestUtils.java | 32 ++ 42 files changed, 3142 insertions(+), 103 deletions(-) create mode 100644 qa/smoke-test-http/src/test/java/org/elasticsearch/http/DanglingIndicesRestIT.java create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/api/dangling_indices.delete.json create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/api/dangling_indices.import.json create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/api/dangling_indices.list.json create mode 100644 server/src/main/java/org/elasticsearch/action/admin/indices/dangling/DanglingIndexInfo.java create mode 100644 server/src/main/java/org/elasticsearch/action/admin/indices/dangling/delete/DeleteDanglingIndexAction.java create mode 100644 server/src/main/java/org/elasticsearch/action/admin/indices/dangling/delete/DeleteDanglingIndexRequest.java create mode 100644 server/src/main/java/org/elasticsearch/action/admin/indices/dangling/delete/TransportDeleteDanglingIndexAction.java create mode 100644 server/src/main/java/org/elasticsearch/action/admin/indices/dangling/find/FindDanglingIndexAction.java create mode 100644 
server/src/main/java/org/elasticsearch/action/admin/indices/dangling/find/FindDanglingIndexRequest.java create mode 100644 server/src/main/java/org/elasticsearch/action/admin/indices/dangling/find/FindDanglingIndexResponse.java create mode 100644 server/src/main/java/org/elasticsearch/action/admin/indices/dangling/find/NodeFindDanglingIndexRequest.java create mode 100644 server/src/main/java/org/elasticsearch/action/admin/indices/dangling/find/NodeFindDanglingIndexResponse.java create mode 100644 server/src/main/java/org/elasticsearch/action/admin/indices/dangling/find/TransportFindDanglingIndexAction.java create mode 100644 server/src/main/java/org/elasticsearch/action/admin/indices/dangling/import_index/ImportDanglingIndexAction.java create mode 100644 server/src/main/java/org/elasticsearch/action/admin/indices/dangling/import_index/ImportDanglingIndexRequest.java create mode 100644 server/src/main/java/org/elasticsearch/action/admin/indices/dangling/import_index/TransportImportDanglingIndexAction.java create mode 100644 server/src/main/java/org/elasticsearch/action/admin/indices/dangling/list/ListDanglingIndicesAction.java create mode 100644 server/src/main/java/org/elasticsearch/action/admin/indices/dangling/list/ListDanglingIndicesRequest.java create mode 100644 server/src/main/java/org/elasticsearch/action/admin/indices/dangling/list/ListDanglingIndicesResponse.java create mode 100644 server/src/main/java/org/elasticsearch/action/admin/indices/dangling/list/NodeListDanglingIndicesRequest.java create mode 100644 server/src/main/java/org/elasticsearch/action/admin/indices/dangling/list/NodeListDanglingIndicesResponse.java create mode 100644 server/src/main/java/org/elasticsearch/action/admin/indices/dangling/list/TransportListDanglingIndicesAction.java create mode 100644 server/src/main/java/org/elasticsearch/action/admin/indices/dangling/package-info.java create mode 100644 server/src/main/java/org/elasticsearch/rest/action/admin/cluster/dangling/RestDeleteDanglingIndexAction.java create mode 100644 server/src/main/java/org/elasticsearch/rest/action/admin/cluster/dangling/RestImportDanglingIndexAction.java create mode 100644 server/src/main/java/org/elasticsearch/rest/action/admin/cluster/dangling/RestListDanglingIndicesAction.java create mode 100644 server/src/main/java/org/elasticsearch/rest/action/admin/cluster/dangling/package-info.java create mode 100644 server/src/test/java/org/elasticsearch/action/admin/indices/dangling/list/ListDanglingIndicesResponseTests.java diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java index 8044f5fd07a..53e57018f44 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java @@ -816,6 +816,9 @@ public class RestHighLevelClientTests extends ESTestCase { "cluster.stats", "cluster.post_voting_config_exclusions", "cluster.delete_voting_config_exclusions", + "dangling_indices.delete", + "dangling_indices.import", + "dangling_indices.list", "indices.shard_stores", "indices.upgrade", "indices.recovery", diff --git a/qa/smoke-test-http/src/test/java/org/elasticsearch/http/DanglingIndicesRestIT.java b/qa/smoke-test-http/src/test/java/org/elasticsearch/http/DanglingIndicesRestIT.java new file mode 100644 index 00000000000..06274d35271 --- /dev/null +++ 
b/qa/smoke-test-http/src/test/java/org/elasticsearch/http/DanglingIndicesRestIT.java @@ -0,0 +1,316 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.http; + +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.common.io.Streams; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.ESIntegTestCase.ClusterScope; +import org.elasticsearch.test.InternalTestCluster; +import org.elasticsearch.test.XContentTestUtils; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +import static org.elasticsearch.cluster.metadata.IndexGraveyard.SETTING_MAX_TOMBSTONES; +import static org.elasticsearch.gateway.DanglingIndicesState.AUTO_IMPORT_DANGLING_INDICES_SETTING; +import static org.elasticsearch.indices.IndicesService.WRITE_DANGLING_INDICES_INFO_SETTING; +import static org.elasticsearch.rest.RestStatus.ACCEPTED; +import static org.elasticsearch.rest.RestStatus.OK; +import static org.elasticsearch.test.XContentTestUtils.createJsonMapView; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; + +/** + * This class tests the dangling indices REST API. These tests are here + * today so they have access to a proper REST client. They cannot be in + * :server:integTest since the REST client needs a proper transport + * implementation, and they cannot be REST tests today since they need to + * restart nodes. Really, though, this test should live elsewhere. + * + * @see org.elasticsearch.action.admin.indices.dangling + */ +@ClusterScope(numDataNodes = 0, scope = ESIntegTestCase.Scope.TEST, autoManageMasterNodes = false) +public class DanglingIndicesRestIT extends HttpSmokeTestCase { + private static final String INDEX_NAME = "test-idx-1"; + private static final String OTHER_INDEX_NAME = INDEX_NAME + "-other"; + + private Settings buildSettings(int maxTombstones) { + return Settings.builder() + // Limit the indices kept in the graveyard. This can be set to zero, so that + // when we delete an index, it's definitely considered to be dangling. 
+ .put(SETTING_MAX_TOMBSTONES.getKey(), maxTombstones) + .put(WRITE_DANGLING_INDICES_INFO_SETTING.getKey(), true) + .put(AUTO_IMPORT_DANGLING_INDICES_SETTING.getKey(), false) + .build(); + } + + /** + * Check that when dangling indices are discovered, then they can be listed via the REST API. + */ + public void testDanglingIndicesCanBeListed() throws Exception { + internalCluster().setBootstrapMasterNodeIndex(1); + internalCluster().startNodes(3, buildSettings(0)); + + final DanglingIndexDetails danglingIndexDetails = createDanglingIndices(INDEX_NAME); + final String stoppedNodeId = mapNodeNameToId(danglingIndexDetails.stoppedNodeName); + + final RestClient restClient = getRestClient(); + + final Response listResponse = restClient.performRequest(new Request("GET", "/_dangling")); + assertOK(listResponse); + + final XContentTestUtils.JsonMapView mapView = createJsonMapView(listResponse.getEntity().getContent()); + + assertThat(mapView.get("_nodes.total"), equalTo(3)); + assertThat(mapView.get("_nodes.successful"), equalTo(3)); + assertThat(mapView.get("_nodes.failed"), equalTo(0)); + + List indices = mapView.get("dangling_indices"); + assertThat(indices, hasSize(1)); + + assertThat(mapView.get("dangling_indices.0.index_name"), equalTo(INDEX_NAME)); + assertThat(mapView.get("dangling_indices.0.index_uuid"), equalTo(danglingIndexDetails.indexToUUID.get(INDEX_NAME))); + assertThat(mapView.get("dangling_indices.0.creation_date_millis"), instanceOf(Long.class)); + assertThat(mapView.get("dangling_indices.0.node_ids.0"), equalTo(stoppedNodeId)); + } + + /** + * Check that dangling indices can be imported. + */ + public void testDanglingIndicesCanBeImported() throws Exception { + internalCluster().setBootstrapMasterNodeIndex(1); + internalCluster().startNodes(3, buildSettings(0)); + + createDanglingIndices(INDEX_NAME); + + final RestClient restClient = getRestClient(); + + final List danglingIndexIds = listDanglingIndexIds(); + assertThat(danglingIndexIds, hasSize(1)); + + final Request importRequest = new Request("POST", "/_dangling/" + danglingIndexIds.get(0)); + importRequest.addParameter("accept_data_loss", "true"); + // Ensure this parameter is accepted + importRequest.addParameter("timeout", "20s"); + importRequest.addParameter("master_timeout", "20s"); + final Response importResponse = restClient.performRequest(importRequest); + assertThat(importResponse.getStatusLine().getStatusCode(), equalTo(ACCEPTED.getStatus())); + + final XContentTestUtils.JsonMapView mapView = createJsonMapView(importResponse.getEntity().getContent()); + assertThat(mapView.get("acknowledged"), equalTo(true)); + + assertTrue("Expected dangling index " + INDEX_NAME + " to be recovered", indexExists(INDEX_NAME)); + } + + /** + * Check that dangling indices can be deleted. Since this requires that + * we add an entry to the index graveyard, the graveyard size must be + * greater than 1. To test deletes, we set the index graveyard size to + * 1, then create two indices and delete them both while one node in + * the cluster is stopped. The deletion of the second pushes the deletion + * of the first out of the graveyard. 
When the stopped node is resumed, + * only the second index will be found in the graveyard and the + * other will be considered dangling, and can therefore be listed and + * deleted through the API. + */ + public void testDanglingIndicesCanBeDeleted() throws Exception { + internalCluster().setBootstrapMasterNodeIndex(1); + internalCluster().startNodes(3, buildSettings(1)); + + createDanglingIndices(INDEX_NAME, OTHER_INDEX_NAME); + + final RestClient restClient = getRestClient(); + + final List danglingIndexIds = listDanglingIndexIds(); + assertThat(danglingIndexIds, hasSize(1)); + + final Request deleteRequest = new Request("DELETE", "/_dangling/" + danglingIndexIds.get(0)); + deleteRequest.addParameter("accept_data_loss", "true"); + // Ensure these parameters are accepted + deleteRequest.addParameter("timeout", "20s"); + deleteRequest.addParameter("master_timeout", "20s"); + final Response deleteResponse = restClient.performRequest(deleteRequest); + assertThat(deleteResponse.getStatusLine().getStatusCode(), equalTo(ACCEPTED.getStatus())); + + final XContentTestUtils.JsonMapView mapView = createJsonMapView(deleteResponse.getEntity().getContent()); + assertThat(mapView.get("acknowledged"), equalTo(true)); + + assertBusy(() -> assertThat("Expected dangling index to be deleted", listDanglingIndexIds(), hasSize(0))); + + // The dangling index that we deleted ought to have been removed from disk. Check by + // creating and deleting another index, which creates a new tombstone entry, which should + // not cause the deleted dangling index to be considered "live" again, just because its + // tombstone has been pushed out of the graveyard. + createIndex("additional"); + deleteIndex("additional"); + assertThat(listDanglingIndexIds(), is(empty())); + } + + private List listDanglingIndexIds() throws IOException { + final Response response = getRestClient().performRequest(new Request("GET", "/_dangling")); + assertOK(response); + + final XContentTestUtils.JsonMapView mapView = createJsonMapView(response.getEntity().getContent()); + + assertThat(mapView.get("_nodes.total"), equalTo(3)); + assertThat(mapView.get("_nodes.successful"), equalTo(3)); + assertThat(mapView.get("_nodes.failed"), equalTo(0)); + + List indices = mapView.get("dangling_indices"); + + List danglingIndexIds = new ArrayList<>(); + + for (int i = 0; i < indices.size(); i++) { + danglingIndexIds.add(mapView.get("dangling_indices." + i + ".index_uuid")); + } + + return danglingIndexIds; + } + + private void assertOK(Response response) { + assertThat(response.getStatusLine().getStatusCode(), equalTo(OK.getStatus())); + } + + /** + * Given a node name, finds the corresponding node ID. + */ + private String mapNodeNameToId(String nodeName) throws IOException { + final Response catResponse = getRestClient().performRequest(new Request("GET", "/_cat/nodes?full_id&h=id,name")); + assertOK(catResponse); + + for (String nodeLine : Streams.readAllLines(catResponse.getEntity().getContent())) { + String[] elements = nodeLine.split(" "); + if (elements[1].equals(nodeName)) { + return elements[0]; + } + } + + throw new AssertionError("Failed to map node name [" + nodeName + "] to node ID"); + } + + /** + * Helper that creates one or more indices, and importantly, + * checks that they are green before proceeding. This is important + * because the tests in this class stop and restart nodes, assuming + * that each index has a primary or replica shard on every node, and if + * a node is stopped prematurely, this assumption is broken.
+ * + * @return a mapping from each created index name to its UUID + */ + private Map createIndices(String... indices) throws IOException { + assert indices.length > 0; + + for (String index : indices) { + String indexSettings = "{" + + " \"settings\": {" + + " \"index\": {" + + " \"number_of_shards\": 1," + + " \"number_of_replicas\": 2," + + " \"routing\": {" + + " \"allocation\": {" + + " \"total_shards_per_node\": 1" + + " }" + + " }" + + " }" + + " }" + + "}"; + Request request = new Request("PUT", "/" + index); + request.setJsonEntity(indexSettings); + assertOK(getRestClient().performRequest(request)); + } + ensureGreen(indices); + + final Response catResponse = getRestClient().performRequest(new Request("GET", "/_cat/indices?h=index,uuid")); + assertOK(catResponse); + + final Map createdIndexIDs = new HashMap<>(); + + final List indicesAsList = Arrays.asList(indices); + + for (String indexLine : Streams.readAllLines(catResponse.getEntity().getContent())) { + String[] elements = indexLine.split(" +"); + if (indicesAsList.contains(elements[0])) { + createdIndexIDs.put(elements[0], elements[1]); + } + } + + assertThat("Expected to find as many index UUIDs as created indices", createdIndexIDs.size(), equalTo(indices.length)); + + return createdIndexIDs; + } + + private void deleteIndex(String indexName) throws IOException { + Response deleteResponse = getRestClient().performRequest(new Request("DELETE", "/" + indexName)); + assertOK(deleteResponse); + } + + private DanglingIndexDetails createDanglingIndices(String... indices) throws Exception { + ensureStableCluster(3); + final Map indexToUUID = createIndices(indices); + + final AtomicReference stoppedNodeName = new AtomicReference<>(); + + assertBusy( + () -> internalCluster().getInstances(IndicesService.class) + .forEach(indicesService -> assertTrue(indicesService.allPendingDanglingIndicesWritten())) + ); + + // Restart node, deleting the index in its absence, so that there is a dangling index to recover + internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback() { + + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + ensureClusterSizeConsistency(); + stoppedNodeName.set(nodeName); + for (String index : indices) { + deleteIndex(index); + } + return super.onNodeStopped(nodeName); + } + }); + + ensureStableCluster(3); + + return new DanglingIndexDetails(stoppedNodeName.get(), indexToUUID); + } + + private static class DanglingIndexDetails { + private final String stoppedNodeName; + private final Map indexToUUID; + + DanglingIndexDetails(String stoppedNodeName, Map indexToUUID) { + this.stoppedNodeName = stoppedNodeName; + this.indexToUUID = indexToUUID; + } + } +} diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/dangling_indices.delete.json b/rest-api-spec/src/main/resources/rest-api-spec/api/dangling_indices.delete.json new file mode 100644 index 00000000000..e7f7136a98f --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/dangling_indices.delete.json @@ -0,0 +1,39 @@ +{ + "dangling_indices.delete": { + "documentation": { + "url": "https://www.elastic.co/guide/en/elasticsearch/reference/master/modules-gateway-dangling-indices.html", + "description": "Deletes the specified dangling index" + }, + "stability": "stable", + "url": { + "paths": [ + { + "path": "/_dangling/{index_uuid}", + "methods": [ + "DELETE" + ], + "parts": { + "index_uuid": { + "type": "string", + "description": "The UUID of the dangling index" + } + } + } + ] + }, + "params": { + 
"accept_data_loss": { + "type": "boolean", + "description": "Must be set to true in order to delete the dangling index" + }, + "timeout": { + "type": "time", + "description": "Explicit operation timeout" + }, + "master_timeout": { + "type": "time", + "description": "Specify timeout for connection to master" + } + } + } +} diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/dangling_indices.import.json b/rest-api-spec/src/main/resources/rest-api-spec/api/dangling_indices.import.json new file mode 100644 index 00000000000..53f44f6b08e --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/dangling_indices.import.json @@ -0,0 +1,39 @@ +{ + "dangling_indices.import": { + "documentation": { + "url": "https://www.elastic.co/guide/en/elasticsearch/reference/master/modules-gateway-dangling-indices.html", + "description": "Imports the specified dangling index" + }, + "stability": "stable", + "url": { + "paths": [ + { + "path": "/_dangling/{index_uuid}", + "methods": [ + "POST" + ], + "parts": { + "index_uuid": { + "type": "string", + "description": "The UUID of the dangling index" + } + } + } + ] + }, + "params": { + "accept_data_loss": { + "type": "boolean", + "description": "Must be set to true in order to import the dangling index" + }, + "timeout": { + "type": "time", + "description": "Explicit operation timeout" + }, + "master_timeout": { + "type": "time", + "description": "Specify timeout for connection to master" + } + } + } +} diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/dangling_indices.list.json b/rest-api-spec/src/main/resources/rest-api-spec/api/dangling_indices.list.json new file mode 100644 index 00000000000..9925167906c --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/dangling_indices.list.json @@ -0,0 +1,20 @@ +{ + "dangling_indices.list": { + "documentation": { + "url": "https://www.elastic.co/guide/en/elasticsearch/reference/master/modules-gateway-dangling-indices.html", + "description": "Returns all dangling indices." 
+ }, + "stability": "stable", + "url": { + "paths": [ + { + "path": "/_dangling", + "methods": [ + "GET" + ] + } + ] + }, + "params": {} + } +} diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/DanglingIndicesIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/DanglingIndicesIT.java index 74af1162db0..eeefe4b8825 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/DanglingIndicesIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/DanglingIndicesIT.java @@ -19,54 +19,79 @@ package org.elasticsearch.indices.recovery; +import org.elasticsearch.action.admin.indices.dangling.DanglingIndexInfo; +import org.elasticsearch.action.admin.indices.dangling.delete.DeleteDanglingIndexRequest; +import org.elasticsearch.action.admin.indices.dangling.import_index.ImportDanglingIndexRequest; +import org.elasticsearch.action.admin.indices.dangling.list.ListDanglingIndicesRequest; +import org.elasticsearch.action.admin.indices.dangling.list.ListDanglingIndicesResponse; +import org.elasticsearch.action.admin.indices.dangling.list.NodeListDanglingIndicesResponse; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.InternalTestCluster; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.cluster.metadata.IndexGraveyard.SETTING_MAX_TOMBSTONES; import static org.elasticsearch.gateway.DanglingIndicesState.AUTO_IMPORT_DANGLING_INDICES_SETTING; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; +/** + * This class tests how dangling indices are handled, in terms of how they + * are discovered, and how they can be accessed and manipulated through the + * API. + * + *

See also DanglingIndicesRestIT in the qa:smoke-test-http + * project. + * + * @see org.elasticsearch.action.admin.indices.dangling + */ @ClusterScope(numDataNodes = 0, scope = ESIntegTestCase.Scope.TEST) public class DanglingIndicesIT extends ESIntegTestCase { private static final String INDEX_NAME = "test-idx-1"; + private static final String OTHER_INDEX_NAME = INDEX_NAME + "-other"; - private Settings buildSettings(boolean writeDanglingIndices, boolean importDanglingIndices) { + private Settings buildSettings(int maxTombstones, boolean writeDanglingIndices, boolean importDanglingIndices) { return Settings.builder() - // Don't keep any indices in the graveyard, so that when we delete an index, - // it's definitely considered to be dangling. - .put(SETTING_MAX_TOMBSTONES.getKey(), 0) + // Limit the indices kept in the graveyard. This can be set to zero, so that + // when we delete an index, it's definitely considered to be dangling. + .put(SETTING_MAX_TOMBSTONES.getKey(), maxTombstones) .put(IndicesService.WRITE_DANGLING_INDICES_INFO_SETTING.getKey(), writeDanglingIndices) .put(AUTO_IMPORT_DANGLING_INDICES_SETTING.getKey(), importDanglingIndices) .build(); } /** - * Check that when dangling indices are discovered, then they are recovered into - * the cluster, so long as the recovery setting is enabled. + * Check that when the auto-recovery setting is enabled and a dangling index is + * discovered, then that index is recovered into the cluster. */ public void testDanglingIndicesAreRecoveredWhenSettingIsEnabled() throws Exception { - final Settings settings = buildSettings(true, true); + final Settings settings = buildSettings(0, true, true); internalCluster().startNodes(3, settings); - createIndex(INDEX_NAME, Settings.builder().put("number_of_replicas", 2).build()); - ensureGreen(INDEX_NAME); - assertBusy(() -> internalCluster().getInstances(IndicesService.class).forEach( - indicesService -> assertTrue(indicesService.allPendingDanglingIndicesWritten()))); + createIndices(INDEX_NAME); + ensurePendingDanglingIndicesWritten(); boolean refreshIntervalChanged = randomBoolean(); if (refreshIntervalChanged) { - client().admin().indices().prepareUpdateSettings(INDEX_NAME).setSettings( - Settings.builder().put("index.refresh_interval", "42s").build()).get(); - assertBusy(() -> internalCluster().getInstances(IndicesService.class).forEach( - indicesService -> assertTrue(indicesService.allPendingDanglingIndicesWritten()))); + client().admin() + .indices() + .prepareUpdateSettings(INDEX_NAME) + .setSettings(Settings.builder().put("index.refresh_interval", "42s").build()) + .get(); + ensurePendingDanglingIndicesWritten(); } if (randomBoolean()) { @@ -87,36 +112,31 @@ public class DanglingIndicesIT extends ESIntegTestCase { assertBusy(() -> assertTrue("Expected dangling index " + INDEX_NAME + " to be recovered", indexExists(INDEX_NAME))); if (refreshIntervalChanged) { - assertThat(client().admin().indices().prepareGetSettings(INDEX_NAME).get().getSetting(INDEX_NAME, "index.refresh_interval"), - equalTo("42s")); + assertThat( + client().admin().indices().prepareGetSettings(INDEX_NAME).get().getSetting(INDEX_NAME, "index.refresh_interval"), + equalTo("42s") + ); } ensureGreen(INDEX_NAME); final IndexMetadata indexMetadata = clusterService().state().metadata().index(INDEX_NAME); assertThat(indexMetadata.getSettings().get(IndexMetadata.SETTING_HISTORY_UUID), notNullValue()); } + private void ensurePendingDanglingIndicesWritten() throws Exception { + assertBusy( + () -> 
internalCluster().getInstances(IndicesService.class) + .forEach(indicesService -> assertTrue(indicesService.allPendingDanglingIndicesWritten())) + ); + } + /** * Check that when dangling indices are discovered, then they are not recovered into * the cluster when the recovery setting is disabled. */ public void testDanglingIndicesAreNotRecoveredWhenSettingIsDisabled() throws Exception { - internalCluster().startNodes(3, buildSettings(true, false)); + internalCluster().startNodes(3, buildSettings(0, true, false)); - createIndex(INDEX_NAME, Settings.builder().put("number_of_replicas", 2).build()); - ensureGreen(INDEX_NAME); - assertBusy(() -> internalCluster().getInstances(IndicesService.class).forEach( - indicesService -> assertTrue(indicesService.allPendingDanglingIndicesWritten()))); - - // Restart node, deleting the index in its absence, so that there is a dangling index to recover - internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback() { - - @Override - public Settings onNodeStopped(String nodeName) throws Exception { - internalCluster().validateClusterFormed(); - assertAcked(client().admin().indices().prepareDelete(INDEX_NAME)); - return super.onNodeStopped(nodeName); - } - }); + createDanglingIndices(INDEX_NAME); // Since index recovery is async, we can't prove index recovery will never occur, just that it doesn't occur within some reasonable // amount of time @@ -130,23 +150,9 @@ public class DanglingIndicesIT extends ESIntegTestCase { * Check that when dangling indices are not written, then they cannot be recovered into the cluster. */ public void testDanglingIndicesAreNotRecoveredWhenNotWritten() throws Exception { - internalCluster().startNodes(3, buildSettings(false, true)); + internalCluster().startNodes(3, buildSettings(0, false, true)); - createIndex(INDEX_NAME, Settings.builder().put("number_of_replicas", 2).build()); - ensureGreen(INDEX_NAME); - internalCluster().getInstances(IndicesService.class).forEach( - indicesService -> assertTrue(indicesService.allPendingDanglingIndicesWritten())); - - // Restart node, deleting the index in its absence, so that there is a dangling index to recover - internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback() { - - @Override - public Settings onNodeStopped(String nodeName) throws Exception { - internalCluster().validateClusterFormed(); - assertAcked(client().admin().indices().prepareDelete(INDEX_NAME)); - return super.onNodeStopped(nodeName); - } - }); + createDanglingIndices(INDEX_NAME); // Since index recovery is async, we can't prove index recovery will never occur, just that it doesn't occur within some reasonable // amount of time @@ -155,4 +161,365 @@ public class DanglingIndicesIT extends ESIntegTestCase { waitUntil(() -> indexExists(INDEX_NAME), 1, TimeUnit.SECONDS) ); } + + /** + * Check that when dangling indices are discovered, then they can be listed. 
+ */ + public void testDanglingIndicesCanBeListed() throws Exception { + internalCluster().startNodes(3, buildSettings(0, true, false)); + + final String stoppedNodeName = createDanglingIndices(INDEX_NAME); + + final ListDanglingIndicesResponse response = client().admin() + .cluster() + .listDanglingIndices(new ListDanglingIndicesRequest()) + .actionGet(); + assertThat(response.status(), equalTo(RestStatus.OK)); + + final List nodeResponses = response.getNodes(); + assertThat("Didn't get responses from all nodes", nodeResponses, hasSize(3)); + + for (NodeListDanglingIndicesResponse nodeResponse : nodeResponses) { + if (nodeResponse.getNode().getName().equals(stoppedNodeName)) { + assertThat("Expected node that was stopped to have one dangling index", nodeResponse.getDanglingIndices(), hasSize(1)); + + final DanglingIndexInfo danglingIndexInfo = nodeResponse.getDanglingIndices().get(0); + assertThat(danglingIndexInfo.getIndexName(), equalTo(INDEX_NAME)); + } else { + assertThat("Expected node that was never stopped to have no dangling indices", nodeResponse.getDanglingIndices(), empty()); + } + } + } + + /** + * Check that when dangling index auto imports are enabled, and a dangling index is discovered + * but cannot be imported due to a name clash with an existing index, then that dangling index can + * still be listed through the API. + */ + public void testDanglingIndicesCanBeListedWhenAutoImportEnabled() throws Exception { + internalCluster().startNodes(3, buildSettings(0, true, true)); + + createIndices(INDEX_NAME); + ensurePendingDanglingIndicesWritten(); + + // Restart node, deleting the indices in its absence, so that there is a dangling index to recover + internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback() { + + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + internalCluster().validateClusterFormed(); + assertAcked(client().admin().indices().prepareDelete(INDEX_NAME)); + + // Create another index with the same name, which will prevent the dangling + // index from being restored, and also ensures that we are checking index + // existence by UUID, not name. + // + // Note: don't call `createIndices()` here as it calls `ensureGreen()`, which will + // fail while a node is offline + createIndex(INDEX_NAME); + ensurePendingDanglingIndicesWritten(); + + return super.onNodeStopped(nodeName); + } + }); + + final List danglingIndices = listDanglingIndices(); + + assertThat(danglingIndices, hasSize(1)); + assertThat(danglingIndices.get(0).getIndexName(), equalTo(INDEX_NAME)); + } + + /** + * Check that dangling indices can be imported. + */ + public void testDanglingIndicesCanBeImported() throws Exception { + internalCluster().startNodes(3, buildSettings(0, true, false)); + + final String stoppedNodeName = createDanglingIndices(INDEX_NAME); + + final String danglingIndexUUID = findDanglingIndexForNode(stoppedNodeName, INDEX_NAME); + + final ImportDanglingIndexRequest request = new ImportDanglingIndexRequest(danglingIndexUUID, true); + + client().admin().cluster().importDanglingIndex(request).actionGet(); + + assertTrue("Expected dangling index " + INDEX_NAME + " to be recovered", indexExists(INDEX_NAME)); + } + + /** + * Check that when sending an import-dangling-indices request, the specified UUIDs are validated as + * being dangling.
+ */ + public void testDanglingIndicesMustExistToBeImported() { + internalCluster().startNodes(1, buildSettings(0, true, false)); + + final ImportDanglingIndexRequest request = new ImportDanglingIndexRequest("NonExistentUUID", true); + + final IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> client().admin().cluster().importDanglingIndex(request).actionGet() + ); + + assertThat(e.getMessage(), containsString("No dangling index found for UUID [NonExistentUUID]")); + } + + /** + * Check that a dangling index can only be imported if "accept_data_loss" is set to true. + */ + public void testMustAcceptDataLossToImportDanglingIndex() throws Exception { + internalCluster().startNodes(3, buildSettings(0, true, false)); + + final String stoppedNodeName = createDanglingIndices(INDEX_NAME); + final String danglingIndexUUID = findDanglingIndexForNode(stoppedNodeName, INDEX_NAME); + + final ImportDanglingIndexRequest request = new ImportDanglingIndexRequest(danglingIndexUUID, false); + + Exception e = expectThrows(Exception.class, () -> client().admin().cluster().importDanglingIndex(request).actionGet()); + + assertThat(e.getMessage(), containsString("accept_data_loss must be set to true")); + } + + /** + * Check that dangling indices can be deleted. Since this requires that + * we add an entry to the index graveyard, the graveyard size must be + * greater than 1. To test deletes, we set the index graveyard size to + * 1, then create two indices and delete them both while one node in + * the cluster is stopped. The deletion of the second pushes the deletion + * of the first out of the graveyard. When the stopped node is resumed, + * only the second index will be found in the graveyard and the + * other will be considered dangling, and can therefore be listed and + * deleted through the API. + */ + public void testDanglingIndexCanBeDeleted() throws Exception { + final Settings settings = buildSettings(1, true, false); + internalCluster().startNodes(3, settings); + + final String stoppedNodeName = createDanglingIndices(INDEX_NAME, OTHER_INDEX_NAME); + final String danglingIndexUUID = findDanglingIndexForNode(stoppedNodeName, INDEX_NAME); + + client().admin().cluster().deleteDanglingIndex(new DeleteDanglingIndexRequest(danglingIndexUUID, true)).actionGet(); + + // The dangling index that we deleted ought to have been removed from disk. Check by + // creating and deleting another index, which creates a new tombstone entry, which should + // not cause the deleted dangling index to be considered "live" again, just because its + // tombstone has been pushed out of the graveyard. + createIndex("additional"); + assertAcked(client().admin().indices().prepareDelete("additional")); + assertThat(listDanglingIndices(), is(empty())); + } + + /** + * Check that when dangling index auto imports are enabled, and a dangling index is discovered + * but cannot be imported due to a name clash with an existing index, then that dangling index can + * still be deleted through the API.
+ */ + public void testDanglingIndexCanBeDeletedWhenAutoImportEnabled() throws Exception { + final Settings settings = buildSettings(1, true, true); + internalCluster().startNodes(3, settings); + + createIndices(INDEX_NAME, OTHER_INDEX_NAME); + ensurePendingDanglingIndicesWritten(); + + AtomicReference stoppedNodeName = new AtomicReference<>(); + + // Restart node, deleting the indices in its absence, so that there is a dangling index to recover + internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback() { + + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + internalCluster().validateClusterFormed(); + stoppedNodeName.set(nodeName); + + assertAcked(client().admin().indices().prepareDelete(INDEX_NAME)); + assertAcked(client().admin().indices().prepareDelete(OTHER_INDEX_NAME)); + + // Create another index with the same name, which will prevent the dangling + // index from being restored, and also ensures that we are checking index + // existence by UUID, not name. + // + // Note: don't call `createIndices()` here as it calls `ensureGreen()`, which will + // fail while a node is offline + createIndex(INDEX_NAME); + ensurePendingDanglingIndicesWritten(); + + return super.onNodeStopped(nodeName); + } + }); + + final String danglingIndexUUID = findDanglingIndexForNode(stoppedNodeName.get(), INDEX_NAME); + + client().admin().cluster().deleteDanglingIndex(new DeleteDanglingIndexRequest(danglingIndexUUID, true)).actionGet(); + + // The dangling index that we deleted ought to have been removed from disk. Check by + // creating and deleting another index, which creates a new tombstone entry, which should + // not cause the deleted dangling index to be considered "live" again, just because its + // tombstone has been pushed out of the graveyard. + createIndex("additional"); + assertAcked(client().admin().indices().prepareDelete("additional")); + assertThat(listDanglingIndices(), is(empty())); + } + + /** + * Check that when an index is found to be dangling on more than one node, it can be deleted. + */ + public void testDanglingIndexOverMultipleNodesCanBeDeleted() throws Exception { + final Settings settings = buildSettings(1, true, false); + internalCluster().startNodes(3, settings); + + createIndices(INDEX_NAME, OTHER_INDEX_NAME); + + ensurePendingDanglingIndicesWritten(); + + // Restart 2 nodes, deleting the indices in their absence, so that there is a dangling index to recover + internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback() { + + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback() { + + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + internalCluster().validateClusterFormed(); + assertAcked(client().admin().indices().prepareDelete(INDEX_NAME)); + assertAcked(client().admin().indices().prepareDelete(OTHER_INDEX_NAME)); + return super.onNodeStopped(nodeName); + } + }); + + return super.onNodeStopped(nodeName); + } + }); + + final AtomicReference> danglingIndices = new AtomicReference<>(); + + final List results = listDanglingIndices(); + + // Both the stopped nodes should have found a dangling index.
+ assertThat(results, hasSize(2)); + danglingIndices.set(results); + + // Try to delete the index - this request should succeed + client().admin() + .cluster() + .deleteDanglingIndex(new DeleteDanglingIndexRequest(danglingIndices.get().get(0).getIndexUUID(), true)) + .actionGet(); + + // The dangling index that we deleted ought to have been removed from disk. Check by + // creating and deleting another index, which creates a new tombstone entry, which should + // not cause the deleted dangling index to be considered "live" again, just because its + // tombstone has been pushed out of the graveyard. + createIndex("additional"); + assertAcked(client().admin().indices().prepareDelete("additional")); + assertBusy(() -> assertThat(listDanglingIndices(), empty())); + } + + /** + * Check that when deleting a dangling index, it is required that the "accept_data_loss" flag is set. + */ + public void testDeleteDanglingIndicesRequiresDataLossFlagToBeTrue() throws Exception { + final Settings settings = buildSettings(1, true, false); + internalCluster().startNodes(3, settings); + + final String stoppedNodeName = createDanglingIndices(INDEX_NAME, OTHER_INDEX_NAME); + final String danglingIndexUUID = findDanglingIndexForNode(stoppedNodeName, INDEX_NAME); + + Exception e = expectThrows( + Exception.class, + () -> client().admin().cluster().deleteDanglingIndex(new DeleteDanglingIndexRequest(danglingIndexUUID, false)).actionGet() + ); + + assertThat(e.getMessage(), containsString("accept_data_loss must be set to true")); + } + + /** + * Helper that fetches the current list of dangling indices. + */ + private List listDanglingIndices() { + final ListDanglingIndicesResponse response = client().admin() + .cluster() + .listDanglingIndices(new ListDanglingIndicesRequest()) + .actionGet(); + assertThat(response.status(), equalTo(RestStatus.OK)); + + final List nodeResponses = response.getNodes(); + + final List results = new ArrayList<>(); + + for (NodeListDanglingIndicesResponse nodeResponse : nodeResponses) { + results.addAll(nodeResponse.getDanglingIndices()); + } + + return results; + } + + /** + * Simple helper that creates one or more indices, and importantly, + * checks that they are green before proceeding. This is important + * because the tests in this class stop and restart nodes, assuming + * that each index has a primary or replica shard on every node, and if + * a node is stopped prematurely, this assumption is broken. + */ + private void createIndices(String... indices) { + assert indices.length > 0; + for (String index : indices) { + createIndex(index, Settings.builder().put("number_of_replicas", 2).put("routing.allocation.total_shards_per_node", 1).build()); + } + ensureGreen(indices); + } + + /** + * Creates a number of dangling indices by first creating them, then stopping a data node + * and deleting the indices while the node is stopped. + * @param indices the indices to create and delete + * @return the name of the stopped node + */ + private String createDanglingIndices(String...
indices) throws Exception { + createIndices(indices); + + ensurePendingDanglingIndicesWritten(); + + AtomicReference stoppedNodeName = new AtomicReference<>(); + + // Restart node, deleting the indices in its absence, so that there is a dangling index to recover + internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback() { + + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + internalCluster().validateClusterFormed(); + stoppedNodeName.set(nodeName); + for (String index : indices) { + assertAcked(client().admin().indices().prepareDelete(index)); + } + return super.onNodeStopped(nodeName); + } + }); + + return stoppedNodeName.get(); + } + + private String findDanglingIndexForNode(String stoppedNodeName, String indexName) { + String danglingIndexUUID = null; + + final ListDanglingIndicesResponse response = client().admin() + .cluster() + .listDanglingIndices(new ListDanglingIndicesRequest()) + .actionGet(); + assertThat(response.status(), equalTo(RestStatus.OK)); + + final List nodeResponses = response.getNodes(); + + for (NodeListDanglingIndicesResponse nodeResponse : nodeResponses) { + if (nodeResponse.getNode().getName().equals(stoppedNodeName)) { + final DanglingIndexInfo danglingIndexInfo = nodeResponse.getDanglingIndices().get(0); + assertThat(danglingIndexInfo.getIndexName(), equalTo(indexName)); + + danglingIndexUUID = danglingIndexInfo.getIndexUUID(); + break; + } + } + + assertNotNull("Failed to find a dangling index UUID for node [" + stoppedNodeName + "]", danglingIndexUUID); + + return danglingIndexUUID; + } } diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index 15a1ea55408..bbbde24672b 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -107,6 +107,14 @@ import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction; import org.elasticsearch.action.admin.indices.create.AutoCreateAction; import org.elasticsearch.action.admin.indices.create.CreateIndexAction; import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction; +import org.elasticsearch.action.admin.indices.dangling.delete.DeleteDanglingIndexAction; +import org.elasticsearch.action.admin.indices.dangling.delete.TransportDeleteDanglingIndexAction; +import org.elasticsearch.action.admin.indices.dangling.find.FindDanglingIndexAction; +import org.elasticsearch.action.admin.indices.dangling.find.TransportFindDanglingIndexAction; +import org.elasticsearch.action.admin.indices.dangling.import_index.ImportDanglingIndexAction; +import org.elasticsearch.action.admin.indices.dangling.import_index.TransportImportDanglingIndexAction; +import org.elasticsearch.action.admin.indices.dangling.list.ListDanglingIndicesAction; +import org.elasticsearch.action.admin.indices.dangling.list.TransportListDanglingIndicesAction; import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction; import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction; import org.elasticsearch.action.admin.indices.datastream.GetDataStreamAction; @@ -289,6 +297,9 @@ import org.elasticsearch.rest.action.admin.cluster.RestRemoteClusterInfoAction; import org.elasticsearch.rest.action.admin.cluster.RestRestoreSnapshotAction; import org.elasticsearch.rest.action.admin.cluster.RestSnapshotsStatusAction; import 
org.elasticsearch.rest.action.admin.cluster.RestVerifyRepositoryAction; +import org.elasticsearch.rest.action.admin.cluster.dangling.RestDeleteDanglingIndexAction; +import org.elasticsearch.rest.action.admin.cluster.dangling.RestImportDanglingIndexAction; +import org.elasticsearch.rest.action.admin.cluster.dangling.RestListDanglingIndicesAction; import org.elasticsearch.rest.action.admin.indices.RestAnalyzeAction; import org.elasticsearch.rest.action.admin.indices.RestClearIndicesCacheAction; import org.elasticsearch.rest.action.admin.indices.RestCloseIndexAction; @@ -625,6 +636,12 @@ public class ActionModule extends AbstractModule { actions.register(RetentionLeaseActions.Renew.INSTANCE, RetentionLeaseActions.Renew.TransportAction.class); actions.register(RetentionLeaseActions.Remove.INSTANCE, RetentionLeaseActions.Remove.TransportAction.class); + // Dangling indices + actions.register(ListDanglingIndicesAction.INSTANCE, TransportListDanglingIndicesAction.class); + actions.register(ImportDanglingIndexAction.INSTANCE, TransportImportDanglingIndexAction.class); + actions.register(DeleteDanglingIndexAction.INSTANCE, TransportDeleteDanglingIndexAction.class); + actions.register(FindDanglingIndexAction.INSTANCE, TransportFindDanglingIndexAction.class); + return unmodifiableMap(actions.getRegistry()); } @@ -759,6 +776,11 @@ public class ActionModule extends AbstractModule { registerHandler.accept(new RestDeletePipelineAction()); registerHandler.accept(new RestSimulatePipelineAction()); + // Dangling indices API + registerHandler.accept(new RestListDanglingIndicesAction()); + registerHandler.accept(new RestImportDanglingIndexAction()); + registerHandler.accept(new RestDeleteDanglingIndexAction()); + // Data Stream API if (DATASTREAMS_FEATURE_ENABLED) { registerHandler.accept(new RestCreateDataStreamAction()); diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/DanglingIndexInfo.java b/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/DanglingIndexInfo.java new file mode 100644 index 00000000000..f1df0ce54c0 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/DanglingIndexInfo.java @@ -0,0 +1,75 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.dangling; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; + +import java.io.IOException; + +/** + * Contains information about a dangling index, i.e. an index that Elasticsearch has found + * on-disk but is not present in the cluster state. 
+ */ +public class DanglingIndexInfo implements Writeable { + private final String nodeId; + private final String indexName; + private final String indexUUID; + private final long creationDateMillis; + + public DanglingIndexInfo(String nodeId, String indexName, String indexUUID, long creationDateMillis) { + this.nodeId = nodeId; + this.indexName = indexName; + this.indexUUID = indexUUID; + this.creationDateMillis = creationDateMillis; + } + + public DanglingIndexInfo(StreamInput in) throws IOException { + this.nodeId = in.readString(); + this.indexName = in.readString(); + this.indexUUID = in.readString(); + this.creationDateMillis = in.readLong(); + } + + public String getIndexName() { + return indexName; + } + + public String getIndexUUID() { + return indexUUID; + } + + public String getNodeId() { + return this.nodeId; + } + + public long getCreationDateMillis() { + return creationDateMillis; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(this.nodeId); + out.writeString(this.indexName); + out.writeString(this.indexUUID); + out.writeLong(this.creationDateMillis); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/delete/DeleteDanglingIndexAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/delete/DeleteDanglingIndexAction.java new file mode 100644 index 00000000000..ab0b4d12de3 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/delete/DeleteDanglingIndexAction.java @@ -0,0 +1,36 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.dangling.delete; + +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.support.master.AcknowledgedResponse; + +/** + * This action causes a dangling index to be considered as deleted by the cluster. + */ +public class DeleteDanglingIndexAction extends ActionType { + + public static final DeleteDanglingIndexAction INSTANCE = new DeleteDanglingIndexAction(); + public static final String NAME = "cluster:admin/indices/dangling/delete"; + + private DeleteDanglingIndexAction() { + super(NAME, AcknowledgedResponse::new); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/delete/DeleteDanglingIndexRequest.java b/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/delete/DeleteDanglingIndexRequest.java new file mode 100644 index 00000000000..f8952c6da68 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/delete/DeleteDanglingIndexRequest.java @@ -0,0 +1,74 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.dangling.delete; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.support.master.AcknowledgedRequest; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.Objects; + +/** + * Represents a request to delete a particular dangling index, specified by its UUID. The {@link #acceptDataLoss} + * flag must also be explicitly set to true, or later validation will fail. + */ +public class DeleteDanglingIndexRequest extends AcknowledgedRequest { + private final String indexUUID; + private final boolean acceptDataLoss; + + public DeleteDanglingIndexRequest(StreamInput in) throws IOException { + super(in); + this.indexUUID = in.readString(); + this.acceptDataLoss = in.readBoolean(); + } + + public DeleteDanglingIndexRequest(String indexUUID, boolean acceptDataLoss) { + super(); + this.indexUUID = Objects.requireNonNull(indexUUID, "indexUUID cannot be null"); + this.acceptDataLoss = acceptDataLoss; + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + public String getIndexUUID() { + return indexUUID; + } + + public boolean isAcceptDataLoss() { + return acceptDataLoss; + } + + @Override + public String toString() { + return "DeleteDanglingIndexRequest{" + "indexUUID='" + indexUUID + "', acceptDataLoss=" + acceptDataLoss + '}'; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(this.indexUUID); + out.writeBoolean(this.acceptDataLoss); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/delete/TransportDeleteDanglingIndexAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/delete/TransportDeleteDanglingIndexAction.java new file mode 100644 index 00000000000..18161cc66bd --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/delete/TransportDeleteDanglingIndexAction.java @@ -0,0 +1,240 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.dangling.delete; + +import com.carrotsearch.hppc.cursors.ObjectObjectCursor; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.admin.indices.dangling.DanglingIndexInfo; +import org.elasticsearch.action.admin.indices.dangling.list.ListDanglingIndicesAction; +import org.elasticsearch.action.admin.indices.dangling.list.ListDanglingIndicesRequest; +import org.elasticsearch.action.admin.indices.dangling.list.ListDanglingIndicesResponse; +import org.elasticsearch.action.admin.indices.dangling.list.NodeListDanglingIndicesResponse; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.cluster.AckedClusterStateUpdateTask; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.metadata.IndexGraveyard; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.Index; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Implements the deletion of a dangling index. When handling a {@link DeleteDanglingIndexAction}, + * this class first checks that such a dangling index exists. It then submits a cluster state update + * to add the index to the index graveyard. 
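+ * <p>For illustration only: assuming a {@code Client} instance named {@code client} (not part of this class) and a
+ * placeholder UUID, the action added here could be invoked through the cluster admin client as follows:
+ * <pre>{@code
+ * // Tombstones the dangling index; data loss must be accepted explicitly or validation fails later.
+ * AcknowledgedResponse response = client.admin()
+ *     .cluster()
+ *     .deleteDanglingIndex(new DeleteDanglingIndexRequest("REPLACE_WITH_INDEX_UUID", true))
+ *     .actionGet();
+ * }</pre>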
+ */ +public class TransportDeleteDanglingIndexAction extends TransportMasterNodeAction { + private static final Logger logger = LogManager.getLogger(TransportDeleteDanglingIndexAction.class); + + private final Settings settings; + private final NodeClient nodeClient; + + @Inject + public TransportDeleteDanglingIndexAction( + TransportService transportService, + ClusterService clusterService, + ThreadPool threadPool, + ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, + Settings settings, + NodeClient nodeClient + ) { + super( + DeleteDanglingIndexAction.NAME, + transportService, + clusterService, + threadPool, + actionFilters, + DeleteDanglingIndexRequest::new, + indexNameExpressionResolver + ); + this.settings = settings; + this.nodeClient = nodeClient; + } + + @Override + protected String executor() { + return ThreadPool.Names.GENERIC; + } + + @Override + protected AcknowledgedResponse read(StreamInput in) throws IOException { + return new AcknowledgedResponse(in); + } + + @Override + protected void masterOperation( + DeleteDanglingIndexRequest deleteRequest, + ClusterState state, + ActionListener deleteListener + ) throws Exception { + findDanglingIndex(deleteRequest.getIndexUUID(), new ActionListener() { + @Override + public void onResponse(Index indexToDelete) { + // This flag is checked at this point so that we always check that the supplied index ID + // does correspond to a dangling index. + if (deleteRequest.isAcceptDataLoss() == false) { + deleteListener.onFailure(new IllegalArgumentException("accept_data_loss must be set to true")); + return; + } + + String indexName = indexToDelete.getName(); + String indexUUID = indexToDelete.getUUID(); + + final ActionListener clusterStateUpdatedListener = new ActionListener() { + @Override + public void onResponse(AcknowledgedResponse response) { + deleteListener.onResponse(response); + } + + @Override + public void onFailure(Exception e) { + logger.debug("Failed to delete dangling index [" + indexName + "] [" + indexUUID + "]", e); + deleteListener.onFailure(e); + } + }; + + final String taskSource = "delete-dangling-index [" + indexName + "] [" + indexUUID + "]"; + + clusterService.submitStateUpdateTask( + taskSource, + new AckedClusterStateUpdateTask(deleteRequest, clusterStateUpdatedListener) { + + @Override + protected AcknowledgedResponse newResponse(boolean acknowledged) { + return new AcknowledgedResponse(acknowledged); + } + + @Override + public ClusterState execute(final ClusterState currentState) { + return deleteDanglingIndex(currentState, indexToDelete); + } + } + ); + } + + @Override + public void onFailure(Exception e) { + logger.debug("Failed to find dangling index [" + deleteRequest.getIndexUUID() + "]", e); + deleteListener.onFailure(e); + } + }); + } + + private ClusterState deleteDanglingIndex(ClusterState currentState, Index indexToDelete) { + final Metadata metaData = currentState.getMetadata(); + + for (ObjectObjectCursor each : metaData.indices()) { + if (indexToDelete.getUUID().equals(each.value.getIndexUUID())) { + throw new IllegalArgumentException( + "Refusing to delete dangling index " + + indexToDelete + + " as an index with UUID [" + + indexToDelete.getUUID() + + "] already exists in the cluster state" + ); + } + } + + // By definition, a dangling index is an index not present in the cluster state and with no tombstone, + // so we shouldn't reach this point if these conditions aren't met. 
For super-safety, however, check + // that a tombstone doesn't already exist for this index. + if (metaData.indexGraveyard().containsIndex(indexToDelete)) { + return currentState; + } + + Metadata.Builder metaDataBuilder = Metadata.builder(metaData); + + final IndexGraveyard newGraveyard = IndexGraveyard.builder(metaDataBuilder.indexGraveyard()) + .addTombstone(indexToDelete) + .build(settings); + metaDataBuilder.indexGraveyard(newGraveyard); + + return ClusterState.builder(currentState).metadata(metaDataBuilder.build()).build(); + } + + @Override + protected ClusterBlockException checkBlock(DeleteDanglingIndexRequest request, ClusterState state) { + return null; + } + + private void findDanglingIndex(String indexUUID, ActionListener listener) { + this.nodeClient.execute( + ListDanglingIndicesAction.INSTANCE, + new ListDanglingIndicesRequest(indexUUID), + new ActionListener() { + @Override + public void onResponse(ListDanglingIndicesResponse response) { + if (response.hasFailures()) { + final String nodeIds = response.failures() + .stream() + .map(FailedNodeException::nodeId) + .collect(Collectors.joining(",")); + ElasticsearchException e = new ElasticsearchException("Failed to query nodes [" + nodeIds + "]"); + + for (FailedNodeException failure : response.failures()) { + logger.error("Failed to query node [" + failure.nodeId() + "]", failure); + e.addSuppressed(failure); + } + + listener.onFailure(e); + return; + } + + final List nodes = response.getNodes(); + + for (NodeListDanglingIndicesResponse nodeResponse : nodes) { + for (DanglingIndexInfo each : nodeResponse.getDanglingIndices()) { + if (each.getIndexUUID().equals(indexUUID)) { + listener.onResponse(new Index(each.getIndexName(), each.getIndexUUID())); + return; + } + } + } + + listener.onFailure(new IllegalArgumentException("No dangling index found for UUID [" + indexUUID + "]")); + } + + @Override + public void onFailure(Exception exp) { + listener.onFailure(exp); + } + } + ); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/find/FindDanglingIndexAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/find/FindDanglingIndexAction.java new file mode 100644 index 00000000000..faf07cf3f4b --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/find/FindDanglingIndexAction.java @@ -0,0 +1,35 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.dangling.find; + +import org.elasticsearch.action.ActionType; + +/** + * Represents a request to find a particular dangling index by UUID. 
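+ * <p>A minimal sketch of how this internal action is dispatched, assuming a {@code NodeClient} named
+ * {@code nodeClient} and an {@code ActionListener<FindDanglingIndexResponse>} named {@code listener}
+ * (both are illustrative, not part of this class):
+ * <pre>{@code
+ * // Ask every node whether it has on-disk state for a dangling index with this UUID.
+ * nodeClient.execute(
+ *     FindDanglingIndexAction.INSTANCE,
+ *     new FindDanglingIndexRequest("REPLACE_WITH_INDEX_UUID"),
+ *     listener
+ * );
+ * }</pre>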
+ */ +public class FindDanglingIndexAction extends ActionType { + + public static final FindDanglingIndexAction INSTANCE = new FindDanglingIndexAction(); + public static final String NAME = "cluster:admin/indices/dangling/find"; + + private FindDanglingIndexAction() { + super(NAME, FindDanglingIndexResponse::new); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/find/FindDanglingIndexRequest.java b/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/find/FindDanglingIndexRequest.java new file mode 100644 index 00000000000..12c2d58eb43 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/find/FindDanglingIndexRequest.java @@ -0,0 +1,56 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.dangling.find; + +import org.elasticsearch.action.support.nodes.BaseNodesRequest; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +public class FindDanglingIndexRequest extends BaseNodesRequest { + private final String indexUUID; + + public FindDanglingIndexRequest(StreamInput in) throws IOException { + super(in); + this.indexUUID = in.readString(); + } + + public FindDanglingIndexRequest(String indexUUID) { + super(Strings.EMPTY_ARRAY); + this.indexUUID = indexUUID; + } + + public String getIndexUUID() { + return indexUUID; + } + + @Override + public String toString() { + return "FindDanglingIndicesRequest{indexUUID='" + indexUUID + "'}"; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(this.indexUUID); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/find/FindDanglingIndexResponse.java b/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/find/FindDanglingIndexResponse.java new file mode 100644 index 00000000000..3460acf50cf --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/find/FindDanglingIndexResponse.java @@ -0,0 +1,58 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.dangling.find; + +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.support.nodes.BaseNodesResponse; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.List; + +/** + * Models a response to a {@link FindDanglingIndexRequest}. A find request queries every node in the + * cluster looking for a dangling index with a specific UUID. + */ +public class FindDanglingIndexResponse extends BaseNodesResponse { + + public FindDanglingIndexResponse(StreamInput in) throws IOException { + super(in); + } + + public FindDanglingIndexResponse( + ClusterName clusterName, + List nodes, + List failures + ) { + super(clusterName, nodes, failures); + } + + @Override + protected List readNodesFrom(StreamInput in) throws IOException { + return in.readList(NodeFindDanglingIndexResponse::new); + } + + @Override + protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { + out.writeList(nodes); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/find/NodeFindDanglingIndexRequest.java b/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/find/NodeFindDanglingIndexRequest.java new file mode 100644 index 00000000000..6ea5e6a156a --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/find/NodeFindDanglingIndexRequest.java @@ -0,0 +1,52 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.dangling.find; + +import java.io.IOException; + +import org.elasticsearch.action.support.nodes.BaseNodeRequest; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +/** + * Used when querying every node in the cluster for a specific dangling index. 
+ */ +public class NodeFindDanglingIndexRequest extends BaseNodeRequest { + private final String indexUUID; + + public NodeFindDanglingIndexRequest(String indexUUID) { + this.indexUUID = indexUUID; + } + + public NodeFindDanglingIndexRequest(StreamInput in) throws IOException { + super(in); + this.indexUUID = in.readString(); + } + + public String getIndexUUID() { + return indexUUID; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(this.indexUUID); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/find/NodeFindDanglingIndexResponse.java b/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/find/NodeFindDanglingIndexResponse.java new file mode 100644 index 00000000000..65b1a9a37a5 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/find/NodeFindDanglingIndexResponse.java @@ -0,0 +1,62 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.dangling.find; + +import org.elasticsearch.action.support.nodes.BaseNodeResponse; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.List; + +/** + * Used when querying every node in the cluster for a specific dangling index. + */ +public class NodeFindDanglingIndexResponse extends BaseNodeResponse { + /** + * A node could report several dangling indices. This class will contain them all. + * A single node could even multiple different index versions for the same index + * UUID if the situation is really crazy, though perhaps this is more likely + * when collating responses from different nodes. 
+ */ + private final List danglingIndexInfo; + + public List getDanglingIndexInfo() { + return this.danglingIndexInfo; + } + + public NodeFindDanglingIndexResponse(DiscoveryNode node, List danglingIndexInfo) { + super(node); + this.danglingIndexInfo = danglingIndexInfo; + } + + protected NodeFindDanglingIndexResponse(StreamInput in) throws IOException { + super(in); + this.danglingIndexInfo = in.readList(IndexMetadata::readFrom); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeList(this.danglingIndexInfo); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/find/TransportFindDanglingIndexAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/find/TransportFindDanglingIndexAction.java new file mode 100644 index 00000000000..42326c136e0 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/find/TransportFindDanglingIndexAction.java @@ -0,0 +1,107 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.dangling.find; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.nodes.TransportNodesAction; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.gateway.DanglingIndicesState; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +/** + * Finds a specified dangling index by its UUID, searching across all nodes. 
+ */ +public class TransportFindDanglingIndexAction extends TransportNodesAction< + FindDanglingIndexRequest, + FindDanglingIndexResponse, + NodeFindDanglingIndexRequest, + NodeFindDanglingIndexResponse> { + + private final TransportService transportService; + private final DanglingIndicesState danglingIndicesState; + + @Inject + public TransportFindDanglingIndexAction( + ThreadPool threadPool, + ClusterService clusterService, + TransportService transportService, + ActionFilters actionFilters, + DanglingIndicesState danglingIndicesState + ) { + super( + FindDanglingIndexAction.NAME, + threadPool, + clusterService, + transportService, + actionFilters, + FindDanglingIndexRequest::new, + NodeFindDanglingIndexRequest::new, + ThreadPool.Names.MANAGEMENT, + NodeFindDanglingIndexResponse.class + ); + this.transportService = transportService; + this.danglingIndicesState = danglingIndicesState; + } + + @Override + protected FindDanglingIndexResponse newResponse( + FindDanglingIndexRequest request, + List nodeResponses, + List failures + ) { + return new FindDanglingIndexResponse(clusterService.getClusterName(), nodeResponses, failures); + } + + @Override + protected NodeFindDanglingIndexRequest newNodeRequest(FindDanglingIndexRequest request) { + return new NodeFindDanglingIndexRequest(request.getIndexUUID()); + } + + @Override + protected NodeFindDanglingIndexResponse newNodeResponse(StreamInput in) throws IOException { + return new NodeFindDanglingIndexResponse(in); + } + + @Override + protected NodeFindDanglingIndexResponse nodeOperation(NodeFindDanglingIndexRequest request) { + final DiscoveryNode localNode = transportService.getLocalNode(); + final String indexUUID = request.getIndexUUID(); + + final List danglingIndexInfo = new ArrayList<>(); + + for (IndexMetadata each : danglingIndicesState.getDanglingIndices().values()) { + if (each.getIndexUUID().equals(indexUUID)) { + danglingIndexInfo.add(each); + } + } + + return new NodeFindDanglingIndexResponse(localNode, danglingIndexInfo); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/import_index/ImportDanglingIndexAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/import_index/ImportDanglingIndexAction.java new file mode 100644 index 00000000000..d4d022d6abe --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/import_index/ImportDanglingIndexAction.java @@ -0,0 +1,36 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.dangling.import_index; + +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.support.master.AcknowledgedResponse; + +/** + * Represents a request to import a particular dangling index. 
+ */ +public class ImportDanglingIndexAction extends ActionType { + + public static final ImportDanglingIndexAction INSTANCE = new ImportDanglingIndexAction(); + public static final String NAME = "cluster:admin/indices/dangling/import"; + + private ImportDanglingIndexAction() { + super(NAME, AcknowledgedResponse::new); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/import_index/ImportDanglingIndexRequest.java b/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/import_index/ImportDanglingIndexRequest.java new file mode 100644 index 00000000000..06806c00ebe --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/import_index/ImportDanglingIndexRequest.java @@ -0,0 +1,76 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.dangling.import_index; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.support.master.AcknowledgedRequest; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.Locale; +import java.util.Objects; + +/** + * Represents a request to import a particular dangling index, specified + * by its UUID. The {@link #acceptDataLoss} flag must also be + * explicitly set to true, or later validation will fail. 
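+ * <p>For illustration only, assuming a {@code Client} instance named {@code client} and a placeholder UUID,
+ * an import could be requested like this:
+ * <pre>{@code
+ * // Re-imports the on-disk index into the cluster state; data loss must be accepted explicitly.
+ * AcknowledgedResponse response = client.admin()
+ *     .cluster()
+ *     .importDanglingIndex(new ImportDanglingIndexRequest("REPLACE_WITH_INDEX_UUID", true))
+ *     .actionGet();
+ * }</pre>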
+ */ +public class ImportDanglingIndexRequest extends AcknowledgedRequest { + private final String indexUUID; + private final boolean acceptDataLoss; + + public ImportDanglingIndexRequest(StreamInput in) throws IOException { + super(in); + this.indexUUID = in.readString(); + this.acceptDataLoss = in.readBoolean(); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + public ImportDanglingIndexRequest(String indexUUID, boolean acceptDataLoss) { + super(); + this.indexUUID = Objects.requireNonNull(indexUUID, "indexUUID cannot be null"); + this.acceptDataLoss = acceptDataLoss; + } + + public String getIndexUUID() { + return indexUUID; + } + + public boolean isAcceptDataLoss() { + return acceptDataLoss; + } + + @Override + public String toString() { + return String.format(Locale.ROOT, "ImportDanglingIndexRequest{indexUUID='%s', acceptDataLoss=%s}", indexUUID, acceptDataLoss); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(this.indexUUID); + out.writeBoolean(this.acceptDataLoss); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/import_index/TransportImportDanglingIndexAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/import_index/TransportImportDanglingIndexAction.java new file mode 100644 index 00000000000..91739e9ec8b --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/import_index/TransportImportDanglingIndexAction.java @@ -0,0 +1,163 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.action.admin.indices.dangling.import_index; + +import static java.util.Collections.singletonList; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.stream.Collectors; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.admin.indices.dangling.find.FindDanglingIndexAction; +import org.elasticsearch.action.admin.indices.dangling.find.FindDanglingIndexRequest; +import org.elasticsearch.action.admin.indices.dangling.find.FindDanglingIndexResponse; +import org.elasticsearch.action.admin.indices.dangling.find.NodeFindDanglingIndexResponse; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.gateway.LocalAllocateDangledIndices; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.transport.TransportService; + +/** + * Implements the import of a dangling index. When handling a {@link ImportDanglingIndexAction}, + * this class first checks that such a dangling index exists. It then calls {@link LocalAllocateDangledIndices} + * to perform the actual allocation. + */ +public class TransportImportDanglingIndexAction extends HandledTransportAction { + private static final Logger logger = LogManager.getLogger(TransportImportDanglingIndexAction.class); + + private final LocalAllocateDangledIndices danglingIndexAllocator; + private final NodeClient nodeClient; + + @Inject + public TransportImportDanglingIndexAction( + ActionFilters actionFilters, + TransportService transportService, + LocalAllocateDangledIndices danglingIndexAllocator, + NodeClient nodeClient + ) { + super(ImportDanglingIndexAction.NAME, transportService, actionFilters, ImportDanglingIndexRequest::new); + this.danglingIndexAllocator = danglingIndexAllocator; + this.nodeClient = nodeClient; + } + + @Override + protected void doExecute(Task task, ImportDanglingIndexRequest importRequest, ActionListener importListener) { + findDanglingIndex(importRequest, new ActionListener() { + @Override + public void onResponse(IndexMetadata indexMetaDataToImport) { + // This flag is checked at this point so that we always check that the supplied index UUID + // does correspond to a dangling index. 
+ if (importRequest.isAcceptDataLoss() == false) { + importListener.onFailure(new IllegalArgumentException("accept_data_loss must be set to true")); + return; + } + + String indexName = indexMetaDataToImport.getIndex().getName(); + String indexUUID = indexMetaDataToImport.getIndexUUID(); + + danglingIndexAllocator.allocateDangled( + singletonList(indexMetaDataToImport), + new ActionListener() { + @Override + public void onResponse(LocalAllocateDangledIndices.AllocateDangledResponse allocateDangledResponse) { + importListener.onResponse(new AcknowledgedResponse(true)); + } + + @Override + public void onFailure(Exception e) { + logger.debug("Failed to import dangling index [" + indexName + "] [" + indexUUID + "]", e); + importListener.onFailure(e); + } + } + ); + } + + @Override + public void onFailure(Exception e) { + logger.debug("Failed to find dangling index [" + importRequest.getIndexUUID() + "]", e); + importListener.onFailure(e); + } + }); + } + + private void findDanglingIndex(ImportDanglingIndexRequest request, ActionListener listener) { + final String indexUUID = request.getIndexUUID(); + + this.nodeClient.execute( + FindDanglingIndexAction.INSTANCE, + new FindDanglingIndexRequest(indexUUID), + new ActionListener() { + @Override + public void onResponse(FindDanglingIndexResponse response) { + if (response.hasFailures()) { + final String nodeIds = response.failures() + .stream() + .map(FailedNodeException::nodeId) + .collect(Collectors.joining(",")); + ElasticsearchException e = new ElasticsearchException("Failed to query nodes [" + nodeIds + "]"); + + for (FailedNodeException failure : response.failures()) { + logger.error("Failed to query node [" + failure.nodeId() + "]", failure); + e.addSuppressed(failure); + } + + listener.onFailure(e); + return; + } + + final List metaDataSortedByVersion = new ArrayList<>(); + for (NodeFindDanglingIndexResponse each : response.getNodes()) { + metaDataSortedByVersion.addAll(each.getDanglingIndexInfo()); + } + metaDataSortedByVersion.sort(Comparator.comparingLong(IndexMetadata::getVersion)); + + if (metaDataSortedByVersion.isEmpty()) { + listener.onFailure(new IllegalArgumentException("No dangling index found for UUID [" + indexUUID + "]")); + return; + } + + logger.debug( + "Metadata versions {} found for index UUID [{}], selecting the highest", + metaDataSortedByVersion.stream().map(IndexMetadata::getVersion).collect(Collectors.toList()), + indexUUID + ); + + listener.onResponse(metaDataSortedByVersion.get(metaDataSortedByVersion.size() - 1)); + } + + @Override + public void onFailure(Exception exp) { + listener.onFailure(exp); + } + } + ); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/list/ListDanglingIndicesAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/list/ListDanglingIndicesAction.java new file mode 100644 index 00000000000..65b39dc7bd1 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/list/ListDanglingIndicesAction.java @@ -0,0 +1,35 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.dangling.list; + +import org.elasticsearch.action.ActionType; + +/** + * Represents a request to list all dangling indices known to the cluster. + */ +public class ListDanglingIndicesAction extends ActionType { + + public static final ListDanglingIndicesAction INSTANCE = new ListDanglingIndicesAction(); + public static final String NAME = "cluster:admin/indices/dangling/list"; + + private ListDanglingIndicesAction() { + super(NAME, ListDanglingIndicesResponse::new); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/list/ListDanglingIndicesRequest.java b/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/list/ListDanglingIndicesRequest.java new file mode 100644 index 00000000000..6d9fa9a5bbb --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/list/ListDanglingIndicesRequest.java @@ -0,0 +1,64 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.dangling.list; + +import org.elasticsearch.action.support.nodes.BaseNodesRequest; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +public class ListDanglingIndicesRequest extends BaseNodesRequest { + /** + * Filter the response by index UUID. Leave as null to find all indices. 
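+ * <p>A brief sketch of the two request flavours (the UUID below is a placeholder):
+ * <pre>{@code
+ * new ListDanglingIndicesRequest();                          // list dangling indices on every node
+ * new ListDanglingIndicesRequest("REPLACE_WITH_INDEX_UUID"); // restrict the listing to a single UUID
+ * }</pre>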
+ */ + private final String indexUUID; + + public ListDanglingIndicesRequest(StreamInput in) throws IOException { + super(in); + this.indexUUID = in.readOptionalString(); + } + + public ListDanglingIndicesRequest() { + super(Strings.EMPTY_ARRAY); + this.indexUUID = null; + } + + public ListDanglingIndicesRequest(String indexUUID) { + super(Strings.EMPTY_ARRAY); + this.indexUUID = indexUUID; + } + + public String getIndexUUID() { + return indexUUID; + } + + @Override + public String toString() { + return "ListDanglingIndicesRequest{indexUUID='" + indexUUID + "'}"; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeOptionalString(this.indexUUID); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/list/ListDanglingIndicesResponse.java b/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/list/ListDanglingIndicesResponse.java new file mode 100644 index 00000000000..517fbb61916 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/list/ListDanglingIndicesResponse.java @@ -0,0 +1,168 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.dangling.list; + +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.admin.indices.dangling.DanglingIndexInfo; +import org.elasticsearch.action.support.nodes.BaseNodesResponse; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.StatusToXContentObject; +import org.elasticsearch.common.xcontent.XContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.rest.RestStatus; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; + +/** + * Models a response to a {@link ListDanglingIndicesRequest}. A list request queries every node in the + * cluster and aggregates their responses. When the aggregated response is converted to {@link XContent}, + * information for each dangling index is presented under the "dangling_indices" key. If any nodes + * in the cluster failed to answer, the details are presented under the "_nodes.failures" key. 
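+ * <p>For illustration, the aggregated content rendered by this class takes roughly the following shape; the
+ * values are placeholders, and a human-readable {@code creation_date} field may additionally be emitted
+ * depending on the builder's settings:
+ * <pre>{@code
+ * "dangling_indices" : [
+ *   {
+ *     "index_name" : "some-index",
+ *     "index_uuid" : "SOME_INDEX_UUID",
+ *     "creation_date_millis" : 1500000000000,
+ *     "node_ids" : [ "node-id-1", "node-id-2" ]
+ *   }
+ * ]
+ * }</pre>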
+ */ +public class ListDanglingIndicesResponse extends BaseNodesResponse implements StatusToXContentObject { + + public ListDanglingIndicesResponse(StreamInput in) throws IOException { + super(in); + } + + public ListDanglingIndicesResponse( + ClusterName clusterName, + List nodes, + List failures + ) { + super(clusterName, nodes, failures); + } + + @Override + public RestStatus status() { + return this.hasFailures() ? RestStatus.INTERNAL_SERVER_ERROR : RestStatus.OK; + } + + // Visible for testing + static Collection resultsByIndexUUID(List nodes) { + Map byIndexUUID = new HashMap<>(); + + for (NodeListDanglingIndicesResponse nodeResponse : nodes) { + for (DanglingIndexInfo info : nodeResponse.getDanglingIndices()) { + final String indexUUID = info.getIndexUUID(); + + final AggregatedDanglingIndexInfo aggregatedInfo = byIndexUUID.computeIfAbsent( + indexUUID, + (_uuid) -> new AggregatedDanglingIndexInfo(indexUUID, info.getIndexName(), info.getCreationDateMillis()) + ); + + aggregatedInfo.nodeIds.add(info.getNodeId()); + } + } + + return byIndexUUID.values(); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startArray("dangling_indices"); + + for (AggregatedDanglingIndexInfo info : resultsByIndexUUID(this.getNodes())) { + builder.startObject(); + + builder.field("index_name", info.indexName); + builder.field("index_uuid", info.indexUUID); + builder.timeField("creation_date_millis", "creation_date", info.creationDateMillis); + + builder.array("node_ids", info.nodeIds.toArray(new String[0])); + + builder.endObject(); + } + + return builder.endArray(); + } + + @Override + protected List readNodesFrom(StreamInput in) throws IOException { + return in.readList(NodeListDanglingIndicesResponse::new); + } + + @Override + protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { + out.writeList(nodes); + } + + // visible for testing + static class AggregatedDanglingIndexInfo { + private final String indexUUID; + private final String indexName; + private final long creationDateMillis; + private final List nodeIds; + + AggregatedDanglingIndexInfo(String indexUUID, String indexName, long creationDateMillis) { + this.indexUUID = indexUUID; + this.indexName = indexName; + this.creationDateMillis = creationDateMillis; + this.nodeIds = new ArrayList<>(); + } + + // the methods below are used in the unit tests + + public List getNodeIds() { + return nodeIds; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + AggregatedDanglingIndexInfo that = (AggregatedDanglingIndexInfo) o; + return creationDateMillis == that.creationDateMillis + && indexUUID.equals(that.indexUUID) + && indexName.equals(that.indexName) + && nodeIds.equals(that.nodeIds); + } + + @Override + public int hashCode() { + return Objects.hash(indexUUID, indexName, creationDateMillis, nodeIds); + } + + @Override + public String toString() { + return String.format( + Locale.ROOT, + "AggregatedDanglingIndexInfo{indexUUID='%s', indexName='%s', creationDateMillis=%d, nodeIds=%s}", + indexUUID, + indexName, + creationDateMillis, + nodeIds + ); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/list/NodeListDanglingIndicesRequest.java b/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/list/NodeListDanglingIndicesRequest.java new file mode 100644 index 00000000000..ef241cbe227 --- 
/dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/list/NodeListDanglingIndicesRequest.java @@ -0,0 +1,55 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.dangling.list; + +import org.elasticsearch.action.support.nodes.BaseNodeRequest; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +/** + * Used when querying every node in the cluster for dangling indices, in response to a list request. + */ +public class NodeListDanglingIndicesRequest extends BaseNodeRequest { + /** + * Filter the response by index UUID. Leave as null to find all indices. + */ + private final String indexUUID; + + public NodeListDanglingIndicesRequest(String indexUUID) { + this.indexUUID = indexUUID; + } + + public NodeListDanglingIndicesRequest(StreamInput in) throws IOException { + super(in); + this.indexUUID = in.readOptionalString(); + } + + public String getIndexUUID() { + return indexUUID; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeOptionalString(indexUUID); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/list/NodeListDanglingIndicesResponse.java b/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/list/NodeListDanglingIndicesResponse.java new file mode 100644 index 00000000000..83aea53c1cc --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/list/NodeListDanglingIndicesResponse.java @@ -0,0 +1,56 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.action.admin.indices.dangling.list; + +import org.elasticsearch.action.admin.indices.dangling.DanglingIndexInfo; +import org.elasticsearch.action.support.nodes.BaseNodeResponse; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.List; + +/** + * Used when querying every node in the cluster for dangling indices, in response to a list request. + */ +public class NodeListDanglingIndicesResponse extends BaseNodeResponse { + private final List indexMetaData; + + public List getDanglingIndices() { + return this.indexMetaData; + } + + public NodeListDanglingIndicesResponse(DiscoveryNode node, List indexMetaData) { + super(node); + this.indexMetaData = indexMetaData; + } + + protected NodeListDanglingIndicesResponse(StreamInput in) throws IOException { + super(in); + this.indexMetaData = in.readList(DanglingIndexInfo::new); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeList(this.indexMetaData); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/list/TransportListDanglingIndicesAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/list/TransportListDanglingIndicesAction.java new file mode 100644 index 00000000000..82fbdcb09b6 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/list/TransportListDanglingIndicesAction.java @@ -0,0 +1,115 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.dangling.list; + +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.admin.indices.dangling.DanglingIndexInfo; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.nodes.TransportNodesAction; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.gateway.DanglingIndicesState; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * Implements the listing of all dangling indices. All nodes in the cluster are queried, and + * their answers aggregated. Finding dangling indices is performed in {@link DanglingIndicesState}. 
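+ * <p>For illustration only, assuming a {@code Client} instance named {@code client}, the listing can be
+ * obtained through the cluster admin client added in this change:
+ * <pre>{@code
+ * ListDanglingIndicesResponse response = client.admin()
+ *     .cluster()
+ *     .listDanglingIndices(new ListDanglingIndicesRequest())
+ *     .actionGet();
+ * }</pre>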
+ */ +public class TransportListDanglingIndicesAction extends TransportNodesAction< + ListDanglingIndicesRequest, + ListDanglingIndicesResponse, + NodeListDanglingIndicesRequest, + NodeListDanglingIndicesResponse> { + private final TransportService transportService; + private final DanglingIndicesState danglingIndicesState; + + @Inject + public TransportListDanglingIndicesAction( + ThreadPool threadPool, + ClusterService clusterService, + TransportService transportService, + ActionFilters actionFilters, + DanglingIndicesState danglingIndicesState + ) { + super( + ListDanglingIndicesAction.NAME, + threadPool, + clusterService, + transportService, + actionFilters, + ListDanglingIndicesRequest::new, + NodeListDanglingIndicesRequest::new, + ThreadPool.Names.MANAGEMENT, + NodeListDanglingIndicesResponse.class + ); + this.transportService = transportService; + this.danglingIndicesState = danglingIndicesState; + } + + @Override + protected ListDanglingIndicesResponse newResponse( + ListDanglingIndicesRequest request, + List nodeListDanglingIndicesResponse, + List failures + ) { + return new ListDanglingIndicesResponse(clusterService.getClusterName(), nodeListDanglingIndicesResponse, failures); + } + + @Override + protected NodeListDanglingIndicesRequest newNodeRequest(ListDanglingIndicesRequest request) { + return new NodeListDanglingIndicesRequest(request.getIndexUUID()); + } + + @Override + protected NodeListDanglingIndicesResponse newNodeResponse(StreamInput in) throws IOException { + return new NodeListDanglingIndicesResponse(in); + } + + @Override + protected NodeListDanglingIndicesResponse nodeOperation(NodeListDanglingIndicesRequest request) { + final DiscoveryNode localNode = transportService.getLocalNode(); + + final List indexMetaData = new ArrayList<>(); + + final String indexFilter = request.getIndexUUID(); + + for (IndexMetadata each : danglingIndicesState.getDanglingIndices().values()) { + if (indexFilter == null || indexFilter.equals(each.getIndexUUID())) { + DanglingIndexInfo danglingIndexInfo = new DanglingIndexInfo( + localNode.getId(), + each.getIndex().getName(), + each.getIndexUUID(), + each.getCreationDate() + ); + indexMetaData.add(danglingIndexInfo); + } + } + + return new NodeListDanglingIndicesResponse(localNode, indexMetaData); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/package-info.java b/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/package-info.java new file mode 100644 index 00000000000..60626f11729 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/package-info.java @@ -0,0 +1,36 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +/** + * Dangling indices are indices that exist on disk on one or more nodes but + * which do not currently exist in the cluster state. They arise in a + * number of situations, such as: + * + *

+ * <ul>
+ * <li>A user overflows the index graveyard by deleting more than 500 indices while a node is offline and then the node rejoins the
+ * cluster</li>
+ * <li>A node (unsafely) moves from one cluster to another, perhaps because the original cluster lost all its master nodes</li>
+ * <li>A user (unsafely) meddles with the contents of the data path, maybe restoring an old index folder from a backup</li>
+ * <li>A disk partially fails and the user has no replicas and no snapshots and wants to (unsafely) recover whatever they can</li>
+ * <li>A cluster loses all master nodes and those are (unsafely) restored from backup, but the backup does not contain the index</li>
+ * </ul>
+ *

The classes in this package form an API for managing dangling indices, allowing them to be listed, imported or deleted. + */ +package org.elasticsearch.action.admin.indices.dangling; diff --git a/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java b/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java index 6f460069564..4264c28cc6f 100644 --- a/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java @@ -91,7 +91,7 @@ public abstract class TransportNodesAction nodesResponses) { final List responses = new ArrayList<>(); final List failures = new ArrayList<>(); diff --git a/server/src/main/java/org/elasticsearch/client/ClusterAdminClient.java b/server/src/main/java/org/elasticsearch/client/ClusterAdminClient.java index 19e876eebcd..cd9adabaf27 100644 --- a/server/src/main/java/org/elasticsearch/client/ClusterAdminClient.java +++ b/server/src/main/java/org/elasticsearch/client/ClusterAdminClient.java @@ -101,6 +101,10 @@ import org.elasticsearch.action.admin.cluster.storedscripts.PutStoredScriptReque import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksRequest; import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksRequestBuilder; import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse; +import org.elasticsearch.action.admin.indices.dangling.delete.DeleteDanglingIndexRequest; +import org.elasticsearch.action.admin.indices.dangling.import_index.ImportDanglingIndexRequest; +import org.elasticsearch.action.admin.indices.dangling.list.ListDanglingIndicesRequest; +import org.elasticsearch.action.admin.indices.dangling.list.ListDanglingIndicesResponse; import org.elasticsearch.action.ingest.DeletePipelineRequest; import org.elasticsearch.action.ingest.DeletePipelineRequestBuilder; import org.elasticsearch.action.ingest.GetPipelineRequest; @@ -718,4 +722,34 @@ public interface ClusterAdminClient extends ElasticsearchClient { * Get a script from the cluster state */ ActionFuture getStoredScript(GetStoredScriptRequest request); + + /** + * List dangling indices on all nodes. + */ + void listDanglingIndices(ListDanglingIndicesRequest request, ActionListener listener); + + /** + * List dangling indices on all nodes. + */ + ActionFuture listDanglingIndices(ListDanglingIndicesRequest request); + + /** + * Restore specified dangling indices. + */ + void importDanglingIndex(ImportDanglingIndexRequest request, ActionListener listener); + + /** + * Restore specified dangling indices. + */ + ActionFuture importDanglingIndex(ImportDanglingIndexRequest request); + + /** + * Delete specified dangling indices. + */ + void deleteDanglingIndex(DeleteDanglingIndexRequest request, ActionListener listener); + + /** + * Delete specified dangling indices. 
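+ * <p>An asynchronous sketch using the listener overload declared above; {@code client} and {@code logger}
+ * are assumed for illustration, and the UUID is a placeholder:
+ * <pre>{@code
+ * client.admin().cluster().deleteDanglingIndex(
+ *     new DeleteDanglingIndexRequest("REPLACE_WITH_INDEX_UUID", true),
+ *     ActionListener.wrap(
+ *         acknowledgedResponse -> logger.info("dangling index deleted"),
+ *         e -> logger.warn("failed to delete dangling index", e)
+ *     )
+ * );
+ * }</pre>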
+ */ + ActionFuture deleteDanglingIndex(DeleteDanglingIndexRequest request); } diff --git a/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java b/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java index fcf05817ca8..286b62c42ac 100644 --- a/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java +++ b/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java @@ -21,11 +21,11 @@ package org.elasticsearch.client.support; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.action.ActionType; import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainAction; import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainRequest; import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainRequestBuilder; @@ -162,6 +162,13 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexAction; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; +import org.elasticsearch.action.admin.indices.dangling.delete.DeleteDanglingIndexAction; +import org.elasticsearch.action.admin.indices.dangling.delete.DeleteDanglingIndexRequest; +import org.elasticsearch.action.admin.indices.dangling.import_index.ImportDanglingIndexAction; +import org.elasticsearch.action.admin.indices.dangling.import_index.ImportDanglingIndexRequest; +import org.elasticsearch.action.admin.indices.dangling.list.ListDanglingIndicesAction; +import org.elasticsearch.action.admin.indices.dangling.list.ListDanglingIndicesRequest; +import org.elasticsearch.action.admin.indices.dangling.list.ListDanglingIndicesResponse; import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder; @@ -1168,6 +1175,36 @@ public abstract class AbstractClient implements Client { execute(GetStoredScriptAction.INSTANCE, request, listener); } + @Override + public ActionFuture listDanglingIndices(ListDanglingIndicesRequest request) { + return execute(ListDanglingIndicesAction.INSTANCE, request); + } + + @Override + public void listDanglingIndices(ListDanglingIndicesRequest request, ActionListener listener) { + execute(ListDanglingIndicesAction.INSTANCE, request, listener); + } + + @Override + public ActionFuture importDanglingIndex(ImportDanglingIndexRequest request) { + return execute(ImportDanglingIndexAction.INSTANCE, request); + } + + @Override + public void importDanglingIndex(ImportDanglingIndexRequest request, ActionListener listener) { + execute(ImportDanglingIndexAction.INSTANCE, request, listener); + } + + @Override + public ActionFuture deleteDanglingIndex(DeleteDanglingIndexRequest request) { + return execute(DeleteDanglingIndexAction.INSTANCE, request); + } + + @Override + public void deleteDanglingIndex(DeleteDanglingIndexRequest request, ActionListener listener) { + execute(DeleteDanglingIndexAction.INSTANCE, request, listener); + } + @Override public GetStoredScriptRequestBuilder 
prepareGetStoredScript() { return new GetStoredScriptRequestBuilder(this, GetStoredScriptAction.INSTANCE); diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterChangedEvent.java b/server/src/main/java/org/elasticsearch/cluster/ClusterChangedEvent.java index 1262b0873e1..e38801847ad 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterChangedEvent.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterChangedEvent.java @@ -21,8 +21,9 @@ package org.elasticsearch.cluster; import com.carrotsearch.hppc.cursors.ObjectCursor; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; -import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexGraveyard; +import org.elasticsearch.cluster.metadata.IndexGraveyard.IndexGraveyardDiff; +import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.collect.ImmutableOpenMap; @@ -256,18 +257,43 @@ public class ClusterChangedEvent { if (metadataChanged() == false || isNewCluster()) { return Collections.emptyList(); } - List deleted = null; - for (ObjectCursor cursor : previousState.metadata().indices().values()) { + Set deleted = null; + final Metadata previousMetadata = previousState.metadata(); + final Metadata currentMetadata = state.metadata(); + + for (ObjectCursor cursor : previousMetadata.indices().values()) { IndexMetadata index = cursor.value; - IndexMetadata current = state.metadata().index(index.getIndex()); + IndexMetadata current = currentMetadata.index(index.getIndex()); if (current == null) { if (deleted == null) { - deleted = new ArrayList<>(); + deleted = new HashSet<>(); } deleted.add(index.getIndex()); } } - return deleted == null ? Collections.emptyList() : deleted; + + final IndexGraveyard currentGraveyard = currentMetadata.indexGraveyard(); + final IndexGraveyard previousGraveyard = previousMetadata.indexGraveyard(); + + // Look for new entries in the index graveyard, where there's no corresponding index in the + // previous metadata. This indicates that a dangling index has been explicitly deleted, so + // each node should make sure to delete any related data. + if (currentGraveyard != previousGraveyard) { + final IndexGraveyardDiff indexGraveyardDiff = (IndexGraveyardDiff) currentGraveyard.diff(previousGraveyard); + + final List added = indexGraveyardDiff.getAdded(); + + if (added.isEmpty() == false) { + if (deleted == null) { + deleted = new HashSet<>(); + } + for (IndexGraveyard.Tombstone tombstone : added) { + deleted.add(tombstone.getIndex()); + } + } + } + + return deleted == null ? 
Collections.emptyList() : new ArrayList<>(deleted); } private List indicesDeletedFromTombstones() { diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java index 8e9b00813bc..cf0c923fd13 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java @@ -682,6 +682,11 @@ public class Metadata implements Iterable, Diffable, To return indices.containsKey(index); } + public boolean hasIndex(Index index) { + IndexMetadata metadata = index(index.getName()); + return metadata != null && metadata.getIndexUUID().equals(index.getUUID()); + } + public boolean hasConcreteIndex(String index) { return getIndicesLookup().containsKey(index); } diff --git a/server/src/main/java/org/elasticsearch/common/util/CollectionUtils.java b/server/src/main/java/org/elasticsearch/common/util/CollectionUtils.java index 6e477f0d412..9b28d6c6c85 100644 --- a/server/src/main/java/org/elasticsearch/common/util/CollectionUtils.java +++ b/server/src/main/java/org/elasticsearch/common/util/CollectionUtils.java @@ -20,7 +20,6 @@ package org.elasticsearch.common.util; import com.carrotsearch.hppc.ObjectArrayList; - import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRefArray; import org.apache.lucene.util.BytesRefBuilder; diff --git a/server/src/main/java/org/elasticsearch/gateway/DanglingIndicesState.java b/server/src/main/java/org/elasticsearch/gateway/DanglingIndicesState.java index 686983de6da..49698acb4ef 100644 --- a/server/src/main/java/org/elasticsearch/gateway/DanglingIndicesState.java +++ b/server/src/main/java/org/elasticsearch/gateway/DanglingIndicesState.java @@ -37,13 +37,11 @@ import org.elasticsearch.index.Index; import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.stream.Collectors; import static java.util.Collections.emptyMap; import static java.util.Collections.unmodifiableMap; @@ -57,6 +55,13 @@ public class DanglingIndicesState implements ClusterStateListener { private static final Logger logger = LogManager.getLogger(DanglingIndicesState.class); + /** + * Controls whether dangling indices should be automatically detected and imported into the cluster + * state upon discovery. This setting is deprecated - use the _dangling API instead. + * If disabled, dangling indices will not be automatically detected. 
+ * + * @see org.elasticsearch.action.admin.indices.dangling + */ public static final Setting AUTO_IMPORT_DANGLING_INDICES_SETTING = Setting.boolSetting( "gateway.auto_import_dangling_indices", true, @@ -66,24 +71,29 @@ public class DanglingIndicesState implements ClusterStateListener { private final NodeEnvironment nodeEnv; private final MetaStateService metaStateService; - private final LocalAllocateDangledIndices allocateDangledIndices; + private final LocalAllocateDangledIndices danglingIndicesAllocator; private final boolean isAutoImportDanglingIndicesEnabled; + private final ClusterService clusterService; private final Map danglingIndices = ConcurrentCollections.newConcurrentMap(); @Inject public DanglingIndicesState(NodeEnvironment nodeEnv, MetaStateService metaStateService, - LocalAllocateDangledIndices allocateDangledIndices, ClusterService clusterService) { + LocalAllocateDangledIndices danglingIndicesAllocator, ClusterService clusterService) { this.nodeEnv = nodeEnv; this.metaStateService = metaStateService; - this.allocateDangledIndices = allocateDangledIndices; + this.danglingIndicesAllocator = danglingIndicesAllocator; + this.clusterService = clusterService; this.isAutoImportDanglingIndicesEnabled = AUTO_IMPORT_DANGLING_INDICES_SETTING.get(clusterService.getSettings()); if (this.isAutoImportDanglingIndicesEnabled) { clusterService.addListener(this); } else { - logger.warn(AUTO_IMPORT_DANGLING_INDICES_SETTING.getKey() + " is disabled, dangling indices will not be detected or imported"); + logger.warn( + AUTO_IMPORT_DANGLING_INDICES_SETTING.getKey() + + " is disabled, dangling indices will not be automatically detected or imported and must be managed manually" + ); } } @@ -96,20 +106,28 @@ public class DanglingIndicesState implements ClusterStateListener { * new dangling indices, and allocating outstanding ones. */ public void processDanglingIndices(final Metadata metadata) { + assert this.isAutoImportDanglingIndicesEnabled; + if (nodeEnv.hasNodeFile() == false) { return; } cleanupAllocatedDangledIndices(metadata); findNewAndAddDanglingIndices(metadata); - allocateDanglingIndices(); + allocateDanglingIndices(metadata); } /** - * The current set of dangling indices. + * Either return the current set of dangling indices, if auto-import is enabled, otherwise + * scan for dangling indices right away. + * @return a map of currently-known dangling indices */ - Map getDanglingIndices() { - // This might be a good use case for CopyOnWriteHashMap - return unmodifiableMap(new HashMap<>(danglingIndices)); + public Map getDanglingIndices() { + if (this.isAutoImportDanglingIndicesEnabled) { + // This might be a good use case for CopyOnWriteHashMap + return unmodifiableMap(new HashMap<>(danglingIndices)); + } else { + return findNewDanglingIndices(emptyMap(), this.clusterService.state().metadata()); + } } /** @@ -135,38 +153,40 @@ public class DanglingIndicesState implements ClusterStateListener { * to the currently tracked dangling indices. */ void findNewAndAddDanglingIndices(final Metadata metadata) { - danglingIndices.putAll(findNewDanglingIndices(metadata)); + final IndexGraveyard graveyard = metadata.indexGraveyard(); + + // If a tombstone is created for a dangling index, we need to make sure that the + // index is no longer considered dangling. 
+ danglingIndices.keySet().removeIf(graveyard::containsIndex); + + danglingIndices.putAll(findNewDanglingIndices(danglingIndices, metadata)); } /** * Finds new dangling indices by iterating over the indices and trying to find indices - * that have state on disk, but are not part of the provided meta data, or not detected + * that have state on disk, but are not part of the provided metadata, or not detected * as dangled already. */ - Map findNewDanglingIndices(final Metadata metadata) { + public Map findNewDanglingIndices(Map existingDanglingIndices, final Metadata metadata) { final Set excludeIndexPathIds = new HashSet<>(metadata.indices().size() + danglingIndices.size()); for (ObjectCursor cursor : metadata.indices().values()) { excludeIndexPathIds.add(cursor.value.getIndex().getUUID()); } - excludeIndexPathIds.addAll(danglingIndices.keySet().stream().map(Index::getUUID).collect(Collectors.toList())); + for (Index index : existingDanglingIndices.keySet()) { + excludeIndexPathIds.add(index.getUUID()); + } try { final List indexMetadataList = metaStateService.loadIndicesStates(excludeIndexPathIds::contains); Map newIndices = new HashMap<>(indexMetadataList.size()); final IndexGraveyard graveyard = metadata.indexGraveyard(); + for (IndexMetadata indexMetadata : indexMetadataList) { - if (metadata.hasIndex(indexMetadata.getIndex().getName())) { - logger.warn("[{}] can not be imported as a dangling index, as index with same name already exists in cluster metadata", - indexMetadata.getIndex()); - } else if (graveyard.containsIndex(indexMetadata.getIndex())) { - logger.warn("[{}] can not be imported as a dangling index, as an index with the same name and UUID exist in the " + - "index tombstones. This situation is likely caused by copying over the data directory for an index " + - "that was previously deleted.", indexMetadata.getIndex()); - } else { - logger.info("[{}] dangling index exists on local file system, but not in cluster metadata, " + - "auto import to cluster state", indexMetadata.getIndex()); - newIndices.put(indexMetadata.getIndex(), stripAliases(indexMetadata)); + Index index = indexMetadata.getIndex(); + if (graveyard.containsIndex(index) == false) { + newIndices.put(index, stripAliases(indexMetadata)); } } + return newIndices; } catch (IOException e) { logger.warn("failed to list dangling indices", e); @@ -175,6 +195,33 @@ public class DanglingIndicesState implements ClusterStateListener { } /** + * Filters out dangling indices that cannot be automatically imported into the cluster state. + * @param metadata the current cluster metadata + * @param allIndices all currently known dangling indices + * @return a filtered list of dangling index metadata + */ + List filterDanglingIndices(Metadata metadata, Map allIndices) { + List filteredIndices = new ArrayList<>(allIndices.size()); + + allIndices.forEach((index, indexMetadata) -> { + if (metadata.hasIndex(indexMetadata.getIndex().getName())) { + logger.warn("[{}] can not be imported as a dangling index, as index with same name already exists in cluster metadata", + indexMetadata.getIndex()); + } else { + logger.info( + "[{}] dangling index exists on local file system, but not in cluster metadata, auto import to cluster state", + indexMetadata.getIndex() + ); + filteredIndices.add(stripAliases(indexMetadata)); + } + }); + + return filteredIndices; + } + + /** + * Removes all aliases from the supplied index metadata. 
+ * * Dangling importing indices with aliases is dangerous, it could for instance result in inability to write to an existing alias if it * previously had only one index with any is_write_index indication. */ @@ -189,15 +236,25 @@ public class DanglingIndicesState implements ClusterStateListener { } /** - * Allocates the provided list of the dangled indices by sending them to the master node - * for allocation. + * Allocates the detected list of dangling indices by sending them to the master node + * for allocation, provided auto-import is enabled via the + * {@link #AUTO_IMPORT_DANGLING_INDICES_SETTING} setting. + * @param metadata the current cluster metadata, used to filter out dangling indices that cannot be allocated + * for some reason. */ - void allocateDanglingIndices() { + void allocateDanglingIndices(Metadata metadata) { if (danglingIndices.isEmpty()) { return; } + + final List filteredIndices = filterDanglingIndices(metadata, danglingIndices); + + if (filteredIndices.isEmpty()) { + return; + } + try { - allocateDangledIndices.allocateDangled(Collections.unmodifiableCollection(new ArrayList<>(danglingIndices.values())), + danglingIndicesAllocator.allocateDangled(filteredIndices, new ActionListener() { @Override public void onResponse(LocalAllocateDangledIndices.AllocateDangledResponse response) { diff --git a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 61e45d47242..c4d8eeb46a4 100644 --- a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -300,7 +300,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple if (indexService != null) { indexSettings = indexService.getIndexSettings(); indicesService.removeIndex(index, DELETED, "index no longer part of the metadata"); - } else if (previousState.metadata().hasIndex(index.getName())) { + } else if (previousState.metadata().hasIndex(index)) { // The deleted index was part of the previous cluster state, but not loaded on the local node final IndexMetadata metadata = previousState.metadata().index(index); indexSettings = new IndexSettings(metadata, settings); @@ -311,8 +311,10 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple // node was not part of the cluster. In this case, try reading the index // metadata from disk. If its not there, there is nothing to delete. // First, though, verify the precondition for applying this case by - // asserting that the previous cluster state is not initialized/recovered. - assert previousState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK); + // asserting that either this index is already in the graveyard, or the + // previous cluster state is not initialized/recovered. 
+ assert state.metadata().indexGraveyard().containsIndex(index) + || previousState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK); final IndexMetadata metadata = indicesService.verifyIndexIsDeleted(index, event.state()); if (metadata != null) { indexSettings = new IndexSettings(metadata, settings); diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/dangling/RestDeleteDanglingIndexAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/dangling/RestDeleteDanglingIndexAction.java new file mode 100644 index 00000000000..99895e15501 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/dangling/RestDeleteDanglingIndexAction.java @@ -0,0 +1,68 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.rest.action.admin.cluster.dangling; + +import org.elasticsearch.action.admin.indices.dangling.delete.DeleteDanglingIndexRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.rest.action.RestToXContentListener; + +import java.io.IOException; +import java.util.List; + +import static java.util.Collections.singletonList; +import static org.elasticsearch.rest.RestRequest.Method.DELETE; +import static org.elasticsearch.rest.RestStatus.ACCEPTED; + +public class RestDeleteDanglingIndexAction extends BaseRestHandler { + + @Override + public List routes() { + return singletonList(new Route(DELETE, "/_dangling/{index_uuid}")); + } + + @Override + public String getName() { + return "delete_dangling_index"; + } + + @Override + public RestChannelConsumer prepareRequest(final RestRequest request, NodeClient client) throws IOException { + final DeleteDanglingIndexRequest deleteRequest = new DeleteDanglingIndexRequest( + request.param("index_uuid"), + request.paramAsBoolean("accept_data_loss", false) + ); + + deleteRequest.timeout(request.paramAsTime("timeout", deleteRequest.timeout())); + deleteRequest.masterNodeTimeout(request.paramAsTime("master_timeout", deleteRequest.masterNodeTimeout())); + + return channel -> client.admin() + .cluster() + .deleteDanglingIndex(deleteRequest, new RestToXContentListener(channel) { + @Override + protected RestStatus getStatus(AcknowledgedResponse acknowledgedResponse) { + return ACCEPTED; + } + }); + } +} diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/dangling/RestImportDanglingIndexAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/dangling/RestImportDanglingIndexAction.java new file mode 100644 index 00000000000..7378373358d 
--- /dev/null +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/dangling/RestImportDanglingIndexAction.java @@ -0,0 +1,67 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.rest.action.admin.cluster.dangling; + +import static java.util.Collections.singletonList; +import static org.elasticsearch.rest.RestRequest.Method.POST; +import static org.elasticsearch.rest.RestStatus.ACCEPTED; + +import java.io.IOException; +import java.util.List; + +import org.elasticsearch.action.admin.indices.dangling.import_index.ImportDanglingIndexRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.rest.action.RestToXContentListener; + +public class RestImportDanglingIndexAction extends BaseRestHandler { + @Override + public List routes() { + return singletonList(new Route(POST, "/_dangling/{index_uuid}")); + } + + @Override + public String getName() { + return "import_dangling_index"; + } + + @Override + public RestChannelConsumer prepareRequest(final RestRequest request, NodeClient client) throws IOException { + final ImportDanglingIndexRequest importRequest = new ImportDanglingIndexRequest( + request.param("index_uuid"), + request.paramAsBoolean("accept_data_loss", false) + ); + + importRequest.timeout(request.paramAsTime("timeout", importRequest.timeout())); + importRequest.masterNodeTimeout(request.paramAsTime("master_timeout", importRequest.masterNodeTimeout())); + + return channel -> client.admin() + .cluster() + .importDanglingIndex(importRequest, new RestToXContentListener(channel) { + @Override + protected RestStatus getStatus(AcknowledgedResponse acknowledgedResponse) { + return ACCEPTED; + } + }); + } +} diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/dangling/RestListDanglingIndicesAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/dangling/RestListDanglingIndicesAction.java new file mode 100644 index 00000000000..7db0d1251d0 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/dangling/RestListDanglingIndicesAction.java @@ -0,0 +1,52 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.rest.action.admin.cluster.dangling; + +import org.elasticsearch.action.admin.indices.dangling.list.ListDanglingIndicesRequest; +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestActions; + +import java.io.IOException; +import java.util.List; + +import static java.util.Collections.singletonList; +import static org.elasticsearch.rest.RestRequest.Method.GET; + +public class RestListDanglingIndicesAction extends BaseRestHandler { + @Override + public List routes() { + return singletonList(new Route(GET, "/_dangling")); + } + + @Override + public String getName() { + return "list_dangling_indices"; + } + + @Override + public BaseRestHandler.RestChannelConsumer prepareRequest(final RestRequest request, NodeClient client) throws IOException { + final ListDanglingIndicesRequest danglingIndicesRequest = new ListDanglingIndicesRequest(); + return channel -> client.admin() + .cluster() + .listDanglingIndices(danglingIndicesRequest, new RestActions.NodesResponseRestListener<>(channel)); + } +} diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/dangling/package-info.java b/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/dangling/package-info.java new file mode 100644 index 00000000000..05476fbe16a --- /dev/null +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/dangling/package-info.java @@ -0,0 +1,25 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * {@link org.elasticsearch.rest.RestHandler}s for managing dangling indices. + * + * @see org.elasticsearch.action.admin.indices.dangling + */ +package org.elasticsearch.rest.action.admin.cluster.dangling; diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/dangling/list/ListDanglingIndicesResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/dangling/list/ListDanglingIndicesResponseTests.java new file mode 100644 index 00000000000..89dd56c415c --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/dangling/list/ListDanglingIndicesResponseTests.java @@ -0,0 +1,157 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.dangling.list; + +import org.elasticsearch.action.admin.indices.dangling.DanglingIndexInfo; +import org.elasticsearch.action.admin.indices.dangling.list.ListDanglingIndicesResponse.AggregatedDanglingIndexInfo; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.test.ESTestCase; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +import static java.util.Arrays.asList; +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; +import static org.elasticsearch.action.admin.indices.dangling.list.ListDanglingIndicesResponse.resultsByIndexUUID; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class ListDanglingIndicesResponseTests extends ESTestCase { + + public static final String UUID_1 = UUID.randomUUID().toString(); + public static final String UUID_2 = UUID.randomUUID().toString(); + + /** + * Checks that {@link ListDanglingIndicesResponse#resultsByIndexUUID(List)} handles the + * basic base of empty input. + */ + public void testResultsByIndexUUIDWithEmptyListReturnsEmptyMap() { + assertThat(resultsByIndexUUID(emptyList()), empty()); + } + + /** + * Checks that resultsByIndexUUID(List) can aggregate a single dangling index + * on a single node. + */ + public void testResultsByIndexUUIDCanAggregateASingleResponse() { + final DiscoveryNode node = mock(DiscoveryNode.class); + when(node.getId()).thenReturn("some-node-id"); + + final List danglingIndexInfo = singletonList( + new DanglingIndexInfo("some-node-id", "some-index", UUID_1, 123456L) + ); + final List nodes = singletonList(new NodeListDanglingIndicesResponse(node, danglingIndexInfo)); + + final List aggregated = new ArrayList<>(resultsByIndexUUID(nodes)); + assertThat(aggregated, hasSize(1)); + + final AggregatedDanglingIndexInfo expected = new AggregatedDanglingIndexInfo(UUID_1, "some-index", 123456L); + expected.getNodeIds().add("some-node-id"); + assertThat(aggregated.get(0), equalTo(expected)); + } + + /** + * Checks that resultsByIndexUUID(List) can aggregate a single dangling index + * across multiple nodes. 
+ */ + public void testResultsByIndexUUIDCanAggregateAcrossMultipleNodes() { + final DiscoveryNode node1 = mock(DiscoveryNode.class); + final DiscoveryNode node2 = mock(DiscoveryNode.class); + when(node1.getId()).thenReturn("node-id-1"); + when(node2.getId()).thenReturn("node-id-2"); + + final List danglingIndexInfo1 = singletonList(new DanglingIndexInfo("node-id-1", "some-index", UUID_1, 123456L)); + final List danglingIndexInfo2 = singletonList(new DanglingIndexInfo("node-id-2", "some-index", UUID_1, 123456L)); + final List nodes = asList( + new NodeListDanglingIndicesResponse(node1, danglingIndexInfo1), + new NodeListDanglingIndicesResponse(node2, danglingIndexInfo2) + ); + + final List aggregated = new ArrayList<>(resultsByIndexUUID(nodes)); + assertThat(aggregated, hasSize(1)); + + final AggregatedDanglingIndexInfo expected = new AggregatedDanglingIndexInfo(UUID_1, "some-index", 123456L); + expected.getNodeIds().add("node-id-1"); + expected.getNodeIds().add("node-id-2"); + assertThat(aggregated.get(0), equalTo(expected)); + } + + /** + * Checks that resultsByIndexUUID(List) can aggregate multiple dangling indices + * on a single node. + */ + public void testResultsByIndexUUIDCanAggregateMultipleIndicesOnOneNode() { + final DiscoveryNode node1 = mock(DiscoveryNode.class); + when(node1.getId()).thenReturn("node-id-1"); + + final List danglingIndexInfo = asList( + new DanglingIndexInfo("node-id-1", "some-index", UUID_1, 123456L), + new DanglingIndexInfo("node-id-1", "some-other-index", UUID_2, 7891011L) + ); + + final List nodes = singletonList(new NodeListDanglingIndicesResponse(node1, danglingIndexInfo)); + + final List aggregated = new ArrayList<>(resultsByIndexUUID(nodes)); + assertThat(aggregated, hasSize(2)); + + AggregatedDanglingIndexInfo info1 = new AggregatedDanglingIndexInfo(UUID_1, "some-index", 123456L); + AggregatedDanglingIndexInfo info2 = new AggregatedDanglingIndexInfo(UUID_2, "some-other-index", 7891011L); + info1.getNodeIds().add("node-id-1"); + info2.getNodeIds().add("node-id-1"); + + assertThat(aggregated, containsInAnyOrder(info1, info2)); + } + + /** + * Checks that resultsByIndexUUID(List) can aggregate multiple dangling indices + * across multiple nodes. 
+ */ + public void testResultsByIndexUUIDCanAggregateMultipleIndicesAcrossMultipleNodes() { + final DiscoveryNode node1 = mock(DiscoveryNode.class); + final DiscoveryNode node2 = mock(DiscoveryNode.class); + when(node1.getId()).thenReturn("node-id-1"); + when(node2.getId()).thenReturn("node-id-2"); + + final List danglingIndexInfo1 = singletonList(new DanglingIndexInfo("node-id-1", "some-index", UUID_1, 123456L)); + final List danglingIndexInfo2 = singletonList( + new DanglingIndexInfo("node-id-2", "some-other-index", UUID_2, 7891011L) + ); + final List nodes = asList( + new NodeListDanglingIndicesResponse(node1, danglingIndexInfo1), + new NodeListDanglingIndicesResponse(node2, danglingIndexInfo2) + ); + + final List aggregated = new ArrayList<>(resultsByIndexUUID(nodes)); + assertThat(aggregated, hasSize(2)); + + AggregatedDanglingIndexInfo info1 = new AggregatedDanglingIndexInfo(UUID_1, "some-index", 123456L); + AggregatedDanglingIndexInfo info2 = new AggregatedDanglingIndexInfo(UUID_2, "some-other-index", 7891011L); + info1.getNodeIds().add("node-id-1"); + info2.getNodeIds().add("node-id-2"); + + assertThat(aggregated, containsInAnyOrder(info1, info2)); + } +} diff --git a/server/src/test/java/org/elasticsearch/gateway/DanglingIndicesStateTests.java b/server/src/test/java/org/elasticsearch/gateway/DanglingIndicesStateTests.java index 20b5708fa1d..25ef395e842 100644 --- a/server/src/test/java/org/elasticsearch/gateway/DanglingIndicesStateTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/DanglingIndicesStateTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.cluster.metadata.IndexGraveyard; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.Index; @@ -33,12 +34,20 @@ import org.hamcrest.Matchers; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardCopyOption; +import java.util.List; import java.util.Map; +import static java.util.Collections.emptyMap; +import static java.util.Collections.singletonMap; import static org.elasticsearch.gateway.DanglingIndicesState.AUTO_IMPORT_DANGLING_INDICES_SETTING; +import static org.hamcrest.Matchers.aMapWithSize; +import static org.hamcrest.Matchers.anEmptyMap; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -78,14 +87,36 @@ public class DanglingIndicesStateTests extends ESTestCase { final Settings.Builder settings = Settings.builder().put(indexSettings).put(IndexMetadata.SETTING_INDEX_UUID, "test1UUID"); IndexMetadata dangledIndex = IndexMetadata.builder("test1").settings(settings).build(); metaStateService.writeIndex("test_write", dangledIndex); - Map newDanglingIndices = danglingState.findNewDanglingIndices(metadata); + Map newDanglingIndices = danglingState.findNewDanglingIndices(emptyMap(), metadata); assertTrue(newDanglingIndices.containsKey(dangledIndex.getIndex())); metadata = Metadata.builder().put(dangledIndex, false).build(); - newDanglingIndices = danglingState.findNewDanglingIndices(metadata); + newDanglingIndices 
= danglingState.findNewDanglingIndices(emptyMap(), metadata); assertFalse(newDanglingIndices.containsKey(dangledIndex.getIndex())); } } + /** + * Check that a dangling index is not reported as newly discovered when we + * already known about it. + */ + public void testDanglingIndicesNotDiscoveredWhenAlreadyKnown() throws Exception { + try (NodeEnvironment env = newNodeEnvironment()) { + MetaStateService metaStateService = new MetaStateService(env, xContentRegistry()); + DanglingIndicesState danglingState = createDanglingIndicesState(env, metaStateService); + + Metadata metadata = Metadata.builder().build(); + final Settings.Builder settings = Settings.builder().put(indexSettings).put(IndexMetadata.SETTING_INDEX_UUID, "test1UUID"); + IndexMetadata dangledIndex = IndexMetadata.builder("test1").settings(settings).build(); + metaStateService.writeIndex("test_write", dangledIndex); + + Map newDanglingIndices = danglingState.findNewDanglingIndices( + singletonMap(dangledIndex.getIndex(), dangledIndex), + metadata + ); + assertThat(newDanglingIndices, is(anEmptyMap())); + } + } + public void testInvalidIndexFolder() throws Exception { try (NodeEnvironment env = newNodeEnvironment()) { MetaStateService metaStateService = new MetaStateService(env, xContentRegistry()); @@ -102,7 +133,7 @@ public class DanglingIndicesStateTests extends ESTestCase { } } try { - danglingState.findNewDanglingIndices(metadata); + danglingState.findNewDanglingIndices(emptyMap(), metadata); fail("no exception thrown for invalid folder name"); } catch (IllegalStateException e) { assertThat(e.getMessage(), equalTo("[invalidUUID] invalid index folder name, rename to [test1UUID]")); @@ -124,7 +155,7 @@ public class DanglingIndicesStateTests extends ESTestCase { // check that several runs when not in the metadata still keep the dangled index around int numberOfChecks = randomIntBetween(1, 10); for (int i = 0; i < numberOfChecks; i++) { - Map newDanglingIndices = danglingState.findNewDanglingIndices(metadata); + Map newDanglingIndices = danglingState.findNewDanglingIndices(emptyMap(), metadata); assertThat(newDanglingIndices.size(), equalTo(1)); assertThat(newDanglingIndices.keySet(), Matchers.hasItems(dangledIndex.getIndex())); assertTrue(danglingState.getDanglingIndices().isEmpty()); @@ -142,7 +173,7 @@ public class DanglingIndicesStateTests extends ESTestCase { // check that several runs when in the metadata, but not cleaned yet, still keeps dangled for (int i = 0; i < numberOfChecks; i++) { - Map newDanglingIndices = danglingState.findNewDanglingIndices(metadata); + Map newDanglingIndices = danglingState.findNewDanglingIndices(emptyMap(), metadata); assertTrue(newDanglingIndices.isEmpty()); assertThat(danglingState.getDanglingIndices().size(), equalTo(1)); @@ -165,7 +196,42 @@ public class DanglingIndicesStateTests extends ESTestCase { final IndexGraveyard graveyard = IndexGraveyard.builder().addTombstone(dangledIndex.getIndex()).build(); final Metadata metadata = Metadata.builder().indexGraveyard(graveyard).build(); - assertThat(danglingState.findNewDanglingIndices(metadata).size(), equalTo(0)); + + final Map newDanglingIndices = danglingState.findNewDanglingIndices(emptyMap(), metadata); + assertThat(newDanglingIndices, is(emptyMap())); + } + } + + public void testDanglingIndicesNotImportedWhenIndexNameIsAlreadyUsed() throws Exception { + try (NodeEnvironment env = newNodeEnvironment()) { + MetaStateService metaStateService = new MetaStateService(env, xContentRegistry()); + DanglingIndicesState danglingState = 
createDanglingIndicesState(env, metaStateService); + + final Settings.Builder danglingSettings = Settings.builder() + .put(indexSettings) + .put(IndexMetadata.SETTING_INDEX_UUID, "test1UUID"); + IndexMetadata dangledIndex = IndexMetadata.builder("test_index").settings(danglingSettings).build(); + metaStateService.writeIndex("test_write", dangledIndex); + + // Build another index with the same name but a different UUID + final Settings.Builder existingSettings = Settings.builder() + .put(indexSettings) + .put(IndexMetadata.SETTING_INDEX_UUID, "test2UUID"); + IndexMetadata existingIndex = IndexMetadata.builder("test_index").settings(existingSettings).build(); + metaStateService.writeIndex("test_write", existingIndex); + + final ImmutableOpenMap indices = ImmutableOpenMap.builder() + .fPut(dangledIndex.getIndex().getName(), existingIndex) + .build(); + final Metadata metadata = Metadata.builder().indices(indices).build(); + + // All dangling indices should be found... + final Map newDanglingIndices = danglingState.findNewDanglingIndices(emptyMap(), metadata); + assertThat(newDanglingIndices, is(aMapWithSize(1))); + + // ...but the filter method should remove those where another index exists with the same name + final List filteredIndices = danglingState.filterDanglingIndices(metadata, newDanglingIndices); + assertThat(filteredIndices, is(empty())); } } @@ -183,7 +249,7 @@ public class DanglingIndicesStateTests extends ESTestCase { assertThat(dangledIndex.getAliases().size(), equalTo(1)); final Metadata metadata = Metadata.builder().build(); - Map newDanglingIndices = danglingState.findNewDanglingIndices(metadata); + Map newDanglingIndices = danglingState.findNewDanglingIndices(emptyMap(), metadata); assertThat(newDanglingIndices.size(), equalTo(1)); Map.Entry entry = newDanglingIndices.entrySet().iterator().next(); assertThat(entry.getKey().getName(), equalTo("test1")); @@ -191,7 +257,10 @@ public class DanglingIndicesStateTests extends ESTestCase { } } - public void testDanglingIndicesAreNotAllocatedWhenDisabled() throws Exception { + /** + * Check that when auto-imports are disabled, then no change listener is registered with the cluster state. + */ + public void testClusterStateListenerNotRegisterWhenSettingDisabled() throws Exception { try (NodeEnvironment env = newNodeEnvironment()) { MetaStateService metaStateService = new MetaStateService(env, xContentRegistry()); LocalAllocateDangledIndices localAllocateDangledIndices = mock(LocalAllocateDangledIndices.class); @@ -201,17 +270,20 @@ public class DanglingIndicesStateTests extends ESTestCase { final ClusterService clusterServiceMock = mock(ClusterService.class); when(clusterServiceMock.getSettings()).thenReturn(allocateSettings); - final DanglingIndicesState danglingIndicesState = new DanglingIndicesState( + new DanglingIndicesState( env, metaStateService, localAllocateDangledIndices, clusterServiceMock ); - assertFalse("Expected dangling imports to be disabled", danglingIndicesState.isAutoImportDanglingIndicesEnabled()); + verify(clusterServiceMock, never()).addListener(any()); } } + /** + * Check that when auto-imports are enabled, then dangling indices are automatically imported. 
+ */ public void testDanglingIndicesAreAllocatedWhenEnabled() throws Exception { try (NodeEnvironment env = newNodeEnvironment()) { MetaStateService metaStateService = new MetaStateService(env, xContentRegistry()); @@ -233,9 +305,10 @@ public class DanglingIndicesStateTests extends ESTestCase { IndexMetadata dangledIndex = IndexMetadata.builder("test1").settings(settings).build(); metaStateService.writeIndex("test_write", dangledIndex); - danglingIndicesState.findNewAndAddDanglingIndices(Metadata.builder().build()); + final Metadata metadata = Metadata.builder().build(); + danglingIndicesState.findNewAndAddDanglingIndices(metadata); - danglingIndicesState.allocateDanglingIndices(); + danglingIndicesState.allocateDanglingIndices(metadata); verify(localAllocateDangledIndices).allocateDangled(any(), any()); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/XContentTestUtils.java b/test/framework/src/main/java/org/elasticsearch/test/XContentTestUtils.java index 1449252d024..2fb36196eee 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/XContentTestUtils.java +++ b/test/framework/src/main/java/org/elasticsearch/test/XContentTestUtils.java @@ -29,9 +29,11 @@ import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.test.rest.yaml.ObjectPath; import java.io.IOException; +import java.io.InputStream; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -302,4 +304,34 @@ public final class XContentTestUtils { } return object.toXContentBuilder(xContent); } + + public static JsonMapView createJsonMapView(InputStream inputStream) { + final Map responseMap = XContentHelper.convertToMap(JsonXContent.jsonXContent, inputStream, true); + + return new JsonMapView(responseMap); + } + + public static class JsonMapView { + private final Map map; + + public JsonMapView(Map map) { + this.map = map; + } + + @SuppressWarnings("unchecked") + public T get(String path) { + String[] keys = path.split("\\."); + Object context = map; + for (String key : keys) { + if (context instanceof Map) { + context = ((Map) context).get(key); + } else if (context instanceof List) { + context = ((List) context).get(Integer.parseInt(key)); + } else { + throw new IllegalStateException("neither list nor map"); + } + } + return (T) context; + } + } }
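
For reference, a minimal usage sketch of the client methods this patch adds to ClusterAdminClient, corresponding to the new REST routes GET /_dangling, POST /_dangling/{index_uuid}?accept_data_loss=true and DELETE /_dangling/{index_uuid}?accept_data_loss=true. The class name DanglingIndicesUsageSketch and the blocking actionGet() calls are illustrative only, and the AcknowledgedResponse listener type is assumed from the RestToXContentListener usage in the REST handlers above; this is not part of the patch itself.

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.dangling.delete.DeleteDanglingIndexRequest;
import org.elasticsearch.action.admin.indices.dangling.import_index.ImportDanglingIndexRequest;
import org.elasticsearch.action.admin.indices.dangling.list.ListDanglingIndicesRequest;
import org.elasticsearch.action.admin.indices.dangling.list.ListDanglingIndicesResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;

public class DanglingIndicesUsageSketch {

    // Lists dangling indices across the cluster, then imports one by UUID.
    static void listAndImport(Client client, String indexUUID) {
        // Equivalent of GET /_dangling: every data node reports the dangling
        // indices it has on disk, aggregated by index UUID in the response.
        final ListDanglingIndicesResponse listResponse = client.admin()
            .cluster()
            .listDanglingIndices(new ListDanglingIndicesRequest())
            .actionGet();
        // A real caller would pick the UUID of interest out of listResponse here.

        // Equivalent of POST /_dangling/{index_uuid}?accept_data_loss=true: import the
        // dangling index back into the cluster state. The second constructor argument is
        // the accept_data_loss flag, acknowledging that the imported data may be stale.
        client.admin()
            .cluster()
            .importDanglingIndex(new ImportDanglingIndexRequest(indexUUID, true), new ActionListener<AcknowledgedResponse>() {
                @Override
                public void onResponse(AcknowledgedResponse response) {
                    // Import accepted; the master will allocate the index.
                }

                @Override
                public void onFailure(Exception e) {
                    // For example, the UUID is unknown or data loss was not accepted.
                }
            });
    }

    // Equivalent of DELETE /_dangling/{index_uuid}?accept_data_loss=true: delete the on-disk
    // data for a dangling index, recording a tombstone in the index graveyard so that every
    // node cleans up its copy.
    static void deleteDangling(Client client, String indexUUID) {
        final AcknowledgedResponse response = client.admin()
            .cluster()
            .deleteDanglingIndex(new DeleteDanglingIndexRequest(indexUUID, true))
            .actionGet();
        assert response.isAcknowledged();
    }
}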