Implement dangling indices API (#58176)

Backport of #50920. Part of #48366. Implement an API for listing,
importing and deleting dangling indices.

Co-authored-by: David Turner <david.turner@elastic.co>
This commit is contained in:
Rory Hunter 2020-06-16 21:50:38 +01:00 committed by GitHub
parent ce22e951f8
commit 03369e0980
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
42 changed files with 3142 additions and 103 deletions

View File

@ -816,6 +816,9 @@ public class RestHighLevelClientTests extends ESTestCase {
"cluster.stats",
"cluster.post_voting_config_exclusions",
"cluster.delete_voting_config_exclusions",
"dangling_indices.delete",
"dangling_indices.import",
"dangling_indices.list",
"indices.shard_stores",
"indices.upgrade",
"indices.recovery",

View File

@ -0,0 +1,316 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.http;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.XContentTestUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.cluster.metadata.IndexGraveyard.SETTING_MAX_TOMBSTONES;
import static org.elasticsearch.gateway.DanglingIndicesState.AUTO_IMPORT_DANGLING_INDICES_SETTING;
import static org.elasticsearch.indices.IndicesService.WRITE_DANGLING_INDICES_INFO_SETTING;
import static org.elasticsearch.rest.RestStatus.ACCEPTED;
import static org.elasticsearch.rest.RestStatus.OK;
import static org.elasticsearch.test.XContentTestUtils.createJsonMapView;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
/**
 * This class tests the dangling indices REST API. These tests are here
 * today so they have access to a proper REST client. They cannot be in
 * :server:integTest since the REST client needs a proper transport
 * implementation, and they cannot be REST tests today since they need to
 * restart nodes. Really, though, this test should live elsewhere.
 *
 * @see org.elasticsearch.action.admin.indices.dangling
 */
@ClusterScope(numDataNodes = 0, scope = ESIntegTestCase.Scope.TEST, autoManageMasterNodes = false)
public class DanglingIndicesRestIT extends HttpSmokeTestCase {
// Primary index created by these tests.
private static final String INDEX_NAME = "test-idx-1";
// Second index, used by tests that need to push an extra entry into the graveyard.
private static final String OTHER_INDEX_NAME = INDEX_NAME + "-other";
/**
 * Builds node settings for these tests: dangling-index info is written to disk,
 * but automatic import is disabled so that dangling indices stay dangling until
 * acted upon through the REST API.
 *
 * @param maxTombstones maximum number of deletions kept in the index graveyard
 */
private Settings buildSettings(int maxTombstones) {
return Settings.builder()
// Limit the indices kept in the graveyard. This can be set to zero, so that
// when we delete an index, it's definitely considered to be dangling.
.put(SETTING_MAX_TOMBSTONES.getKey(), maxTombstones)
.put(WRITE_DANGLING_INDICES_INFO_SETTING.getKey(), true)
.put(AUTO_IMPORT_DANGLING_INDICES_SETTING.getKey(), false)
.build();
}
/**
 * Check that when dangling indices are discovered, then they can be listed via the REST API.
 */
public void testDanglingIndicesCanBeListed() throws Exception {
internalCluster().setBootstrapMasterNodeIndex(1);
internalCluster().startNodes(3, buildSettings(0));
final DanglingIndexDetails danglingIndexDetails = createDanglingIndices(INDEX_NAME);
// The listing reports node IDs, not names, so translate before asserting.
final String stoppedNodeId = mapNodeNameToId(danglingIndexDetails.stoppedNodeName);
final RestClient restClient = getRestClient();
final Response listResponse = restClient.performRequest(new Request("GET", "/_dangling"));
assertOK(listResponse);
final XContentTestUtils.JsonMapView mapView = createJsonMapView(listResponse.getEntity().getContent());
// All three nodes should have answered the node-level request.
assertThat(mapView.get("_nodes.total"), equalTo(3));
assertThat(mapView.get("_nodes.successful"), equalTo(3));
assertThat(mapView.get("_nodes.failed"), equalTo(0));
List<Object> indices = mapView.get("dangling_indices");
assertThat(indices, hasSize(1));
// The single dangling index must match what we created, and must be
// attributed to the node that was stopped while the index was deleted.
assertThat(mapView.get("dangling_indices.0.index_name"), equalTo(INDEX_NAME));
assertThat(mapView.get("dangling_indices.0.index_uuid"), equalTo(danglingIndexDetails.indexToUUID.get(INDEX_NAME)));
assertThat(mapView.get("dangling_indices.0.creation_date_millis"), instanceOf(Long.class));
assertThat(mapView.get("dangling_indices.0.node_ids.0"), equalTo(stoppedNodeId));
}
/**
 * Check that dangling indices can be imported.
 */
public void testDanglingIndicesCanBeImported() throws Exception {
internalCluster().setBootstrapMasterNodeIndex(1);
internalCluster().startNodes(3, buildSettings(0));
createDanglingIndices(INDEX_NAME);
final RestClient restClient = getRestClient();
final List<String> danglingIndexIds = listDanglingIndexIds();
assertThat(danglingIndexIds, hasSize(1));
final Request importRequest = new Request("POST", "/_dangling/" + danglingIndexIds.get(0));
// The import API refuses to run without this explicit acknowledgement.
importRequest.addParameter("accept_data_loss", "true");
// Ensure this parameter is accepted
importRequest.addParameter("timeout", "20s");
importRequest.addParameter("master_timeout", "20s");
final Response importResponse = restClient.performRequest(importRequest);
// Import is accepted (202), not completed synchronously.
assertThat(importResponse.getStatusLine().getStatusCode(), equalTo(ACCEPTED.getStatus()));
final XContentTestUtils.JsonMapView mapView = createJsonMapView(importResponse.getEntity().getContent());
assertThat(mapView.get("acknowledged"), equalTo(true));
assertTrue("Expected dangling index " + INDEX_NAME + " to be recovered", indexExists(INDEX_NAME));
}
/**
 * Check that dangling indices can be deleted. Since this requires that
 * we add an entry to the index graveyard, the graveyard size must be
 * greater than 1. To test deletes, we set the index graveyard size to
 * 1, then create two indices and delete them both while one node in
 * the cluster is stopped. The deletion of the second pushes the deletion
 * of the first out of the graveyard. When the stopped node is resumed,
 * only the second index will be found in the graveyard and the
 * other will be considered dangling, and can therefore be listed and
 * deleted through the API
 */
public void testDanglingIndicesCanBeDeleted() throws Exception {
internalCluster().setBootstrapMasterNodeIndex(1);
internalCluster().startNodes(3, buildSettings(1));
createDanglingIndices(INDEX_NAME, OTHER_INDEX_NAME);
final RestClient restClient = getRestClient();
final List<String> danglingIndexIds = listDanglingIndexIds();
// Only one of the two deleted indices dangles: the other's tombstone
// is still in the (size 1) graveyard.
assertThat(danglingIndexIds, hasSize(1));
final Request deleteRequest = new Request("DELETE", "/_dangling/" + danglingIndexIds.get(0));
// The delete API refuses to run without this explicit acknowledgement.
deleteRequest.addParameter("accept_data_loss", "true");
// Ensure these parameters are accepted
deleteRequest.addParameter("timeout", "20s");
deleteRequest.addParameter("master_timeout", "20s");
final Response deleteResponse = restClient.performRequest(deleteRequest);
assertThat(deleteResponse.getStatusLine().getStatusCode(), equalTo(ACCEPTED.getStatus()));
final XContentTestUtils.JsonMapView mapView = createJsonMapView(deleteResponse.getEntity().getContent());
assertThat(mapView.get("acknowledged"), equalTo(true));
// Deletion happens asynchronously on the data nodes.
assertBusy(() -> assertThat("Expected dangling index to be deleted", listDanglingIndexIds(), hasSize(0)));
// The dangling index that we deleted ought to have been removed from disk. Check by
// creating and deleting another index, which creates a new tombstone entry, which should
// not cause the deleted dangling index to be considered "live" again, just because its
// tombstone has been pushed out of the graveyard.
createIndex("additional");
deleteIndex("additional");
assertThat(listDanglingIndexIds(), is(empty()));
}
/**
 * Fetches the current dangling indices through the REST API and returns
 * their UUIDs. Also asserts that all three nodes responded successfully.
 */
private List<String> listDanglingIndexIds() throws IOException {
final Response response = getRestClient().performRequest(new Request("GET", "/_dangling"));
assertOK(response);
final XContentTestUtils.JsonMapView mapView = createJsonMapView(response.getEntity().getContent());
assertThat(mapView.get("_nodes.total"), equalTo(3));
assertThat(mapView.get("_nodes.successful"), equalTo(3));
assertThat(mapView.get("_nodes.failed"), equalTo(0));
List<Object> indices = mapView.get("dangling_indices");
List<String> danglingIndexIds = new ArrayList<>();
for (int i = 0; i < indices.size(); i++) {
danglingIndexIds.add(mapView.get("dangling_indices." + i + ".index_uuid"));
}
return danglingIndexIds;
}
// Asserts that a REST response has HTTP status 200.
private void assertOK(Response response) {
assertThat(response.getStatusLine().getStatusCode(), equalTo(OK.getStatus()));
}
/**
 * Given a node name, finds the corresponding node ID.
 */
private String mapNodeNameToId(String nodeName) throws IOException {
final Response catResponse = getRestClient().performRequest(new Request("GET", "/_cat/nodes?full_id&h=id,name"));
assertOK(catResponse);
for (String nodeLine : Streams.readAllLines(catResponse.getEntity().getContent())) {
// NOTE(review): splits on a single space, unlike createIndices() which
// uses " +". This assumes the id column is fixed-width (node IDs have a
// uniform length) so exactly one separator space appears — confirm.
String[] elements = nodeLine.split(" ");
if (elements[1].equals(nodeName)) {
return elements[0];
}
}
throw new AssertionError("Failed to map node name [" + nodeName + "] to node ID");
}
/**
 * Helper that creates one or more indices, and importantly,
 * checks that they are green before proceeding. This is important
 * because the tests in this class stop and restart nodes, assuming
 * that each index has a primary or replica shard on every node, and if
 * a node is stopped prematurely, this assumption is broken.
 *
 * @return a mapping from each created index name to its UUID
 */
private Map<String, String> createIndices(String... indices) throws IOException {
assert indices.length > 0;
for (String index : indices) {
// One shard plus two replicas, capped at one shard per node, so that
// every one of the three nodes holds a copy of each index.
String indexSettings = "{"
+ "  \"settings\": {"
+ "    \"index\": {"
+ "      \"number_of_shards\": 1,"
+ "      \"number_of_replicas\": 2,"
+ "      \"routing\": {"
+ "        \"allocation\": {"
+ "          \"total_shards_per_node\": 1"
+ "        }"
+ "      }"
+ "    }"
+ "  }"
+ "}";
Request request = new Request("PUT", "/" + index);
request.setJsonEntity(indexSettings);
assertOK(getRestClient().performRequest(request));
}
ensureGreen(indices);
// Look up each created index's UUID via the cat API.
final Response catResponse = getRestClient().performRequest(new Request("GET", "/_cat/indices?h=index,uuid"));
assertOK(catResponse);
final Map<String, String> createdIndexIDs = new HashMap<>();
final List<String> indicesAsList = Arrays.asList(indices);
for (String indexLine : Streams.readAllLines(catResponse.getEntity().getContent())) {
String[] elements = indexLine.split(" +");
if (indicesAsList.contains(elements[0])) {
createdIndexIDs.put(elements[0], elements[1]);
}
}
assertThat("Expected to find as many index UUIDs as created indices", createdIndexIDs.size(), equalTo(indices.length));
return createdIndexIDs;
}
// Deletes an index through the REST API and asserts the call succeeded.
private void deleteIndex(String indexName) throws IOException {
Response deleteResponse = getRestClient().performRequest(new Request("DELETE", "/" + indexName));
assertOK(deleteResponse);
}
/**
 * Creates the given indices, then restarts a random data node and deletes
 * them while that node is offline, so that the restarted node rejoins with
 * on-disk index data that the cluster no longer knows about — i.e. dangling
 * indices.
 *
 * @return the name of the stopped node and the created indices' UUIDs
 */
private DanglingIndexDetails createDanglingIndices(String... indices) throws Exception {
ensureStableCluster(3);
final Map<String, String> indexToUUID = createIndices(indices);
final AtomicReference<String> stoppedNodeName = new AtomicReference<>();
// Wait until every node has persisted its dangling-indices info, otherwise
// the restarted node may have nothing on disk to dangle.
assertBusy(
() -> internalCluster().getInstances(IndicesService.class)
.forEach(indicesService -> assertTrue(indicesService.allPendingDanglingIndicesWritten()))
);
// Restart node, deleting the index in its absence, so that there is a dangling index to recover
internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback() {
@Override
public Settings onNodeStopped(String nodeName) throws Exception {
ensureClusterSizeConsistency();
stoppedNodeName.set(nodeName);
for (String index : indices) {
deleteIndex(index);
}
return super.onNodeStopped(nodeName);
}
});
ensureStableCluster(3);
return new DanglingIndexDetails(stoppedNodeName.get(), indexToUUID);
}
/**
 * Value holder returned by {@link #createDanglingIndices(String...)}:
 * the node that was stopped while the indices were deleted, and a map
 * from index name to index UUID.
 */
private static class DanglingIndexDetails {
private final String stoppedNodeName;
private final Map<String, String> indexToUUID;
DanglingIndexDetails(String stoppedNodeName, Map<String, String> indexToUUID) {
this.stoppedNodeName = stoppedNodeName;
this.indexToUUID = indexToUUID;
}
}
}

View File

@ -0,0 +1,39 @@
{
"dangling_indices.delete": {
"documentation": {
"url": "https://www.elastic.co/guide/en/elasticsearch/reference/master/modules-gateway-dangling-indices.html",
"description": "Deletes the specified dangling index"
},
"stability": "stable",
"url": {
"paths": [
{
"path": "/_dangling/{index_uuid}",
"methods": [
"DELETE"
],
"parts": {
"index_uuid": {
"type": "string",
"description": "The UUID of the dangling index"
}
}
}
]
},
"params": {
"accept_data_loss": {
"type": "boolean",
"description": "Must be set to true in order to delete the dangling index"
},
"timeout": {
"type": "time",
"description": "Explicit operation timeout"
},
"master_timeout": {
"type": "time",
"description": "Specify timeout for connection to master"
}
}
}
}

View File

@ -0,0 +1,39 @@
{
"dangling_indices.import": {
"documentation": {
"url": "https://www.elastic.co/guide/en/elasticsearch/reference/master/modules-gateway-dangling-indices.html",
"description": "Imports the specified dangling index"
},
"stability": "stable",
"url": {
"paths": [
{
"path": "/_dangling/{index_uuid}",
"methods": [
"POST"
],
"parts": {
"index_uuid": {
"type": "string",
"description": "The UUID of the dangling index"
}
}
}
]
},
"params": {
"accept_data_loss": {
"type": "boolean",
"description": "Must be set to true in order to import the dangling index"
},
"timeout": {
"type": "time",
"description": "Explicit operation timeout"
},
"master_timeout": {
"type": "time",
"description": "Specify timeout for connection to master"
}
}
}
}

View File

@ -0,0 +1,20 @@
{
"dangling_indices.list": {
"documentation": {
"url": "https://www.elastic.co/guide/en/elasticsearch/reference/master/modules-gateway-dangling-indices.html",
"description": "Returns all dangling indices."
},
"stability": "stable",
"url": {
"paths": [
{
"path": "/_dangling",
"methods": [
"GET"
]
}
]
},
"params": {}
}
}

View File

@ -19,54 +19,79 @@
package org.elasticsearch.indices.recovery;
import org.elasticsearch.action.admin.indices.dangling.DanglingIndexInfo;
import org.elasticsearch.action.admin.indices.dangling.delete.DeleteDanglingIndexRequest;
import org.elasticsearch.action.admin.indices.dangling.import_index.ImportDanglingIndexRequest;
import org.elasticsearch.action.admin.indices.dangling.list.ListDanglingIndicesRequest;
import org.elasticsearch.action.admin.indices.dangling.list.ListDanglingIndicesResponse;
import org.elasticsearch.action.admin.indices.dangling.list.NodeListDanglingIndicesResponse;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.InternalTestCluster;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.cluster.metadata.IndexGraveyard.SETTING_MAX_TOMBSTONES;
import static org.elasticsearch.gateway.DanglingIndicesState.AUTO_IMPORT_DANGLING_INDICES_SETTING;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
/**
* This class tests how dangling indices are handled, in terms of how they
* are discovered, and how they can be accessed and manipulated through the
* API.
*
* <p>See also <code>DanglingIndicesRestIT</code> in the <code>qa:smoke-test-http</code>
* project.
*
* @see org.elasticsearch.action.admin.indices.dangling
*/
@ClusterScope(numDataNodes = 0, scope = ESIntegTestCase.Scope.TEST)
public class DanglingIndicesIT extends ESIntegTestCase {
private static final String INDEX_NAME = "test-idx-1";
private static final String OTHER_INDEX_NAME = INDEX_NAME + "-other";
private Settings buildSettings(boolean writeDanglingIndices, boolean importDanglingIndices) {
private Settings buildSettings(int maxTombstones, boolean writeDanglingIndices, boolean importDanglingIndices) {
return Settings.builder()
// Don't keep any indices in the graveyard, so that when we delete an index,
// it's definitely considered to be dangling.
.put(SETTING_MAX_TOMBSTONES.getKey(), 0)
// Limit the indices kept in the graveyard. This can be set to zero, so that
// when we delete an index, it's definitely considered to be dangling.
.put(SETTING_MAX_TOMBSTONES.getKey(), maxTombstones)
.put(IndicesService.WRITE_DANGLING_INDICES_INFO_SETTING.getKey(), writeDanglingIndices)
.put(AUTO_IMPORT_DANGLING_INDICES_SETTING.getKey(), importDanglingIndices)
.build();
}
/**
* Check that when dangling indices are discovered, then they are recovered into
* the cluster, so long as the recovery setting is enabled.
* Check that when the auto-recovery setting is enabled and a dangling index is
* discovered, then that index is recovered into the cluster.
*/
public void testDanglingIndicesAreRecoveredWhenSettingIsEnabled() throws Exception {
final Settings settings = buildSettings(true, true);
final Settings settings = buildSettings(0, true, true);
internalCluster().startNodes(3, settings);
createIndex(INDEX_NAME, Settings.builder().put("number_of_replicas", 2).build());
ensureGreen(INDEX_NAME);
assertBusy(() -> internalCluster().getInstances(IndicesService.class).forEach(
indicesService -> assertTrue(indicesService.allPendingDanglingIndicesWritten())));
createIndices(INDEX_NAME);
ensurePendingDanglingIndicesWritten();
boolean refreshIntervalChanged = randomBoolean();
if (refreshIntervalChanged) {
client().admin().indices().prepareUpdateSettings(INDEX_NAME).setSettings(
Settings.builder().put("index.refresh_interval", "42s").build()).get();
assertBusy(() -> internalCluster().getInstances(IndicesService.class).forEach(
indicesService -> assertTrue(indicesService.allPendingDanglingIndicesWritten())));
client().admin()
.indices()
.prepareUpdateSettings(INDEX_NAME)
.setSettings(Settings.builder().put("index.refresh_interval", "42s").build())
.get();
ensurePendingDanglingIndicesWritten();
}
if (randomBoolean()) {
@ -87,36 +112,31 @@ public class DanglingIndicesIT extends ESIntegTestCase {
assertBusy(() -> assertTrue("Expected dangling index " + INDEX_NAME + " to be recovered", indexExists(INDEX_NAME)));
if (refreshIntervalChanged) {
assertThat(client().admin().indices().prepareGetSettings(INDEX_NAME).get().getSetting(INDEX_NAME, "index.refresh_interval"),
equalTo("42s"));
assertThat(
client().admin().indices().prepareGetSettings(INDEX_NAME).get().getSetting(INDEX_NAME, "index.refresh_interval"),
equalTo("42s")
);
}
ensureGreen(INDEX_NAME);
final IndexMetadata indexMetadata = clusterService().state().metadata().index(INDEX_NAME);
assertThat(indexMetadata.getSettings().get(IndexMetadata.SETTING_HISTORY_UUID), notNullValue());
}
/**
 * Waits until every {@link IndicesService} in the cluster reports that all
 * pending dangling-indices info has been written to disk, so that a node
 * restarted afterwards will actually find dangling index metadata.
 */
private void ensurePendingDanglingIndicesWritten() throws Exception {
assertBusy(
() -> internalCluster().getInstances(IndicesService.class)
.forEach(indicesService -> assertTrue(indicesService.allPendingDanglingIndicesWritten()))
);
}
/**
* Check that when dangling indices are discovered, then they are not recovered into
* the cluster when the recovery setting is disabled.
*/
public void testDanglingIndicesAreNotRecoveredWhenSettingIsDisabled() throws Exception {
internalCluster().startNodes(3, buildSettings(true, false));
internalCluster().startNodes(3, buildSettings(0, true, false));
createIndex(INDEX_NAME, Settings.builder().put("number_of_replicas", 2).build());
ensureGreen(INDEX_NAME);
assertBusy(() -> internalCluster().getInstances(IndicesService.class).forEach(
indicesService -> assertTrue(indicesService.allPendingDanglingIndicesWritten())));
// Restart node, deleting the index in its absence, so that there is a dangling index to recover
internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback() {
@Override
public Settings onNodeStopped(String nodeName) throws Exception {
internalCluster().validateClusterFormed();
assertAcked(client().admin().indices().prepareDelete(INDEX_NAME));
return super.onNodeStopped(nodeName);
}
});
createDanglingIndices(INDEX_NAME);
// Since index recovery is async, we can't prove index recovery will never occur, just that it doesn't occur within some reasonable
// amount of time
@ -130,23 +150,9 @@ public class DanglingIndicesIT extends ESIntegTestCase {
* Check that when dangling indices are not written, then they cannot be recovered into the cluster.
*/
public void testDanglingIndicesAreNotRecoveredWhenNotWritten() throws Exception {
internalCluster().startNodes(3, buildSettings(false, true));
internalCluster().startNodes(3, buildSettings(0, false, true));
createIndex(INDEX_NAME, Settings.builder().put("number_of_replicas", 2).build());
ensureGreen(INDEX_NAME);
internalCluster().getInstances(IndicesService.class).forEach(
indicesService -> assertTrue(indicesService.allPendingDanglingIndicesWritten()));
// Restart node, deleting the index in its absence, so that there is a dangling index to recover
internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback() {
@Override
public Settings onNodeStopped(String nodeName) throws Exception {
internalCluster().validateClusterFormed();
assertAcked(client().admin().indices().prepareDelete(INDEX_NAME));
return super.onNodeStopped(nodeName);
}
});
createDanglingIndices(INDEX_NAME);
// Since index recovery is async, we can't prove index recovery will never occur, just that it doesn't occur within some reasonable
// amount of time
@ -155,4 +161,365 @@ public class DanglingIndicesIT extends ESIntegTestCase {
waitUntil(() -> indexExists(INDEX_NAME), 1, TimeUnit.SECONDS)
);
}
/**
 * Check that when dangling indices are discovered, then they can be listed.
 */
public void testDanglingIndicesCanBeListed() throws Exception {
internalCluster().startNodes(3, buildSettings(0, true, false));
final String stoppedNodeName = createDanglingIndices(INDEX_NAME);
final ListDanglingIndicesResponse response = client().admin()
.cluster()
.listDanglingIndices(new ListDanglingIndicesRequest())
.actionGet();
assertThat(response.status(), equalTo(RestStatus.OK));
final List<NodeListDanglingIndicesResponse> nodeResponses = response.getNodes();
assertThat("Didn't get responses from all nodes", nodeResponses, hasSize(3));
// Only the node that was offline while the index was deleted should
// report a dangling copy; the others saw the deletion and cleaned up.
for (NodeListDanglingIndicesResponse nodeResponse : nodeResponses) {
if (nodeResponse.getNode().getName().equals(stoppedNodeName)) {
assertThat("Expected node that was stopped to have one dangling index", nodeResponse.getDanglingIndices(), hasSize(1));
final DanglingIndexInfo danglingIndexInfo = nodeResponse.getDanglingIndices().get(0);
assertThat(danglingIndexInfo.getIndexName(), equalTo(INDEX_NAME));
} else {
assertThat("Expected node that was never stopped to have no dangling indices", nodeResponse.getDanglingIndices(), empty());
}
}
}
/**
 * Check that when dangling index auto imports are enabled, and a dangling index is discovered
 * but cannot be imported due to a name clash with an existing index, then that dangling index can
 * still be listed through the API.
 */
public void testDanglingIndicesCanBeListedWhenAutoImportEnabled() throws Exception {
internalCluster().startNodes(3, buildSettings(0, true, true));
createIndices(INDEX_NAME);
ensurePendingDanglingIndicesWritten();
// Restart node, deleting the indices in its absence, so that there is a dangling index to recover
internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback() {
@Override
public Settings onNodeStopped(String nodeName) throws Exception {
internalCluster().validateClusterFormed();
assertAcked(client().admin().indices().prepareDelete(INDEX_NAME));
// Create another index with the same name, which will prevent the dangling
// index from being restored, and also ensures that we are checking index
// existence by UUID, not name.
//
// Note: don't call `createIndices()` here as it calls `ensureGreen()`, which will
// fail while a node is offline
createIndex(INDEX_NAME);
ensurePendingDanglingIndicesWritten();
return super.onNodeStopped(nodeName);
}
});
// Despite auto-import being enabled, the name clash keeps the old copy dangling.
final List<DanglingIndexInfo> danglingIndices = listDanglingIndices();
assertThat(danglingIndices, hasSize(1));
assertThat(danglingIndices.get(0).getIndexName(), equalTo(INDEX_NAME));
}
/**
 * Check that dangling indices can be imported.
 */
public void testDanglingIndicesCanBeImported() throws Exception {
internalCluster().startNodes(3, buildSettings(0, true, false));
final String stoppedNodeName = createDanglingIndices(INDEX_NAME);
final String danglingIndexUUID = findDanglingIndexForNode(stoppedNodeName, INDEX_NAME);
// accept_data_loss must be true for the import to proceed.
final ImportDanglingIndexRequest request = new ImportDanglingIndexRequest(danglingIndexUUID, true);
client().admin().cluster().importDanglingIndex(request).actionGet();
assertTrue("Expected dangling index " + INDEX_NAME + " to be recovered", indexExists(INDEX_NAME));
}
/**
 * Check that when sending an import-dangling-indices request, the specified UUIDs are validated as
 * being dangling.
 */
public void testDanglingIndicesMustExistToBeImported() {
internalCluster().startNodes(1, buildSettings(0, true, false));
final ImportDanglingIndexRequest request = new ImportDanglingIndexRequest("NonExistentUUID", true);
// An unknown UUID must be rejected, even with accept_data_loss set.
final IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> client().admin().cluster().importDanglingIndex(request).actionGet()
);
assertThat(e.getMessage(), containsString("No dangling index found for UUID [NonExistentUUID]"));
}
/**
 * Check that a dangling index can only be imported if "accept_data_loss" is set to true.
 */
public void testMustAcceptDataLossToImportDanglingIndex() throws Exception {
internalCluster().startNodes(3, buildSettings(0, true, false));
final String stoppedNodeName = createDanglingIndices(INDEX_NAME);
final String danglingIndexUUID = findDanglingIndexForNode(stoppedNodeName, INDEX_NAME);
// accept_data_loss is false here, so the import must be refused.
final ImportDanglingIndexRequest request = new ImportDanglingIndexRequest(danglingIndexUUID, false);
Exception e = expectThrows(Exception.class, () -> client().admin().cluster().importDanglingIndex(request).actionGet());
assertThat(e.getMessage(), containsString("accept_data_loss must be set to true"));
}
/**
 * Check that dangling indices can be deleted. Since this requires that
 * we add an entry to the index graveyard, the graveyard size must be
 * greater than 1. To test deletes, we set the index graveyard size to
 * 1, then create two indices and delete them both while one node in
 * the cluster is stopped. The deletion of the second pushes the deletion
 * of the first out of the graveyard. When the stopped node is resumed,
 * only the second index will be found in the graveyard and the
 * other will be considered dangling, and can therefore be listed and
 * deleted through the API
 */
public void testDanglingIndexCanBeDeleted() throws Exception {
final Settings settings = buildSettings(1, true, false);
internalCluster().startNodes(3, settings);
final String stoppedNodeName = createDanglingIndices(INDEX_NAME, OTHER_INDEX_NAME);
final String danglingIndexUUID = findDanglingIndexForNode(stoppedNodeName, INDEX_NAME);
// accept_data_loss must be true for the delete to proceed.
client().admin().cluster().deleteDanglingIndex(new DeleteDanglingIndexRequest(danglingIndexUUID, true)).actionGet();
// The dangling index that we deleted ought to have been removed from disk. Check by
// creating and deleting another index, which creates a new tombstone entry, which should
// not cause the deleted dangling index to be considered "live" again, just because its
// tombstone has been pushed out of the graveyard.
createIndex("additional");
assertAcked(client().admin().indices().prepareDelete("additional"));
assertThat(listDanglingIndices(), is(empty()));
}
/**
 * Check that when dangling index auto imports are enabled, and a dangling index is discovered
 * but cannot be imported due to a name clash with an existing index, then that dangling index can
 * still be deleted through the API.
 */
public void testDanglingIndexCanBeDeletedWhenAutoImportEnabled() throws Exception {
final Settings settings = buildSettings(1, true, true);
internalCluster().startNodes(3, settings);
createIndices(INDEX_NAME, OTHER_INDEX_NAME);
ensurePendingDanglingIndicesWritten();
AtomicReference<String> stoppedNodeName = new AtomicReference<>();
// Restart node, deleting the indices in its absence, so that there is a dangling index to recover
internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback() {
@Override
public Settings onNodeStopped(String nodeName) throws Exception {
internalCluster().validateClusterFormed();
stoppedNodeName.set(nodeName);
// Deleting both indices pushes the first tombstone out of the
// (size 1) graveyard, leaving INDEX_NAME's old copy dangling.
assertAcked(client().admin().indices().prepareDelete(INDEX_NAME));
assertAcked(client().admin().indices().prepareDelete(OTHER_INDEX_NAME));
// Create another index with the same name, which will prevent the dangling
// index from being restored, and also ensures that we are checking index
// existence by UUID, not name.
//
// Note: don't call `createIndices()` here as it calls `ensureGreen()`, which will
// fail while a node is offline
createIndex(INDEX_NAME);
ensurePendingDanglingIndicesWritten();
return super.onNodeStopped(nodeName);
}
});
final String danglingIndexUUID = findDanglingIndexForNode(stoppedNodeName.get(), INDEX_NAME);
client().admin().cluster().deleteDanglingIndex(new DeleteDanglingIndexRequest(danglingIndexUUID, true)).actionGet();
// The dangling index that we deleted ought to have been removed from disk. Check by
// creating and deleting another index, which creates a new tombstone entry, which should
// not cause the deleted dangling index to be considered "live" again, just because its
// tombstone has been pushed out of the graveyard.
createIndex("additional");
assertAcked(client().admin().indices().prepareDelete("additional"));
assertThat(listDanglingIndices(), is(empty()));
}
/**
 * Check that when an index is found to be dangling on more than one node, it can be deleted.
 */
public void testDanglingIndexOverMultipleNodesCanBeDeleted() throws Exception {
final Settings settings = buildSettings(1, true, false);
internalCluster().startNodes(3, settings);
createIndices(INDEX_NAME, OTHER_INDEX_NAME);
ensurePendingDanglingIndicesWritten();
// Restart 2 nodes, deleting the indices in their absence, so that there is a dangling index to recover
internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback() {
@Override
public Settings onNodeStopped(String nodeName) throws Exception {
// Nested restart: the indices are deleted only while BOTH nodes
// are offline, so both end up holding a dangling copy.
internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback() {
@Override
public Settings onNodeStopped(String nodeName) throws Exception {
internalCluster().validateClusterFormed();
assertAcked(client().admin().indices().prepareDelete(INDEX_NAME));
assertAcked(client().admin().indices().prepareDelete(OTHER_INDEX_NAME));
return super.onNodeStopped(nodeName);
}
});
return super.onNodeStopped(nodeName);
}
});
final AtomicReference<List<DanglingIndexInfo>> danglingIndices = new AtomicReference<>();
final List<DanglingIndexInfo> results = listDanglingIndices();
// Both the stopped nodes should have found a dangling index.
assertThat(results, hasSize(2));
danglingIndices.set(results);
// Try to delete the index - this request should succeed
client().admin()
.cluster()
.deleteDanglingIndex(new DeleteDanglingIndexRequest(danglingIndices.get().get(0).getIndexUUID(), true))
.actionGet();
// The dangling index that we deleted ought to have been removed from disk. Check by
// creating and deleting another index, which creates a new tombstone entry, which should
// not cause the deleted dangling index to be considered "live" again, just because its
// tombstone has been pushed out of the graveyard.
createIndex("additional");
assertAcked(client().admin().indices().prepareDelete("additional"));
assertBusy(() -> assertThat(listDanglingIndices(), empty()));
}
/**
 * Check that deleting a dangling index is refused unless the "accept_data_loss"
 * flag is explicitly set to true.
 */
public void testDeleteDanglingIndicesRequiresDataLossFlagToBeTrue() throws Exception {
    internalCluster().startNodes(3, buildSettings(1, true, false));

    final String nodeWithDanglingIndex = createDanglingIndices(INDEX_NAME, OTHER_INDEX_NAME);
    final String uuid = findDanglingIndexForNode(nodeWithDanglingIndex, INDEX_NAME);

    // accept_data_loss is deliberately false, so the delete must be rejected
    final DeleteDanglingIndexRequest request = new DeleteDanglingIndexRequest(uuid, false);

    final Exception e = expectThrows(
        Exception.class,
        () -> client().admin().cluster().deleteDanglingIndex(request).actionGet()
    );

    assertThat(e.getMessage(), containsString("accept_data_loss must be set to true"));
}
/**
 * Helper that fetches the current list of dangling indices, collating the
 * per-node results into a single flat list.
 */
private List<DanglingIndexInfo> listDanglingIndices() {
    final ListDanglingIndicesResponse response = client().admin()
        .cluster()
        .listDanglingIndices(new ListDanglingIndicesRequest())
        .actionGet();
    assertThat(response.status(), equalTo(RestStatus.OK));

    final List<DanglingIndexInfo> danglingIndices = new ArrayList<>();
    response.getNodes().forEach(nodeResponse -> danglingIndices.addAll(nodeResponse.getDanglingIndices()));
    return danglingIndices;
}
/**
 * Simple helper that creates one or more indices, and importantly,
 * checks that they are green before proceeding. This is important
 * because the tests in this class stop and restart nodes, assuming
 * that each index has a primary or replica shard on every node, and if
 * a node is stopped prematurely, this assumption is broken.
 */
private void createIndices(String... indices) {
    assert indices.length > 0;
    // Two replicas plus a one-shard-per-node cap spread a copy of each index
    // across all three nodes these tests start. The settings are identical for
    // every index, so build them once outside the loop.
    final Settings indexSettings = Settings.builder()
        .put("number_of_replicas", 2)
        .put("routing.allocation.total_shards_per_node", 1)
        .build();
    for (String index : indices) {
        createIndex(index, indexSettings);
    }
    ensureGreen(indices);
}
/**
 * Creates a number of dangling indices by first creating them, then stopping a data node
 * and deleting the indices while the node is stopped.
 * @param indices the indices to create and delete
 * @return the name of the stopped node
 */
private String createDanglingIndices(String... indices) throws Exception {
    createIndices(indices);

    ensurePendingDanglingIndicesWritten();

    final AtomicReference<String> stoppedNode = new AtomicReference<>();

    // Restart node, deleting the indices in its absence, so that there is a dangling index to recover
    internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback() {
        @Override
        public Settings onNodeStopped(String nodeName) throws Exception {
            internalCluster().validateClusterFormed();
            stoppedNode.set(nodeName);

            for (String indexToDelete : indices) {
                assertAcked(client().admin().indices().prepareDelete(indexToDelete));
            }

            return super.onNodeStopped(nodeName);
        }
    });

    return stoppedNode.get();
}
/**
 * Finds the UUID of the dangling index reported by the given node, asserting
 * that the index has the expected name.
 * @param stoppedNodeName the node whose dangling-index report to inspect
 * @param indexName the name the dangling index is expected to have
 * @return the dangling index's UUID
 */
private String findDanglingIndexForNode(String stoppedNodeName, String indexName) {
    String danglingIndexUUID = null;

    final ListDanglingIndicesResponse response = client().admin()
        .cluster()
        .listDanglingIndices(new ListDanglingIndicesRequest())
        .actionGet();
    assertThat(response.status(), equalTo(RestStatus.OK));

    for (NodeListDanglingIndicesResponse nodeResponse : response.getNodes()) {
        if (nodeResponse.getNode().getName().equals(stoppedNodeName) == false) {
            continue;
        }
        final DanglingIndexInfo danglingIndexInfo = nodeResponse.getDanglingIndices().get(0);
        assertThat(danglingIndexInfo.getIndexName(), equalTo(indexName));
        danglingIndexUUID = danglingIndexInfo.getIndexUUID();
        break;
    }

    assertNotNull("Failed to find a dangling index UUID for node [" + stoppedNodeName + "]", danglingIndexUUID);

    return danglingIndexUUID;
}
}

View File

@ -107,6 +107,14 @@ import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction;
import org.elasticsearch.action.admin.indices.create.AutoCreateAction;
import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
import org.elasticsearch.action.admin.indices.dangling.delete.DeleteDanglingIndexAction;
import org.elasticsearch.action.admin.indices.dangling.delete.TransportDeleteDanglingIndexAction;
import org.elasticsearch.action.admin.indices.dangling.find.FindDanglingIndexAction;
import org.elasticsearch.action.admin.indices.dangling.find.TransportFindDanglingIndexAction;
import org.elasticsearch.action.admin.indices.dangling.import_index.ImportDanglingIndexAction;
import org.elasticsearch.action.admin.indices.dangling.import_index.TransportImportDanglingIndexAction;
import org.elasticsearch.action.admin.indices.dangling.list.ListDanglingIndicesAction;
import org.elasticsearch.action.admin.indices.dangling.list.TransportListDanglingIndicesAction;
import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction;
import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction;
import org.elasticsearch.action.admin.indices.datastream.GetDataStreamAction;
@ -289,6 +297,9 @@ import org.elasticsearch.rest.action.admin.cluster.RestRemoteClusterInfoAction;
import org.elasticsearch.rest.action.admin.cluster.RestRestoreSnapshotAction;
import org.elasticsearch.rest.action.admin.cluster.RestSnapshotsStatusAction;
import org.elasticsearch.rest.action.admin.cluster.RestVerifyRepositoryAction;
import org.elasticsearch.rest.action.admin.cluster.dangling.RestDeleteDanglingIndexAction;
import org.elasticsearch.rest.action.admin.cluster.dangling.RestImportDanglingIndexAction;
import org.elasticsearch.rest.action.admin.cluster.dangling.RestListDanglingIndicesAction;
import org.elasticsearch.rest.action.admin.indices.RestAnalyzeAction;
import org.elasticsearch.rest.action.admin.indices.RestClearIndicesCacheAction;
import org.elasticsearch.rest.action.admin.indices.RestCloseIndexAction;
@ -625,6 +636,12 @@ public class ActionModule extends AbstractModule {
actions.register(RetentionLeaseActions.Renew.INSTANCE, RetentionLeaseActions.Renew.TransportAction.class);
actions.register(RetentionLeaseActions.Remove.INSTANCE, RetentionLeaseActions.Remove.TransportAction.class);
// Dangling indices
actions.register(ListDanglingIndicesAction.INSTANCE, TransportListDanglingIndicesAction.class);
actions.register(ImportDanglingIndexAction.INSTANCE, TransportImportDanglingIndexAction.class);
actions.register(DeleteDanglingIndexAction.INSTANCE, TransportDeleteDanglingIndexAction.class);
actions.register(FindDanglingIndexAction.INSTANCE, TransportFindDanglingIndexAction.class);
return unmodifiableMap(actions.getRegistry());
}
@ -759,6 +776,11 @@ public class ActionModule extends AbstractModule {
registerHandler.accept(new RestDeletePipelineAction());
registerHandler.accept(new RestSimulatePipelineAction());
// Dangling indices API
registerHandler.accept(new RestListDanglingIndicesAction());
registerHandler.accept(new RestImportDanglingIndexAction());
registerHandler.accept(new RestDeleteDanglingIndexAction());
// Data Stream API
if (DATASTREAMS_FEATURE_ENABLED) {
registerHandler.accept(new RestCreateDataStreamAction());

View File

@ -0,0 +1,75 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.dangling;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import java.io.IOException;
/**
 * Contains information about a dangling index, i.e. an index that Elasticsearch has found
 * on-disk but is not present in the cluster state.
 */
public class DanglingIndexInfo implements Writeable {
    // ID of the node that reported this dangling index
    private final String nodeId;
    private final String indexName;
    // Dangling indices are identified by UUID, not name, since the name may be reused
    private final String indexUUID;
    // Index creation date; the field name indicates epoch milliseconds
    private final long creationDateMillis;

    public DanglingIndexInfo(String nodeId, String indexName, String indexUUID, long creationDateMillis) {
        this.nodeId = nodeId;
        this.indexName = indexName;
        this.indexUUID = indexUUID;
        this.creationDateMillis = creationDateMillis;
    }

    /** Deserializes an instance; field order must mirror {@link #writeTo}. */
    public DanglingIndexInfo(StreamInput in) throws IOException {
        this.nodeId = in.readString();
        this.indexName = in.readString();
        this.indexUUID = in.readString();
        this.creationDateMillis = in.readLong();
    }

    public String getIndexName() {
        return indexName;
    }

    public String getIndexUUID() {
        return indexUUID;
    }

    public String getNodeId() {
        return this.nodeId;
    }

    public long getCreationDateMillis() {
        return creationDateMillis;
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        // Wire order must stay in sync with the StreamInput constructor above
        out.writeString(this.nodeId);
        out.writeString(this.indexName);
        out.writeString(this.indexUUID);
        out.writeLong(this.creationDateMillis);
    }
}

View File

@ -0,0 +1,36 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.dangling.delete;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
/**
 * Action type for deleting a dangling index: the action causes the index to be
 * considered as deleted by the cluster (by adding it to the index graveyard),
 * and responds with a simple acknowledgement.
 */
public class DeleteDanglingIndexAction extends ActionType<AcknowledgedResponse> {
    /** Singleton instance; the constructor is private so this is the only one. */
    public static final DeleteDanglingIndexAction INSTANCE = new DeleteDanglingIndexAction();

    public static final String NAME = "cluster:admin/indices/dangling/delete";

    private DeleteDanglingIndexAction() {
        super(NAME, AcknowledgedResponse::new);
    }
}

View File

@ -0,0 +1,74 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.dangling.delete;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
import java.util.Objects;
/**
 * Represents a request to delete a particular dangling index, specified by its UUID. The {@link #acceptDataLoss}
 * flag must also be explicitly set to true, or later validation will fail.
 */
public class DeleteDanglingIndexRequest extends AcknowledgedRequest<DeleteDanglingIndexRequest> {
    // UUID (not name) of the index to delete - dangling indices are identified by UUID
    private final String indexUUID;
    // Must be true for the deletion to proceed; checked by the transport action rather than validate()
    private final boolean acceptDataLoss;

    /** Deserializes a request; field order must mirror {@link #writeTo}. */
    public DeleteDanglingIndexRequest(StreamInput in) throws IOException {
        super(in);
        this.indexUUID = in.readString();
        this.acceptDataLoss = in.readBoolean();
    }

    public DeleteDanglingIndexRequest(String indexUUID, boolean acceptDataLoss) {
        super();
        this.indexUUID = Objects.requireNonNull(indexUUID, "indexUUID cannot be null");
        this.acceptDataLoss = acceptDataLoss;
    }

    @Override
    public ActionRequestValidationException validate() {
        // Deliberately no validation here: the accept_data_loss check is deferred to the
        // transport action so that the supplied UUID is first confirmed to refer to a
        // real dangling index.
        return null;
    }

    public String getIndexUUID() {
        return indexUUID;
    }

    public boolean isAcceptDataLoss() {
        return acceptDataLoss;
    }

    @Override
    public String toString() {
        return "DeleteDanglingIndexRequest{" + "indexUUID='" + indexUUID + "', acceptDataLoss=" + acceptDataLoss + '}';
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        // Wire order must stay in sync with the StreamInput constructor above
        super.writeTo(out);
        out.writeString(this.indexUUID);
        out.writeBoolean(this.acceptDataLoss);
    }
}

View File

@ -0,0 +1,240 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.dangling.delete;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.admin.indices.dangling.DanglingIndexInfo;
import org.elasticsearch.action.admin.indices.dangling.list.ListDanglingIndicesAction;
import org.elasticsearch.action.admin.indices.dangling.list.ListDanglingIndicesRequest;
import org.elasticsearch.action.admin.indices.dangling.list.ListDanglingIndicesResponse;
import org.elasticsearch.action.admin.indices.dangling.list.NodeListDanglingIndicesResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.IndexGraveyard;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
/**
 * Implements the deletion of a dangling index. When handling a {@link DeleteDanglingIndexAction},
 * this class first checks that such a dangling index exists (by fanning out a list request to
 * every node). It then submits a cluster state update to add the index to the index graveyard.
 */
public class TransportDeleteDanglingIndexAction extends TransportMasterNodeAction<DeleteDanglingIndexRequest, AcknowledgedResponse> {
    private static final Logger logger = LogManager.getLogger(TransportDeleteDanglingIndexAction.class);

    // Passed to IndexGraveyard.Builder#build when creating the updated graveyard
    private final Settings settings;
    // Used to execute ListDanglingIndicesAction when resolving the requested UUID
    private final NodeClient nodeClient;

    @Inject
    public TransportDeleteDanglingIndexAction(
        TransportService transportService,
        ClusterService clusterService,
        ThreadPool threadPool,
        ActionFilters actionFilters,
        IndexNameExpressionResolver indexNameExpressionResolver,
        Settings settings,
        NodeClient nodeClient
    ) {
        super(
            DeleteDanglingIndexAction.NAME,
            transportService,
            clusterService,
            threadPool,
            actionFilters,
            DeleteDanglingIndexRequest::new,
            indexNameExpressionResolver
        );
        this.settings = settings;
        this.nodeClient = nodeClient;
    }

    @Override
    protected String executor() {
        return ThreadPool.Names.GENERIC;
    }

    @Override
    protected AcknowledgedResponse read(StreamInput in) throws IOException {
        return new AcknowledgedResponse(in);
    }

    /**
     * Resolves the requested UUID to a concrete dangling {@link Index}, then submits a
     * cluster state update task that records the index in the graveyard. Fails the listener
     * if no dangling index matches the UUID, or if accept_data_loss was not set.
     */
    @Override
    protected void masterOperation(
        DeleteDanglingIndexRequest deleteRequest,
        ClusterState state,
        ActionListener<AcknowledgedResponse> deleteListener
    ) throws Exception {
        findDanglingIndex(deleteRequest.getIndexUUID(), new ActionListener<Index>() {
            @Override
            public void onResponse(Index indexToDelete) {
                // This flag is checked at this point so that we always check that the supplied index ID
                // does correspond to a dangling index.
                if (deleteRequest.isAcceptDataLoss() == false) {
                    deleteListener.onFailure(new IllegalArgumentException("accept_data_loss must be set to true"));
                    return;
                }

                String indexName = indexToDelete.getName();
                String indexUUID = indexToDelete.getUUID();

                // Wraps the caller's listener so that failures are logged before being propagated
                final ActionListener<AcknowledgedResponse> clusterStateUpdatedListener = new ActionListener<AcknowledgedResponse>() {
                    @Override
                    public void onResponse(AcknowledgedResponse response) {
                        deleteListener.onResponse(response);
                    }

                    @Override
                    public void onFailure(Exception e) {
                        logger.debug("Failed to delete dangling index [" + indexName + "] [" + indexUUID + "]", e);
                        deleteListener.onFailure(e);
                    }
                };

                final String taskSource = "delete-dangling-index [" + indexName + "] [" + indexUUID + "]";

                clusterService.submitStateUpdateTask(
                    taskSource,
                    new AckedClusterStateUpdateTask<AcknowledgedResponse>(deleteRequest, clusterStateUpdatedListener) {
                        @Override
                        protected AcknowledgedResponse newResponse(boolean acknowledged) {
                            return new AcknowledgedResponse(acknowledged);
                        }

                        @Override
                        public ClusterState execute(final ClusterState currentState) {
                            return deleteDanglingIndex(currentState, indexToDelete);
                        }
                    }
                );
            }

            @Override
            public void onFailure(Exception e) {
                logger.debug("Failed to find dangling index [" + deleteRequest.getIndexUUID() + "]", e);
                deleteListener.onFailure(e);
            }
        });
    }

    /**
     * Returns a copy of {@code currentState} whose index graveyard contains a tombstone
     * for {@code indexToDelete}. Refuses (by throwing) if an index with the same UUID is
     * present in the cluster state, and is a no-op if a tombstone already exists.
     */
    private ClusterState deleteDanglingIndex(ClusterState currentState, Index indexToDelete) {
        final Metadata metaData = currentState.getMetadata();

        // Guard against deleting an index that is actually live - match by UUID, not name
        for (ObjectObjectCursor<String, IndexMetadata> each : metaData.indices()) {
            if (indexToDelete.getUUID().equals(each.value.getIndexUUID())) {
                throw new IllegalArgumentException(
                    "Refusing to delete dangling index "
                        + indexToDelete
                        + " as an index with UUID ["
                        + indexToDelete.getUUID()
                        + "] already exists in the cluster state"
                );
            }
        }

        // By definition, a dangling index is an index not present in the cluster state and with no tombstone,
        // so we shouldn't reach this point if these conditions aren't met. For super-safety, however, check
        // that a tombstone doesn't already exist for this index.
        if (metaData.indexGraveyard().containsIndex(indexToDelete)) {
            return currentState;
        }

        Metadata.Builder metaDataBuilder = Metadata.builder(metaData);

        final IndexGraveyard newGraveyard = IndexGraveyard.builder(metaDataBuilder.indexGraveyard())
            .addTombstone(indexToDelete)
            .build(settings);
        metaDataBuilder.indexGraveyard(newGraveyard);

        return ClusterState.builder(currentState).metadata(metaDataBuilder.build()).build();
    }

    @Override
    protected ClusterBlockException checkBlock(DeleteDanglingIndexRequest request, ClusterState state) {
        // No cluster blocks are checked for this request
        return null;
    }

    /**
     * Queries every node (via {@link ListDanglingIndicesAction}) for a dangling index with
     * the given UUID. Responds with the first match found, or fails the listener if any
     * node query failed or no node reported a matching index.
     */
    private void findDanglingIndex(String indexUUID, ActionListener<Index> listener) {
        this.nodeClient.execute(
            ListDanglingIndicesAction.INSTANCE,
            new ListDanglingIndicesRequest(indexUUID),
            new ActionListener<ListDanglingIndicesResponse>() {
                @Override
                public void onResponse(ListDanglingIndicesResponse response) {
                    // Treat any per-node failure as fatal: a failed node might be the one
                    // holding the dangling index, so a partial answer can't be trusted.
                    if (response.hasFailures()) {
                        final String nodeIds = response.failures()
                            .stream()
                            .map(FailedNodeException::nodeId)
                            .collect(Collectors.joining(","));
                        ElasticsearchException e = new ElasticsearchException("Failed to query nodes [" + nodeIds + "]");

                        for (FailedNodeException failure : response.failures()) {
                            logger.error("Failed to query node [" + failure.nodeId() + "]", failure);
                            e.addSuppressed(failure);
                        }

                        listener.onFailure(e);
                        return;
                    }

                    final List<NodeListDanglingIndicesResponse> nodes = response.getNodes();

                    for (NodeListDanglingIndicesResponse nodeResponse : nodes) {
                        for (DanglingIndexInfo each : nodeResponse.getDanglingIndices()) {
                            if (each.getIndexUUID().equals(indexUUID)) {
                                listener.onResponse(new Index(each.getIndexName(), each.getIndexUUID()));
                                return;
                            }
                        }
                    }

                    listener.onFailure(new IllegalArgumentException("No dangling index found for UUID [" + indexUUID + "]"));
                }

                @Override
                public void onFailure(Exception exp) {
                    listener.onFailure(exp);
                }
            }
        );
    }
}

View File

@ -0,0 +1,35 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.dangling.find;
import org.elasticsearch.action.ActionType;
/**
 * Action type for finding a particular dangling index by UUID. (Note: this class is the
 * {@link ActionType}, not the request itself - see FindDanglingIndexRequest for that.)
 */
public class FindDanglingIndexAction extends ActionType<FindDanglingIndexResponse> {
    /** Singleton instance; the constructor is private so this is the only one. */
    public static final FindDanglingIndexAction INSTANCE = new FindDanglingIndexAction();

    public static final String NAME = "cluster:admin/indices/dangling/find";

    private FindDanglingIndexAction() {
        super(NAME, FindDanglingIndexResponse::new);
    }
}

View File

@ -0,0 +1,56 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.dangling.find;
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
/**
 * Represents a request to find a particular dangling index by UUID. Extends
 * {@link BaseNodesRequest}, so the request fans out to nodes in the cluster
 * (it is constructed with {@code Strings.EMPTY_ARRAY} for the node IDs -
 * presumably meaning "all nodes"; confirm against BaseNodesRequest).
 */
public class FindDanglingIndexRequest extends BaseNodesRequest<FindDanglingIndexRequest> {
    // UUID (not name) of the index to find
    private final String indexUUID;

    /** Deserializes a request; field order must mirror {@link #writeTo}. */
    public FindDanglingIndexRequest(StreamInput in) throws IOException {
        super(in);
        this.indexUUID = in.readString();
    }

    public FindDanglingIndexRequest(String indexUUID) {
        super(Strings.EMPTY_ARRAY);
        this.indexUUID = indexUUID;
    }

    public String getIndexUUID() {
        return indexUUID;
    }

    @Override
    public String toString() {
        // Report the actual class name - this previously said "FindDanglingIndicesRequest",
        // which was misleading in logs and debugger output.
        return "FindDanglingIndexRequest{indexUUID='" + indexUUID + "'}";
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        out.writeString(this.indexUUID);
    }
}

View File

@ -0,0 +1,58 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.dangling.find;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
import java.util.List;
/**
 * Models a response to a {@link FindDanglingIndexRequest}. A find request queries every node in the
 * cluster looking for a dangling index with a specific UUID; this class collates the per-node
 * responses and any per-node failures.
 */
public class FindDanglingIndexResponse extends BaseNodesResponse<NodeFindDanglingIndexResponse> {
    public FindDanglingIndexResponse(StreamInput in) throws IOException {
        super(in);
    }

    public FindDanglingIndexResponse(
        ClusterName clusterName,
        List<NodeFindDanglingIndexResponse> nodes,
        List<FailedNodeException> failures
    ) {
        super(clusterName, nodes, failures);
    }

    /** Reads the per-node responses; the wire format must mirror {@link #writeNodesTo}. */
    @Override
    protected List<NodeFindDanglingIndexResponse> readNodesFrom(StreamInput in) throws IOException {
        return in.readList(NodeFindDanglingIndexResponse::new);
    }

    @Override
    protected void writeNodesTo(StreamOutput out, List<NodeFindDanglingIndexResponse> nodes) throws IOException {
        out.writeList(nodes);
    }
}

View File

@ -0,0 +1,52 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.dangling.find;
import java.io.IOException;
import org.elasticsearch.action.support.nodes.BaseNodeRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
/**
 * Used when querying every node in the cluster for a specific dangling index:
 * this is the per-node request that carries the UUID to look for.
 */
public class NodeFindDanglingIndexRequest extends BaseNodeRequest {
    // UUID (not name) of the index to find on this node
    private final String indexUUID;

    public NodeFindDanglingIndexRequest(String indexUUID) {
        this.indexUUID = indexUUID;
    }

    /** Deserializes a request; field order must mirror {@link #writeTo}. */
    public NodeFindDanglingIndexRequest(StreamInput in) throws IOException {
        super(in);
        this.indexUUID = in.readString();
    }

    public String getIndexUUID() {
        return indexUUID;
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        out.writeString(this.indexUUID);
    }
}

View File

@ -0,0 +1,62 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.dangling.find;
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
import java.util.List;
/**
 * Used when querying every node in the cluster for a specific dangling index:
 * this is a single node's answer, carrying the matching index metadata it found.
 */
public class NodeFindDanglingIndexResponse extends BaseNodeResponse {
    /**
     * A node could report several dangling indices. This class will contain them all.
     * A single node could even contain multiple different index versions for the same index
     * UUID if the situation is really crazy, though perhaps this is more likely
     * when collating responses from different nodes.
     */
    private final List<IndexMetadata> danglingIndexInfo;

    public List<IndexMetadata> getDanglingIndexInfo() {
        return this.danglingIndexInfo;
    }

    public NodeFindDanglingIndexResponse(DiscoveryNode node, List<IndexMetadata> danglingIndexInfo) {
        super(node);
        this.danglingIndexInfo = danglingIndexInfo;
    }

    /** Deserializes a response; field order must mirror {@link #writeTo}. */
    protected NodeFindDanglingIndexResponse(StreamInput in) throws IOException {
        super(in);
        this.danglingIndexInfo = in.readList(IndexMetadata::readFrom);
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        out.writeList(this.danglingIndexInfo);
    }
}

View File

@ -0,0 +1,107 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.dangling.find;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.gateway.DanglingIndicesState;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
/**
 * Finds a specified dangling index by its UUID, searching across all nodes.
 */
public class TransportFindDanglingIndexAction extends TransportNodesAction<
    FindDanglingIndexRequest,
    FindDanglingIndexResponse,
    NodeFindDanglingIndexRequest,
    NodeFindDanglingIndexResponse> {

    private final TransportService transportService;
    private final DanglingIndicesState danglingIndicesState;

    @Inject
    public TransportFindDanglingIndexAction(
        ThreadPool threadPool,
        ClusterService clusterService,
        TransportService transportService,
        ActionFilters actionFilters,
        DanglingIndicesState danglingIndicesState
    ) {
        super(
            FindDanglingIndexAction.NAME,
            threadPool,
            clusterService,
            transportService,
            actionFilters,
            FindDanglingIndexRequest::new,
            NodeFindDanglingIndexRequest::new,
            ThreadPool.Names.MANAGEMENT,
            NodeFindDanglingIndexResponse.class
        );
        this.transportService = transportService;
        this.danglingIndicesState = danglingIndicesState;
    }

    /** Aggregates the per-node answers into a single cluster-wide response. */
    @Override
    protected FindDanglingIndexResponse newResponse(
        FindDanglingIndexRequest request,
        List<NodeFindDanglingIndexResponse> responses,
        List<FailedNodeException> failures
    ) {
        return new FindDanglingIndexResponse(clusterService.getClusterName(), responses, failures);
    }

    @Override
    protected NodeFindDanglingIndexRequest newNodeRequest(FindDanglingIndexRequest request) {
        return new NodeFindDanglingIndexRequest(request.getIndexUUID());
    }

    @Override
    protected NodeFindDanglingIndexResponse newNodeResponse(StreamInput in) throws IOException {
        return new NodeFindDanglingIndexResponse(in);
    }

    /** Checks every dangling index known to this node against the requested UUID. */
    @Override
    protected NodeFindDanglingIndexResponse nodeOperation(NodeFindDanglingIndexRequest request) {
        final String uuidToFind = request.getIndexUUID();
        final List<IndexMetadata> matchedMetadata = new ArrayList<>();

        for (IndexMetadata metadata : danglingIndicesState.getDanglingIndices().values()) {
            if (metadata.getIndexUUID().equals(uuidToFind)) {
                matchedMetadata.add(metadata);
            }
        }

        return new NodeFindDanglingIndexResponse(transportService.getLocalNode(), matchedMetadata);
    }
}

View File

@ -0,0 +1,36 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.dangling.import_index;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
/**
 * Represents a request to import a particular dangling index. Completes with a
 * simple acknowledgement once the import has been initiated.
 */
public class ImportDanglingIndexAction extends ActionType<AcknowledgedResponse> {
    // Singleton instance used when registering and invoking this action.
    public static final ImportDanglingIndexAction INSTANCE = new ImportDanglingIndexAction();
    // The name under which the transport action is registered.
    public static final String NAME = "cluster:admin/indices/dangling/import";
    private ImportDanglingIndexAction() {
        super(NAME, AcknowledgedResponse::new);
    }
}

View File

@ -0,0 +1,76 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.dangling.import_index;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
import java.util.Locale;
import java.util.Objects;
/**
 * Represents a request to import a particular dangling index, identified by its
 * UUID. The {@link #acceptDataLoss} flag must also be explicitly set to
 * {@code true}, or later validation will fail.
 */
public class ImportDanglingIndexRequest extends AcknowledgedRequest<ImportDanglingIndexRequest> {
    private final String indexUUID;
    private final boolean acceptDataLoss;

    public ImportDanglingIndexRequest(String indexUUID, boolean acceptDataLoss) {
        super();
        this.indexUUID = Objects.requireNonNull(indexUUID, "indexUUID cannot be null");
        this.acceptDataLoss = acceptDataLoss;
    }

    public ImportDanglingIndexRequest(StreamInput in) throws IOException {
        super(in);
        this.indexUUID = in.readString();
        this.acceptDataLoss = in.readBoolean();
    }

    // No declarative validation: the acceptDataLoss flag is deliberately checked
    // in the transport action, after the index UUID has been resolved.
    @Override
    public ActionRequestValidationException validate() {
        return null;
    }

    public String getIndexUUID() {
        return indexUUID;
    }

    public boolean isAcceptDataLoss() {
        return acceptDataLoss;
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        out.writeString(this.indexUUID);
        out.writeBoolean(this.acceptDataLoss);
    }

    @Override
    public String toString() {
        return String.format(Locale.ROOT, "ImportDanglingIndexRequest{indexUUID='%s', acceptDataLoss=%s}", indexUUID, acceptDataLoss);
    }
}

View File

@ -0,0 +1,163 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.dangling.import_index;
import static java.util.Collections.singletonList;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.admin.indices.dangling.find.FindDanglingIndexAction;
import org.elasticsearch.action.admin.indices.dangling.find.FindDanglingIndexRequest;
import org.elasticsearch.action.admin.indices.dangling.find.FindDanglingIndexResponse;
import org.elasticsearch.action.admin.indices.dangling.find.NodeFindDanglingIndexResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.gateway.LocalAllocateDangledIndices;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;
/**
 * Implements the import of a dangling index. When handling a {@link ImportDanglingIndexAction},
 * this class first checks that such a dangling index exists, by querying every node via
 * {@link FindDanglingIndexAction}. It then calls {@link LocalAllocateDangledIndices}
 * to perform the actual allocation.
 */
public class TransportImportDanglingIndexAction extends HandledTransportAction<ImportDanglingIndexRequest, AcknowledgedResponse> {
    private static final Logger logger = LogManager.getLogger(TransportImportDanglingIndexAction.class);
    // Performs the actual allocation of the dangling index once it has been located.
    private final LocalAllocateDangledIndices danglingIndexAllocator;
    // Used to execute the find-dangling-index action against the cluster.
    private final NodeClient nodeClient;
    @Inject
    public TransportImportDanglingIndexAction(
        ActionFilters actionFilters,
        TransportService transportService,
        LocalAllocateDangledIndices danglingIndexAllocator,
        NodeClient nodeClient
    ) {
        super(ImportDanglingIndexAction.NAME, transportService, actionFilters, ImportDanglingIndexRequest::new);
        this.danglingIndexAllocator = danglingIndexAllocator;
        this.nodeClient = nodeClient;
    }
    /**
     * Locates the dangling index for the requested UUID, then hands it to the
     * allocator. Fails the listener if no such index exists, if the caller has
     * not accepted data loss, or if allocation itself fails.
     */
    @Override
    protected void doExecute(Task task, ImportDanglingIndexRequest importRequest, ActionListener<AcknowledgedResponse> importListener) {
        findDanglingIndex(importRequest, new ActionListener<IndexMetadata>() {
            @Override
            public void onResponse(IndexMetadata indexMetaDataToImport) {
                // This flag is checked at this point so that we always check that the supplied index UUID
                // does correspond to a dangling index.
                if (importRequest.isAcceptDataLoss() == false) {
                    importListener.onFailure(new IllegalArgumentException("accept_data_loss must be set to true"));
                    return;
                }
                // Captured for use in the failure log message below.
                String indexName = indexMetaDataToImport.getIndex().getName();
                String indexUUID = indexMetaDataToImport.getIndexUUID();
                danglingIndexAllocator.allocateDangled(
                    singletonList(indexMetaDataToImport),
                    new ActionListener<LocalAllocateDangledIndices.AllocateDangledResponse>() {
                        @Override
                        public void onResponse(LocalAllocateDangledIndices.AllocateDangledResponse allocateDangledResponse) {
                            importListener.onResponse(new AcknowledgedResponse(true));
                        }
                        @Override
                        public void onFailure(Exception e) {
                            logger.debug("Failed to import dangling index [" + indexName + "] [" + indexUUID + "]", e);
                            importListener.onFailure(e);
                        }
                    }
                );
            }
            @Override
            public void onFailure(Exception e) {
                logger.debug("Failed to find dangling index [" + importRequest.getIndexUUID() + "]", e);
                importListener.onFailure(e);
            }
        });
    }
    /**
     * Queries every node for the dangling index with the given UUID and completes
     * the listener with the highest metadata version found. Fails if any node
     * could not be queried, or if no node reports a matching dangling index.
     */
    private void findDanglingIndex(ImportDanglingIndexRequest request, ActionListener<IndexMetadata> listener) {
        final String indexUUID = request.getIndexUUID();
        this.nodeClient.execute(
            FindDanglingIndexAction.INSTANCE,
            new FindDanglingIndexRequest(indexUUID),
            new ActionListener<FindDanglingIndexResponse>() {
                @Override
                public void onResponse(FindDanglingIndexResponse response) {
                    // Any node-level failure fails the whole find operation, with
                    // each node's exception recorded as a suppressed exception.
                    if (response.hasFailures()) {
                        final String nodeIds = response.failures()
                            .stream()
                            .map(FailedNodeException::nodeId)
                            .collect(Collectors.joining(","));
                        ElasticsearchException e = new ElasticsearchException("Failed to query nodes [" + nodeIds + "]");
                        for (FailedNodeException failure : response.failures()) {
                            logger.error("Failed to query node [" + failure.nodeId() + "]", failure);
                            e.addSuppressed(failure);
                        }
                        listener.onFailure(e);
                        return;
                    }
                    // Collect every reported copy of the index metadata, then sort
                    // ascending by version so the newest copy is last.
                    final List<IndexMetadata> metaDataSortedByVersion = new ArrayList<>();
                    for (NodeFindDanglingIndexResponse each : response.getNodes()) {
                        metaDataSortedByVersion.addAll(each.getDanglingIndexInfo());
                    }
                    metaDataSortedByVersion.sort(Comparator.comparingLong(IndexMetadata::getVersion));
                    if (metaDataSortedByVersion.isEmpty()) {
                        listener.onFailure(new IllegalArgumentException("No dangling index found for UUID [" + indexUUID + "]"));
                        return;
                    }
                    logger.debug(
                        "Metadata versions {} found for index UUID [{}], selecting the highest",
                        metaDataSortedByVersion.stream().map(IndexMetadata::getVersion).collect(Collectors.toList()),
                        indexUUID
                    );
                    // Last element is the highest version, thanks to the sort above.
                    listener.onResponse(metaDataSortedByVersion.get(metaDataSortedByVersion.size() - 1));
                }
                @Override
                public void onFailure(Exception exp) {
                    listener.onFailure(exp);
                }
            }
        );
    }
}

View File

@ -0,0 +1,35 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.dangling.list;
import org.elasticsearch.action.ActionType;
/**
 * Represents a request to list all dangling indices known to the cluster.
 */
public class ListDanglingIndicesAction extends ActionType<ListDanglingIndicesResponse> {
    // Singleton instance used when registering and invoking this action.
    public static final ListDanglingIndicesAction INSTANCE = new ListDanglingIndicesAction();
    // The name under which the transport action is registered.
    public static final String NAME = "cluster:admin/indices/dangling/list";
    private ListDanglingIndicesAction() {
        super(NAME, ListDanglingIndicesResponse::new);
    }
}

View File

@ -0,0 +1,64 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.dangling.list;
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
public class ListDanglingIndicesRequest extends BaseNodesRequest<ListDanglingIndicesRequest> {
/**
* Filter the response by index UUID. Leave as null to find all indices.
*/
private final String indexUUID;
public ListDanglingIndicesRequest(StreamInput in) throws IOException {
super(in);
this.indexUUID = in.readOptionalString();
}
public ListDanglingIndicesRequest() {
super(Strings.EMPTY_ARRAY);
this.indexUUID = null;
}
public ListDanglingIndicesRequest(String indexUUID) {
super(Strings.EMPTY_ARRAY);
this.indexUUID = indexUUID;
}
public String getIndexUUID() {
return indexUUID;
}
@Override
public String toString() {
return "ListDanglingIndicesRequest{indexUUID='" + indexUUID + "'}";
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalString(this.indexUUID);
}
}

View File

@ -0,0 +1,168 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.dangling.list;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.admin.indices.dangling.DanglingIndexInfo;
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.StatusToXContentObject;
import org.elasticsearch.common.xcontent.XContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
/**
 * Models a response to a {@link ListDanglingIndicesRequest}. A list request queries every node in the
 * cluster and aggregates their responses. When the aggregated response is converted to {@link XContent},
 * information for each dangling index is presented under the "dangling_indices" key. If any nodes
 * in the cluster failed to answer, the details are presented under the "_nodes.failures" key.
 */
public class ListDanglingIndicesResponse extends BaseNodesResponse<NodeListDanglingIndicesResponse> implements StatusToXContentObject {

    public ListDanglingIndicesResponse(StreamInput in) throws IOException {
        super(in);
    }

    public ListDanglingIndicesResponse(
        ClusterName clusterName,
        List<NodeListDanglingIndicesResponse> nodes,
        List<FailedNodeException> failures
    ) {
        super(clusterName, nodes, failures);
    }

    /** Any node failure downgrades the whole response to a server error. */
    @Override
    public RestStatus status() {
        if (this.hasFailures()) {
            return RestStatus.INTERNAL_SERVER_ERROR;
        }
        return RestStatus.OK;
    }

    // Visible for testing
    static Collection<AggregatedDanglingIndexInfo> resultsByIndexUUID(List<NodeListDanglingIndicesResponse> nodes) {
        final Map<String, AggregatedDanglingIndexInfo> aggregated = new HashMap<>();

        for (NodeListDanglingIndicesResponse nodeResponse : nodes) {
            for (DanglingIndexInfo danglingInfo : nodeResponse.getDanglingIndices()) {
                final String uuid = danglingInfo.getIndexUUID();
                AggregatedDanglingIndexInfo entry = aggregated.get(uuid);
                if (entry == null) {
                    entry = new AggregatedDanglingIndexInfo(uuid, danglingInfo.getIndexName(), danglingInfo.getCreationDateMillis());
                    aggregated.put(uuid, entry);
                }
                entry.nodeIds.add(danglingInfo.getNodeId());
            }
        }

        return aggregated.values();
    }

    /** Renders one entry per distinct index UUID under the "dangling_indices" key. */
    @Override
    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
        builder.startArray("dangling_indices");
        for (AggregatedDanglingIndexInfo info : resultsByIndexUUID(this.getNodes())) {
            builder.startObject();
            builder.field("index_name", info.indexName);
            builder.field("index_uuid", info.indexUUID);
            builder.timeField("creation_date_millis", "creation_date", info.creationDateMillis);
            builder.array("node_ids", info.nodeIds.toArray(new String[0]));
            builder.endObject();
        }
        return builder.endArray();
    }

    @Override
    protected List<NodeListDanglingIndicesResponse> readNodesFrom(StreamInput in) throws IOException {
        return in.readList(NodeListDanglingIndicesResponse::new);
    }

    @Override
    protected void writeNodesTo(StreamOutput out, List<NodeListDanglingIndicesResponse> nodes) throws IOException {
        out.writeList(nodes);
    }

    /**
     * Per-UUID aggregation of dangling index information: one instance records
     * which nodes reported the given index UUID. Visible for testing.
     */
    static class AggregatedDanglingIndexInfo {
        private final String indexUUID;
        private final String indexName;
        private final long creationDateMillis;
        private final List<String> nodeIds;

        AggregatedDanglingIndexInfo(String indexUUID, String indexName, long creationDateMillis) {
            this.indexUUID = indexUUID;
            this.indexName = indexName;
            this.creationDateMillis = creationDateMillis;
            this.nodeIds = new ArrayList<>();
        }

        // The methods below are used in the unit tests
        public List<String> getNodeIds() {
            return nodeIds;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            final AggregatedDanglingIndexInfo other = (AggregatedDanglingIndexInfo) o;
            return creationDateMillis == other.creationDateMillis
                && indexUUID.equals(other.indexUUID)
                && indexName.equals(other.indexName)
                && nodeIds.equals(other.nodeIds);
        }

        @Override
        public int hashCode() {
            return Objects.hash(indexUUID, indexName, creationDateMillis, nodeIds);
        }

        @Override
        public String toString() {
            return String.format(
                Locale.ROOT,
                "AggregatedDanglingIndexInfo{indexUUID='%s', indexName='%s', creationDateMillis=%d, nodeIds=%s}",
                indexUUID,
                indexName,
                creationDateMillis,
                nodeIds
            );
        }
    }
}

View File

@ -0,0 +1,55 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.dangling.list;
import org.elasticsearch.action.support.nodes.BaseNodeRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
/**
 * Used when querying every node in the cluster for dangling indices, in response to a list request.
 */
public class NodeListDanglingIndicesRequest extends BaseNodeRequest {
    /**
     * Filter the response by index UUID. Leave as null to find all indices.
     */
    private final String indexUUID;

    public NodeListDanglingIndicesRequest(String indexUUID) {
        this.indexUUID = indexUUID;
    }

    public NodeListDanglingIndicesRequest(StreamInput in) throws IOException {
        super(in);
        this.indexUUID = in.readOptionalString();
    }

    public String getIndexUUID() {
        return this.indexUUID;
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        out.writeOptionalString(this.indexUUID);
    }
}

View File

@ -0,0 +1,56 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.dangling.list;
import org.elasticsearch.action.admin.indices.dangling.DanglingIndexInfo;
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
import java.util.List;
/**
 * Used when querying every node in the cluster for dangling indices, in response to a list request.
 */
public class NodeListDanglingIndicesResponse extends BaseNodeResponse {
    // The dangling indices reported by this node, one entry per dangling index.
    private final List<DanglingIndexInfo> indexMetaData;

    public NodeListDanglingIndicesResponse(DiscoveryNode node, List<DanglingIndexInfo> indexMetaData) {
        super(node);
        this.indexMetaData = indexMetaData;
    }

    protected NodeListDanglingIndicesResponse(StreamInput in) throws IOException {
        super(in);
        this.indexMetaData = in.readList(DanglingIndexInfo::new);
    }

    public List<DanglingIndexInfo> getDanglingIndices() {
        return this.indexMetaData;
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        out.writeList(this.indexMetaData);
    }
}

View File

@ -0,0 +1,115 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.dangling.list;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.admin.indices.dangling.DanglingIndexInfo;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.gateway.DanglingIndicesState;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
 * Implements the listing of all dangling indices. All nodes in the cluster are queried, and
 * their answers aggregated. Finding dangling indices is performed in {@link DanglingIndicesState}.
 */
public class TransportListDanglingIndicesAction extends TransportNodesAction<
    ListDanglingIndicesRequest,
    ListDanglingIndicesResponse,
    NodeListDanglingIndicesRequest,
    NodeListDanglingIndicesResponse> {

    private final TransportService transportService;
    private final DanglingIndicesState danglingIndicesState;

    @Inject
    public TransportListDanglingIndicesAction(
        ThreadPool threadPool,
        ClusterService clusterService,
        TransportService transportService,
        ActionFilters actionFilters,
        DanglingIndicesState danglingIndicesState
    ) {
        super(
            ListDanglingIndicesAction.NAME,
            threadPool,
            clusterService,
            transportService,
            actionFilters,
            ListDanglingIndicesRequest::new,
            NodeListDanglingIndicesRequest::new,
            ThreadPool.Names.MANAGEMENT,
            NodeListDanglingIndicesResponse.class
        );
        this.transportService = transportService;
        this.danglingIndicesState = danglingIndicesState;
    }

    /** Aggregates the per-node answers into a single cluster-wide response. */
    @Override
    protected ListDanglingIndicesResponse newResponse(
        ListDanglingIndicesRequest request,
        List<NodeListDanglingIndicesResponse> responses,
        List<FailedNodeException> failures
    ) {
        return new ListDanglingIndicesResponse(clusterService.getClusterName(), responses, failures);
    }

    @Override
    protected NodeListDanglingIndicesRequest newNodeRequest(ListDanglingIndicesRequest request) {
        return new NodeListDanglingIndicesRequest(request.getIndexUUID());
    }

    @Override
    protected NodeListDanglingIndicesResponse newNodeResponse(StreamInput in) throws IOException {
        return new NodeListDanglingIndicesResponse(in);
    }

    /**
     * Reports the dangling indices known to this node, optionally restricted to
     * a single index UUID when the request carries a filter.
     */
    @Override
    protected NodeListDanglingIndicesResponse nodeOperation(NodeListDanglingIndicesRequest request) {
        final DiscoveryNode localNode = transportService.getLocalNode();
        final String requiredUUID = request.getIndexUUID();
        final List<DanglingIndexInfo> danglingIndices = new ArrayList<>();

        for (IndexMetadata metadata : danglingIndicesState.getDanglingIndices().values()) {
            // Skip entries that do not match the UUID filter, when one is set
            if (requiredUUID != null && requiredUUID.equals(metadata.getIndexUUID()) == false) {
                continue;
            }
            danglingIndices.add(
                new DanglingIndexInfo(localNode.getId(), metadata.getIndex().getName(), metadata.getIndexUUID(), metadata.getCreationDate())
            );
        }

        return new NodeListDanglingIndicesResponse(localNode, danglingIndices);
    }
}

View File

@ -0,0 +1,36 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/**
* Dangling indices are indices that exist on disk on one or more nodes but
* which do not currently exist in the cluster state. They arise in a
* number of situations, such as:
*
* <ul>
* <li>A user overflows the index graveyard by deleting more than 500 indices while a node is offline and then the node rejoins the
* cluster</li>
* <li>A node (unsafely) moves from one cluster to another, perhaps because the original cluster lost all its master nodes</li>
* <li>A user (unsafely) meddles with the contents of the data path, maybe restoring an old index folder from a backup</li>
* <li>A disk partially fails and the user has no replicas and no snapshots and wants to (unsafely) recover whatever they can</li>
* <li>A cluster loses all master nodes and those are (unsafely) restored from backup, but the backup does not contain the index</li>
* </ul>
*
* <p>The classes in this package form an API for managing dangling indices, allowing them to be listed, imported or deleted.
*/
package org.elasticsearch.action.admin.indices.dangling;

View File

@ -91,7 +91,7 @@ public abstract class TransportNodesAction<NodesRequest extends BaseNodesRequest
* @throws NullPointerException if {@code nodesResponses} is {@code null}
* @see #newResponse(BaseNodesRequest, List, List)
*/
protected NodesResponse newResponse(NodesRequest request, AtomicReferenceArray nodesResponses) {
protected NodesResponse newResponse(NodesRequest request, AtomicReferenceArray<?> nodesResponses) {
final List<NodeResponse> responses = new ArrayList<>();
final List<FailedNodeException> failures = new ArrayList<>();

View File

@ -101,6 +101,10 @@ import org.elasticsearch.action.admin.cluster.storedscripts.PutStoredScriptReque
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksRequest;
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksRequestBuilder;
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse;
import org.elasticsearch.action.admin.indices.dangling.delete.DeleteDanglingIndexRequest;
import org.elasticsearch.action.admin.indices.dangling.import_index.ImportDanglingIndexRequest;
import org.elasticsearch.action.admin.indices.dangling.list.ListDanglingIndicesRequest;
import org.elasticsearch.action.admin.indices.dangling.list.ListDanglingIndicesResponse;
import org.elasticsearch.action.ingest.DeletePipelineRequest;
import org.elasticsearch.action.ingest.DeletePipelineRequestBuilder;
import org.elasticsearch.action.ingest.GetPipelineRequest;
@ -718,4 +722,34 @@ public interface ClusterAdminClient extends ElasticsearchClient {
* Get a script from the cluster state
*/
ActionFuture<GetStoredScriptResponse> getStoredScript(GetStoredScriptRequest request);
/**
* List dangling indices on all nodes.
*/
void listDanglingIndices(ListDanglingIndicesRequest request, ActionListener<ListDanglingIndicesResponse> listener);
/**
* List dangling indices on all nodes.
*/
ActionFuture<ListDanglingIndicesResponse> listDanglingIndices(ListDanglingIndicesRequest request);
/**
* Restore specified dangling indices.
*/
void importDanglingIndex(ImportDanglingIndexRequest request, ActionListener<AcknowledgedResponse> listener);
/**
* Restore specified dangling indices.
*/
ActionFuture<AcknowledgedResponse> importDanglingIndex(ImportDanglingIndexRequest request);
/**
* Delete specified dangling indices.
*/
void deleteDanglingIndex(DeleteDanglingIndexRequest request, ActionListener<AcknowledgedResponse> listener);
/**
* Delete specified dangling indices.
*/
ActionFuture<AcknowledgedResponse> deleteDanglingIndex(DeleteDanglingIndexRequest request);
}

View File

@ -21,11 +21,11 @@ package org.elasticsearch.client.support;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainAction;
import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainRequest;
import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainRequestBuilder;
@ -162,6 +162,13 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.dangling.delete.DeleteDanglingIndexAction;
import org.elasticsearch.action.admin.indices.dangling.delete.DeleteDanglingIndexRequest;
import org.elasticsearch.action.admin.indices.dangling.import_index.ImportDanglingIndexAction;
import org.elasticsearch.action.admin.indices.dangling.import_index.ImportDanglingIndexRequest;
import org.elasticsearch.action.admin.indices.dangling.list.ListDanglingIndicesAction;
import org.elasticsearch.action.admin.indices.dangling.list.ListDanglingIndicesRequest;
import org.elasticsearch.action.admin.indices.dangling.list.ListDanglingIndicesResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder;
@ -1168,6 +1175,36 @@ public abstract class AbstractClient implements Client {
execute(GetStoredScriptAction.INSTANCE, request, listener);
}
@Override
public ActionFuture<ListDanglingIndicesResponse> listDanglingIndices(ListDanglingIndicesRequest request) {
return execute(ListDanglingIndicesAction.INSTANCE, request);
}
@Override
public void listDanglingIndices(ListDanglingIndicesRequest request, ActionListener<ListDanglingIndicesResponse> listener) {
execute(ListDanglingIndicesAction.INSTANCE, request, listener);
}
@Override
public ActionFuture<AcknowledgedResponse> importDanglingIndex(ImportDanglingIndexRequest request) {
return execute(ImportDanglingIndexAction.INSTANCE, request);
}
@Override
public void importDanglingIndex(ImportDanglingIndexRequest request, ActionListener<AcknowledgedResponse> listener) {
execute(ImportDanglingIndexAction.INSTANCE, request, listener);
}
@Override
public ActionFuture<AcknowledgedResponse> deleteDanglingIndex(DeleteDanglingIndexRequest request) {
return execute(DeleteDanglingIndexAction.INSTANCE, request);
}
@Override
public void deleteDanglingIndex(DeleteDanglingIndexRequest request, ActionListener<AcknowledgedResponse> listener) {
execute(DeleteDanglingIndexAction.INSTANCE, request, listener);
}
@Override
public GetStoredScriptRequestBuilder prepareGetStoredScript() {
return new GetStoredScriptRequestBuilder(this, GetStoredScriptAction.INSTANCE);

View File

@ -21,8 +21,9 @@ package org.elasticsearch.cluster;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexGraveyard;
import org.elasticsearch.cluster.metadata.IndexGraveyard.IndexGraveyardDiff;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.collect.ImmutableOpenMap;
@ -256,18 +257,43 @@ public class ClusterChangedEvent {
if (metadataChanged() == false || isNewCluster()) {
return Collections.emptyList();
}
List<Index> deleted = null;
for (ObjectCursor<IndexMetadata> cursor : previousState.metadata().indices().values()) {
Set<Index> deleted = null;
final Metadata previousMetadata = previousState.metadata();
final Metadata currentMetadata = state.metadata();
for (ObjectCursor<IndexMetadata> cursor : previousMetadata.indices().values()) {
IndexMetadata index = cursor.value;
IndexMetadata current = state.metadata().index(index.getIndex());
IndexMetadata current = currentMetadata.index(index.getIndex());
if (current == null) {
if (deleted == null) {
deleted = new ArrayList<>();
deleted = new HashSet<>();
}
deleted.add(index.getIndex());
}
}
return deleted == null ? Collections.<Index>emptyList() : deleted;
final IndexGraveyard currentGraveyard = currentMetadata.indexGraveyard();
final IndexGraveyard previousGraveyard = previousMetadata.indexGraveyard();
// Look for new entries in the index graveyard, where there's no corresponding index in the
// previous metadata. This indicates that a dangling index has been explicitly deleted, so
// each node should make sure to delete any related data.
if (currentGraveyard != previousGraveyard) {
final IndexGraveyardDiff indexGraveyardDiff = (IndexGraveyardDiff) currentGraveyard.diff(previousGraveyard);
final List<IndexGraveyard.Tombstone> added = indexGraveyardDiff.getAdded();
if (added.isEmpty() == false) {
if (deleted == null) {
deleted = new HashSet<>();
}
for (IndexGraveyard.Tombstone tombstone : added) {
deleted.add(tombstone.getIndex());
}
}
}
return deleted == null ? Collections.<Index>emptyList() : new ArrayList<>(deleted);
}
private List<Index> indicesDeletedFromTombstones() {

View File

@ -682,6 +682,11 @@ public class Metadata implements Iterable<IndexMetadata>, Diffable<Metadata>, To
return indices.containsKey(index);
}
/**
 * Returns whether an index with the given name <em>and</em> UUID exists in this metadata.
 * Unlike the name-only overload, a name match alone is not sufficient: the stored
 * index's UUID must also equal {@code index.getUUID()}, so a recreated index with the
 * same name but a different UUID does not match.
 *
 * @param index the index (name plus UUID) to look up
 * @return {@code true} if an index with the same name and the same UUID is present
 */
public boolean hasIndex(Index index) {
IndexMetadata metadata = index(index.getName());
return metadata != null && metadata.getIndexUUID().equals(index.getUUID());
}
public boolean hasConcreteIndex(String index) {
return getIndicesLookup().containsKey(index);
}

View File

@ -20,7 +20,6 @@
package org.elasticsearch.common.util;
import com.carrotsearch.hppc.ObjectArrayList;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefArray;
import org.apache.lucene.util.BytesRefBuilder;

View File

@ -37,13 +37,11 @@ import org.elasticsearch.index.Index;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import static java.util.Collections.emptyMap;
import static java.util.Collections.unmodifiableMap;
@ -57,6 +55,13 @@ public class DanglingIndicesState implements ClusterStateListener {
private static final Logger logger = LogManager.getLogger(DanglingIndicesState.class);
/**
* Controls whether dangling indices should be automatically detected and imported into the cluster
* state upon discovery. This setting is deprecated - use the <code>_dangling</code> API instead.
* If disabled, dangling indices will not be automatically detected.
*
* @see org.elasticsearch.action.admin.indices.dangling
*/
public static final Setting<Boolean> AUTO_IMPORT_DANGLING_INDICES_SETTING = Setting.boolSetting(
"gateway.auto_import_dangling_indices",
true,
@ -66,24 +71,29 @@ public class DanglingIndicesState implements ClusterStateListener {
private final NodeEnvironment nodeEnv;
private final MetaStateService metaStateService;
private final LocalAllocateDangledIndices allocateDangledIndices;
private final LocalAllocateDangledIndices danglingIndicesAllocator;
private final boolean isAutoImportDanglingIndicesEnabled;
private final ClusterService clusterService;
private final Map<Index, IndexMetadata> danglingIndices = ConcurrentCollections.newConcurrentMap();
@Inject
public DanglingIndicesState(NodeEnvironment nodeEnv, MetaStateService metaStateService,
LocalAllocateDangledIndices allocateDangledIndices, ClusterService clusterService) {
LocalAllocateDangledIndices danglingIndicesAllocator, ClusterService clusterService) {
this.nodeEnv = nodeEnv;
this.metaStateService = metaStateService;
this.allocateDangledIndices = allocateDangledIndices;
this.danglingIndicesAllocator = danglingIndicesAllocator;
this.clusterService = clusterService;
this.isAutoImportDanglingIndicesEnabled = AUTO_IMPORT_DANGLING_INDICES_SETTING.get(clusterService.getSettings());
if (this.isAutoImportDanglingIndicesEnabled) {
clusterService.addListener(this);
} else {
logger.warn(AUTO_IMPORT_DANGLING_INDICES_SETTING.getKey() + " is disabled, dangling indices will not be detected or imported");
logger.warn(
AUTO_IMPORT_DANGLING_INDICES_SETTING.getKey()
+ " is disabled, dangling indices will not be automatically detected or imported and must be managed manually"
);
}
}
@ -96,20 +106,28 @@ public class DanglingIndicesState implements ClusterStateListener {
* new dangling indices, and allocating outstanding ones.
*/
public void processDanglingIndices(final Metadata metadata) {
assert this.isAutoImportDanglingIndicesEnabled;
if (nodeEnv.hasNodeFile() == false) {
return;
}
cleanupAllocatedDangledIndices(metadata);
findNewAndAddDanglingIndices(metadata);
allocateDanglingIndices();
allocateDanglingIndices(metadata);
}
/**
* The current set of dangling indices.
* Either return the current set of dangling indices, if auto-import is enabled, otherwise
* scan for dangling indices right away.
* @return a map of currently-known dangling indices
*/
Map<Index, IndexMetadata> getDanglingIndices() {
public Map<Index, IndexMetadata> getDanglingIndices() {
if (this.isAutoImportDanglingIndicesEnabled) {
// This might be a good use case for CopyOnWriteHashMap
return unmodifiableMap(new HashMap<>(danglingIndices));
} else {
return findNewDanglingIndices(emptyMap(), this.clusterService.state().metadata());
}
}
/**
@ -135,7 +153,13 @@ public class DanglingIndicesState implements ClusterStateListener {
* to the currently tracked dangling indices.
*/
void findNewAndAddDanglingIndices(final Metadata metadata) {
danglingIndices.putAll(findNewDanglingIndices(metadata));
final IndexGraveyard graveyard = metadata.indexGraveyard();
// If a tombstone is created for a dangling index, we need to make sure that the
// index is no longer considered dangling.
danglingIndices.keySet().removeIf(graveyard::containsIndex);
danglingIndices.putAll(findNewDanglingIndices(danglingIndices, metadata));
}
/**
@ -143,30 +167,26 @@ public class DanglingIndicesState implements ClusterStateListener {
* that have state on disk, but are not part of the provided metadata, or not detected
* as dangled already.
*/
Map<Index, IndexMetadata> findNewDanglingIndices(final Metadata metadata) {
public Map<Index, IndexMetadata> findNewDanglingIndices(Map<Index, IndexMetadata> existingDanglingIndices, final Metadata metadata) {
final Set<String> excludeIndexPathIds = new HashSet<>(metadata.indices().size() + danglingIndices.size());
for (ObjectCursor<IndexMetadata> cursor : metadata.indices().values()) {
excludeIndexPathIds.add(cursor.value.getIndex().getUUID());
}
excludeIndexPathIds.addAll(danglingIndices.keySet().stream().map(Index::getUUID).collect(Collectors.toList()));
for (Index index : existingDanglingIndices.keySet()) {
excludeIndexPathIds.add(index.getUUID());
}
try {
final List<IndexMetadata> indexMetadataList = metaStateService.loadIndicesStates(excludeIndexPathIds::contains);
Map<Index, IndexMetadata> newIndices = new HashMap<>(indexMetadataList.size());
final IndexGraveyard graveyard = metadata.indexGraveyard();
for (IndexMetadata indexMetadata : indexMetadataList) {
if (metadata.hasIndex(indexMetadata.getIndex().getName())) {
logger.warn("[{}] can not be imported as a dangling index, as index with same name already exists in cluster metadata",
indexMetadata.getIndex());
} else if (graveyard.containsIndex(indexMetadata.getIndex())) {
logger.warn("[{}] can not be imported as a dangling index, as an index with the same name and UUID exist in the " +
"index tombstones. This situation is likely caused by copying over the data directory for an index " +
"that was previously deleted.", indexMetadata.getIndex());
} else {
logger.info("[{}] dangling index exists on local file system, but not in cluster metadata, " +
"auto import to cluster state", indexMetadata.getIndex());
newIndices.put(indexMetadata.getIndex(), stripAliases(indexMetadata));
Index index = indexMetadata.getIndex();
if (graveyard.containsIndex(index) == false) {
newIndices.put(index, stripAliases(indexMetadata));
}
}
return newIndices;
} catch (IOException e) {
logger.warn("failed to list dangling indices", e);
@ -175,6 +195,33 @@ public class DanglingIndicesState implements ClusterStateListener {
}
/**
* Filters out dangling indices that cannot be automatically imported into the cluster state.
* @param metadata the current cluster metadata
* @param allIndices all currently known dangling indices
* @return a filtered list of dangling index metadata
*/
List<IndexMetadata> filterDanglingIndices(Metadata metadata, Map<Index, IndexMetadata> allIndices) {
List<IndexMetadata> filteredIndices = new ArrayList<>(allIndices.size());
allIndices.forEach((index, indexMetadata) -> {
if (metadata.hasIndex(indexMetadata.getIndex().getName())) {
logger.warn("[{}] can not be imported as a dangling index, as index with same name already exists in cluster metadata",
indexMetadata.getIndex());
} else {
logger.info(
"[{}] dangling index exists on local file system, but not in cluster metadata, auto import to cluster state",
indexMetadata.getIndex()
);
filteredIndices.add(stripAliases(indexMetadata));
}
});
return filteredIndices;
}
/**
* Removes all aliases from the supplied index metadata.
*
* Dangling importing indices with aliases is dangerous, it could for instance result in inability to write to an existing alias if it
* previously had only one index with any is_write_index indication.
*/
@ -189,15 +236,25 @@ public class DanglingIndicesState implements ClusterStateListener {
}
/**
* Allocates the provided list of the dangled indices by sending them to the master node
* for allocation.
* Allocates the detected list of dangling indices by sending them to the master node
* for allocation, provided auto-import is enabled via the
* {@link #AUTO_IMPORT_DANGLING_INDICES_SETTING} setting.
* @param metadata the current cluster metadata, used to filter out dangling indices that cannot be allocated
* for some reason.
*/
void allocateDanglingIndices() {
void allocateDanglingIndices(Metadata metadata) {
if (danglingIndices.isEmpty()) {
return;
}
final List<IndexMetadata> filteredIndices = filterDanglingIndices(metadata, danglingIndices);
if (filteredIndices.isEmpty()) {
return;
}
try {
allocateDangledIndices.allocateDangled(Collections.unmodifiableCollection(new ArrayList<>(danglingIndices.values())),
danglingIndicesAllocator.allocateDangled(filteredIndices,
new ActionListener<LocalAllocateDangledIndices.AllocateDangledResponse>() {
@Override
public void onResponse(LocalAllocateDangledIndices.AllocateDangledResponse response) {

View File

@ -300,7 +300,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
if (indexService != null) {
indexSettings = indexService.getIndexSettings();
indicesService.removeIndex(index, DELETED, "index no longer part of the metadata");
} else if (previousState.metadata().hasIndex(index.getName())) {
} else if (previousState.metadata().hasIndex(index)) {
// The deleted index was part of the previous cluster state, but not loaded on the local node
final IndexMetadata metadata = previousState.metadata().index(index);
indexSettings = new IndexSettings(metadata, settings);
@ -311,8 +311,10 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
// node was not part of the cluster. In this case, try reading the index
// metadata from disk. If its not there, there is nothing to delete.
// First, though, verify the precondition for applying this case by
// asserting that the previous cluster state is not initialized/recovered.
assert previousState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK);
// asserting that either this index is already in the graveyard, or the
// previous cluster state is not initialized/recovered.
assert state.metadata().indexGraveyard().containsIndex(index)
|| previousState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK);
final IndexMetadata metadata = indicesService.verifyIndexIsDeleted(index, event.state());
if (metadata != null) {
indexSettings = new IndexSettings(metadata, settings);

View File

@ -0,0 +1,68 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.rest.action.admin.cluster.dangling;
import org.elasticsearch.action.admin.indices.dangling.delete.DeleteDanglingIndexRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.RestToXContentListener;
import java.io.IOException;
import java.util.List;
import static java.util.Collections.singletonList;
import static org.elasticsearch.rest.RestRequest.Method.DELETE;
import static org.elasticsearch.rest.RestStatus.ACCEPTED;
/**
 * REST handler for the dangling index delete API: maps
 * {@code DELETE /_dangling/{index_uuid}} onto a {@link DeleteDanglingIndexRequest}
 * and dispatches it through the cluster admin client.
 */
public class RestDeleteDanglingIndexAction extends BaseRestHandler {

    @Override
    public String getName() {
        return "delete_dangling_index";
    }

    @Override
    public List<Route> routes() {
        // A dangling index is addressed by UUID: by definition it has no entry in the
        // cluster state against which a name could be resolved.
        return singletonList(new Route(DELETE, "/_dangling/{index_uuid}"));
    }

    @Override
    public RestChannelConsumer prepareRequest(final RestRequest request, NodeClient client) throws IOException {
        final String indexUUID = request.param("index_uuid");
        // The accept_data_loss flag defaults to false, so the caller must opt in explicitly.
        final boolean acceptDataLoss = request.paramAsBoolean("accept_data_loss", false);

        final DeleteDanglingIndexRequest danglingDeleteRequest = new DeleteDanglingIndexRequest(indexUUID, acceptDataLoss);
        danglingDeleteRequest.timeout(request.paramAsTime("timeout", danglingDeleteRequest.timeout()));
        danglingDeleteRequest.masterNodeTimeout(request.paramAsTime("master_timeout", danglingDeleteRequest.masterNodeTimeout()));

        return channel -> {
            final RestToXContentListener<AcknowledgedResponse> listener = new RestToXContentListener<AcknowledgedResponse>(channel) {
                @Override
                protected RestStatus getStatus(AcknowledgedResponse acknowledgedResponse) {
                    // Respond with 202 Accepted on acknowledgement instead of the listener's default status.
                    return ACCEPTED;
                }
            };
            client.admin().cluster().deleteDanglingIndex(danglingDeleteRequest, listener);
        };
    }
}

View File

@ -0,0 +1,67 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.rest.action.admin.cluster.dangling;
import static java.util.Collections.singletonList;
import static org.elasticsearch.rest.RestRequest.Method.POST;
import static org.elasticsearch.rest.RestStatus.ACCEPTED;
import java.io.IOException;
import java.util.List;
import org.elasticsearch.action.admin.indices.dangling.import_index.ImportDanglingIndexRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.RestToXContentListener;
/**
 * REST handler for the dangling index import API: maps
 * {@code POST /_dangling/{index_uuid}} onto an {@link ImportDanglingIndexRequest}
 * and dispatches it through the cluster admin client.
 */
public class RestImportDanglingIndexAction extends BaseRestHandler {

    @Override
    public String getName() {
        return "import_dangling_index";
    }

    @Override
    public List<Route> routes() {
        // A dangling index is addressed by UUID: by definition it has no entry in the
        // cluster state against which a name could be resolved.
        return singletonList(new Route(POST, "/_dangling/{index_uuid}"));
    }

    @Override
    public RestChannelConsumer prepareRequest(final RestRequest request, NodeClient client) throws IOException {
        final String indexUUID = request.param("index_uuid");
        // The accept_data_loss flag defaults to false, so the caller must opt in explicitly.
        final boolean acceptDataLoss = request.paramAsBoolean("accept_data_loss", false);

        final ImportDanglingIndexRequest danglingImportRequest = new ImportDanglingIndexRequest(indexUUID, acceptDataLoss);
        danglingImportRequest.timeout(request.paramAsTime("timeout", danglingImportRequest.timeout()));
        danglingImportRequest.masterNodeTimeout(request.paramAsTime("master_timeout", danglingImportRequest.masterNodeTimeout()));

        return channel -> {
            final RestToXContentListener<AcknowledgedResponse> listener = new RestToXContentListener<AcknowledgedResponse>(channel) {
                @Override
                protected RestStatus getStatus(AcknowledgedResponse acknowledgedResponse) {
                    // Respond with 202 Accepted on acknowledgement instead of the listener's default status.
                    return ACCEPTED;
                }
            };
            client.admin().cluster().importDanglingIndex(danglingImportRequest, listener);
        };
    }
}

View File

@ -0,0 +1,52 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.rest.action.admin.cluster.dangling;
import org.elasticsearch.action.admin.indices.dangling.list.ListDanglingIndicesRequest;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestActions;
import java.io.IOException;
import java.util.List;
import static java.util.Collections.singletonList;
import static org.elasticsearch.rest.RestRequest.Method.GET;
/**
 * REST handler for the dangling indices listing API: maps {@code GET /_dangling}
 * onto a {@link ListDanglingIndicesRequest} and dispatches it through the
 * cluster admin client.
 */
public class RestListDanglingIndicesAction extends BaseRestHandler {

    @Override
    public String getName() {
        return "list_dangling_indices";
    }

    @Override
    public List<Route> routes() {
        return singletonList(new Route(GET, "/_dangling"));
    }

    @Override
    public BaseRestHandler.RestChannelConsumer prepareRequest(final RestRequest request, NodeClient client) throws IOException {
        // The listing API accepts no parameters, so an empty request is constructed directly.
        final ListDanglingIndicesRequest listRequest = new ListDanglingIndicesRequest();
        return channel -> client.admin()
            .cluster()
            .listDanglingIndices(listRequest, new RestActions.NodesResponseRestListener<>(channel));
    }
}

View File

@ -0,0 +1,25 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/**
* {@link org.elasticsearch.rest.RestHandler}s for managing dangling indices.
*
* @see org.elasticsearch.action.admin.indices.dangling
*/
package org.elasticsearch.rest.action.admin.cluster.dangling;

View File

@ -0,0 +1,157 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.dangling.list;
import org.elasticsearch.action.admin.indices.dangling.DanglingIndexInfo;
import org.elasticsearch.action.admin.indices.dangling.list.ListDanglingIndicesResponse.AggregatedDanglingIndexInfo;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.test.ESTestCase;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static org.elasticsearch.action.admin.indices.dangling.list.ListDanglingIndicesResponse.resultsByIndexUUID;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
 * Unit tests for {@code ListDanglingIndicesResponse#resultsByIndexUUID(List)}, which
 * aggregates per-node dangling index listings into one result per index UUID,
 * accumulating the IDs of every node that reported that index.
 */
public class ListDanglingIndicesResponseTests extends ESTestCase {
    // Two distinct, randomly-generated index UUIDs shared by the tests below.
    public static final String UUID_1 = UUID.randomUUID().toString();
    public static final String UUID_2 = UUID.randomUUID().toString();

    /**
     * Checks that {@link ListDanglingIndicesResponse#resultsByIndexUUID(List)} handles the
     * basic case of empty input.
     */
    public void testResultsByIndexUUIDWithEmptyListReturnsEmptyMap() {
        assertThat(resultsByIndexUUID(emptyList()), empty());
    }

    /**
     * Checks that <code>resultsByIndexUUID(List)</code> can aggregate a single dangling index
     * on a single node.
     */
    public void testResultsByIndexUUIDCanAggregateASingleResponse() {
        final DiscoveryNode node = mock(DiscoveryNode.class);
        when(node.getId()).thenReturn("some-node-id");

        final List<DanglingIndexInfo> danglingIndexInfo = singletonList(
            new DanglingIndexInfo("some-node-id", "some-index", UUID_1, 123456L)
        );
        final List<NodeListDanglingIndicesResponse> nodes = singletonList(new NodeListDanglingIndicesResponse(node, danglingIndexInfo));

        final List<AggregatedDanglingIndexInfo> aggregated = new ArrayList<>(resultsByIndexUUID(nodes));
        assertThat(aggregated, hasSize(1));

        // Build the expected aggregation by hand and compare structurally.
        final AggregatedDanglingIndexInfo expected = new AggregatedDanglingIndexInfo(UUID_1, "some-index", 123456L);
        expected.getNodeIds().add("some-node-id");
        assertThat(aggregated.get(0), equalTo(expected));
    }

    /**
     * Checks that <code>resultsByIndexUUID(List)</code> can aggregate a single dangling index
     * across multiple nodes.
     */
    public void testResultsByIndexUUIDCanAggregateAcrossMultipleNodes() {
        final DiscoveryNode node1 = mock(DiscoveryNode.class);
        final DiscoveryNode node2 = mock(DiscoveryNode.class);
        when(node1.getId()).thenReturn("node-id-1");
        when(node2.getId()).thenReturn("node-id-2");

        // The same index (same UUID and creation date) is reported by both nodes...
        final List<DanglingIndexInfo> danglingIndexInfo1 = singletonList(new DanglingIndexInfo("node-id-1", "some-index", UUID_1, 123456L));
        final List<DanglingIndexInfo> danglingIndexInfo2 = singletonList(new DanglingIndexInfo("node-id-2", "some-index", UUID_1, 123456L));
        final List<NodeListDanglingIndicesResponse> nodes = asList(
            new NodeListDanglingIndicesResponse(node1, danglingIndexInfo1),
            new NodeListDanglingIndicesResponse(node2, danglingIndexInfo2)
        );

        final List<AggregatedDanglingIndexInfo> aggregated = new ArrayList<>(resultsByIndexUUID(nodes));

        // ...so it must collapse to a single entry that lists both node IDs.
        assertThat(aggregated, hasSize(1));
        final AggregatedDanglingIndexInfo expected = new AggregatedDanglingIndexInfo(UUID_1, "some-index", 123456L);
        expected.getNodeIds().add("node-id-1");
        expected.getNodeIds().add("node-id-2");
        assertThat(aggregated.get(0), equalTo(expected));
    }

    /**
     * Checks that <code>resultsByIndexUUID(List)</code> can aggregate multiple dangling indices
     * on a single node.
     */
    public void testResultsByIndexUUIDCanAggregateMultipleIndicesOnOneNode() {
        final DiscoveryNode node1 = mock(DiscoveryNode.class);
        when(node1.getId()).thenReturn("node-id-1");

        // One node reports two different dangling indices (distinct UUIDs).
        final List<DanglingIndexInfo> danglingIndexInfo = asList(
            new DanglingIndexInfo("node-id-1", "some-index", UUID_1, 123456L),
            new DanglingIndexInfo("node-id-1", "some-other-index", UUID_2, 7891011L)
        );
        final List<NodeListDanglingIndicesResponse> nodes = singletonList(new NodeListDanglingIndicesResponse(node1, danglingIndexInfo));

        final List<AggregatedDanglingIndexInfo> aggregated = new ArrayList<>(resultsByIndexUUID(nodes));
        assertThat(aggregated, hasSize(2));

        AggregatedDanglingIndexInfo info1 = new AggregatedDanglingIndexInfo(UUID_1, "some-index", 123456L);
        AggregatedDanglingIndexInfo info2 = new AggregatedDanglingIndexInfo(UUID_2, "some-other-index", 7891011L);
        info1.getNodeIds().add("node-id-1");
        info2.getNodeIds().add("node-id-1");
        // Iteration order of the aggregation is not part of the contract under test.
        assertThat(aggregated, containsInAnyOrder(info1, info2));
    }

    /**
     * Checks that <code>resultsByIndexUUID(List)</code> can aggregate multiple dangling indices
     * across multiple nodes.
     */
    public void testResultsByIndexUUIDCanAggregateMultipleIndicesAcrossMultipleNodes() {
        final DiscoveryNode node1 = mock(DiscoveryNode.class);
        final DiscoveryNode node2 = mock(DiscoveryNode.class);
        when(node1.getId()).thenReturn("node-id-1");
        when(node2.getId()).thenReturn("node-id-2");

        // Each node reports a different dangling index.
        final List<DanglingIndexInfo> danglingIndexInfo1 = singletonList(new DanglingIndexInfo("node-id-1", "some-index", UUID_1, 123456L));
        final List<DanglingIndexInfo> danglingIndexInfo2 = singletonList(
            new DanglingIndexInfo("node-id-2", "some-other-index", UUID_2, 7891011L)
        );
        final List<NodeListDanglingIndicesResponse> nodes = asList(
            new NodeListDanglingIndicesResponse(node1, danglingIndexInfo1),
            new NodeListDanglingIndicesResponse(node2, danglingIndexInfo2)
        );

        final List<AggregatedDanglingIndexInfo> aggregated = new ArrayList<>(resultsByIndexUUID(nodes));
        assertThat(aggregated, hasSize(2));

        AggregatedDanglingIndexInfo info1 = new AggregatedDanglingIndexInfo(UUID_1, "some-index", 123456L);
        AggregatedDanglingIndexInfo info2 = new AggregatedDanglingIndexInfo(UUID_2, "some-other-index", 7891011L);
        info1.getNodeIds().add("node-id-1");
        info2.getNodeIds().add("node-id-2");
        assertThat(aggregated, containsInAnyOrder(info1, info2));
    }
}

View File

@ -24,6 +24,7 @@ import org.elasticsearch.cluster.metadata.IndexGraveyard;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index;
@ -33,12 +34,20 @@ import org.hamcrest.Matchers;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;
import java.util.Map;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;
import static org.elasticsearch.gateway.DanglingIndicesState.AUTO_IMPORT_DANGLING_INDICES_SETTING;
import static org.hamcrest.Matchers.aMapWithSize;
import static org.hamcrest.Matchers.anEmptyMap;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ -78,14 +87,36 @@ public class DanglingIndicesStateTests extends ESTestCase {
final Settings.Builder settings = Settings.builder().put(indexSettings).put(IndexMetadata.SETTING_INDEX_UUID, "test1UUID");
IndexMetadata dangledIndex = IndexMetadata.builder("test1").settings(settings).build();
metaStateService.writeIndex("test_write", dangledIndex);
Map<Index, IndexMetadata> newDanglingIndices = danglingState.findNewDanglingIndices(metadata);
Map<Index, IndexMetadata> newDanglingIndices = danglingState.findNewDanglingIndices(emptyMap(), metadata);
assertTrue(newDanglingIndices.containsKey(dangledIndex.getIndex()));
metadata = Metadata.builder().put(dangledIndex, false).build();
newDanglingIndices = danglingState.findNewDanglingIndices(metadata);
newDanglingIndices = danglingState.findNewDanglingIndices(emptyMap(), metadata);
assertFalse(newDanglingIndices.containsKey(dangledIndex.getIndex()));
}
}
/**
 * Check that a dangling index is not reported as newly discovered when we
 * already know about it.
 */
public void testDanglingIndicesNotDiscoveredWhenAlreadyKnown() throws Exception {
    try (NodeEnvironment env = newNodeEnvironment()) {
        final MetaStateService metaStateService = new MetaStateService(env, xContentRegistry());
        final DanglingIndicesState danglingState = createDanglingIndicesState(env, metaStateService);

        // Write a dangling index to disk...
        final Settings.Builder settings = Settings.builder().put(indexSettings).put(IndexMetadata.SETTING_INDEX_UUID, "test1UUID");
        final IndexMetadata dangledIndex = IndexMetadata.builder("test1").settings(settings).build();
        metaStateService.writeIndex("test_write", dangledIndex);

        // ...then look for new dangling indices while passing it in as already known
        final Metadata metadata = Metadata.builder().build();
        final Map<Index, IndexMetadata> newDanglingIndices = danglingState.findNewDanglingIndices(
            singletonMap(dangledIndex.getIndex(), dangledIndex),
            metadata
        );

        assertThat(newDanglingIndices, is(anEmptyMap()));
    }
}
public void testInvalidIndexFolder() throws Exception {
try (NodeEnvironment env = newNodeEnvironment()) {
MetaStateService metaStateService = new MetaStateService(env, xContentRegistry());
@ -102,7 +133,7 @@ public class DanglingIndicesStateTests extends ESTestCase {
}
}
try {
danglingState.findNewDanglingIndices(metadata);
danglingState.findNewDanglingIndices(emptyMap(), metadata);
fail("no exception thrown for invalid folder name");
} catch (IllegalStateException e) {
assertThat(e.getMessage(), equalTo("[invalidUUID] invalid index folder name, rename to [test1UUID]"));
@ -124,7 +155,7 @@ public class DanglingIndicesStateTests extends ESTestCase {
// check that several runs when not in the metadata still keep the dangled index around
int numberOfChecks = randomIntBetween(1, 10);
for (int i = 0; i < numberOfChecks; i++) {
Map<Index, IndexMetadata> newDanglingIndices = danglingState.findNewDanglingIndices(metadata);
Map<Index, IndexMetadata> newDanglingIndices = danglingState.findNewDanglingIndices(emptyMap(), metadata);
assertThat(newDanglingIndices.size(), equalTo(1));
assertThat(newDanglingIndices.keySet(), Matchers.hasItems(dangledIndex.getIndex()));
assertTrue(danglingState.getDanglingIndices().isEmpty());
@ -142,7 +173,7 @@ public class DanglingIndicesStateTests extends ESTestCase {
// check that several runs when in the metadata, but not cleaned yet, still keeps dangled
for (int i = 0; i < numberOfChecks; i++) {
Map<Index, IndexMetadata> newDanglingIndices = danglingState.findNewDanglingIndices(metadata);
Map<Index, IndexMetadata> newDanglingIndices = danglingState.findNewDanglingIndices(emptyMap(), metadata);
assertTrue(newDanglingIndices.isEmpty());
assertThat(danglingState.getDanglingIndices().size(), equalTo(1));
@ -165,7 +196,42 @@ public class DanglingIndicesStateTests extends ESTestCase {
final IndexGraveyard graveyard = IndexGraveyard.builder().addTombstone(dangledIndex.getIndex()).build();
final Metadata metadata = Metadata.builder().indexGraveyard(graveyard).build();
assertThat(danglingState.findNewDanglingIndices(metadata).size(), equalTo(0));
final Map<Index, IndexMetadata> newDanglingIndices = danglingState.findNewDanglingIndices(emptyMap(), metadata);
assertThat(newDanglingIndices, is(emptyMap()));
}
}
/**
 * Check that a dangling index whose name clashes with an index that already exists
 * in the cluster metadata is still discovered, but is removed by the filter step
 * so that it cannot be imported.
 */
public void testDanglingIndicesNotImportedWhenIndexNameIsAlreadyUsed() throws Exception {
    try (NodeEnvironment env = newNodeEnvironment()) {
        final MetaStateService metaStateService = new MetaStateService(env, xContentRegistry());
        final DanglingIndicesState danglingState = createDanglingIndicesState(env, metaStateService);

        // Write a dangling index to disk
        final IndexMetadata dangledIndex = IndexMetadata.builder("test_index")
            .settings(Settings.builder().put(indexSettings).put(IndexMetadata.SETTING_INDEX_UUID, "test1UUID"))
            .build();
        metaStateService.writeIndex("test_write", dangledIndex);

        // Build another index with the same name but a different UUID
        final IndexMetadata existingIndex = IndexMetadata.builder("test_index")
            .settings(Settings.builder().put(indexSettings).put(IndexMetadata.SETTING_INDEX_UUID, "test2UUID"))
            .build();
        metaStateService.writeIndex("test_write", existingIndex);

        // Register the second index in the cluster metadata under the shared name
        final ImmutableOpenMap<String, IndexMetadata> indicesByName = ImmutableOpenMap.<String, IndexMetadata>builder()
            .fPut(dangledIndex.getIndex().getName(), existingIndex)
            .build();
        final Metadata metadata = Metadata.builder().indices(indicesByName).build();

        // All dangling indices should be found...
        final Map<Index, IndexMetadata> newDanglingIndices = danglingState.findNewDanglingIndices(emptyMap(), metadata);
        assertThat(newDanglingIndices, is(aMapWithSize(1)));

        // ...but the filter method should remove those where another index exists with the same name
        final List<IndexMetadata> filteredIndices = danglingState.filterDanglingIndices(metadata, newDanglingIndices);
        assertThat(filteredIndices, is(empty()));
    }
}
@ -183,7 +249,7 @@ public class DanglingIndicesStateTests extends ESTestCase {
assertThat(dangledIndex.getAliases().size(), equalTo(1));
final Metadata metadata = Metadata.builder().build();
Map<Index, IndexMetadata> newDanglingIndices = danglingState.findNewDanglingIndices(metadata);
Map<Index, IndexMetadata> newDanglingIndices = danglingState.findNewDanglingIndices(emptyMap(), metadata);
assertThat(newDanglingIndices.size(), equalTo(1));
Map.Entry<Index, IndexMetadata> entry = newDanglingIndices.entrySet().iterator().next();
assertThat(entry.getKey().getName(), equalTo("test1"));
@ -191,7 +257,10 @@ public class DanglingIndicesStateTests extends ESTestCase {
}
}
public void testDanglingIndicesAreNotAllocatedWhenDisabled() throws Exception {
/**
* Check that when auto-imports are disabled, then no change listener is registered with the cluster state.
*/
public void testClusterStateListenerNotRegisterWhenSettingDisabled() throws Exception {
try (NodeEnvironment env = newNodeEnvironment()) {
MetaStateService metaStateService = new MetaStateService(env, xContentRegistry());
LocalAllocateDangledIndices localAllocateDangledIndices = mock(LocalAllocateDangledIndices.class);
@ -201,17 +270,20 @@ public class DanglingIndicesStateTests extends ESTestCase {
final ClusterService clusterServiceMock = mock(ClusterService.class);
when(clusterServiceMock.getSettings()).thenReturn(allocateSettings);
final DanglingIndicesState danglingIndicesState = new DanglingIndicesState(
new DanglingIndicesState(
env,
metaStateService,
localAllocateDangledIndices,
clusterServiceMock
);
assertFalse("Expected dangling imports to be disabled", danglingIndicesState.isAutoImportDanglingIndicesEnabled());
verify(clusterServiceMock, never()).addListener(any());
}
}
/**
* Check that when auto-imports are enabled, then dangling indices are automatically imported.
*/
public void testDanglingIndicesAreAllocatedWhenEnabled() throws Exception {
try (NodeEnvironment env = newNodeEnvironment()) {
MetaStateService metaStateService = new MetaStateService(env, xContentRegistry());
@ -233,9 +305,10 @@ public class DanglingIndicesStateTests extends ESTestCase {
IndexMetadata dangledIndex = IndexMetadata.builder("test1").settings(settings).build();
metaStateService.writeIndex("test_write", dangledIndex);
danglingIndicesState.findNewAndAddDanglingIndices(Metadata.builder().build());
final Metadata metadata = Metadata.builder().build();
danglingIndicesState.findNewAndAddDanglingIndices(metadata);
danglingIndicesState.allocateDanglingIndices();
danglingIndicesState.allocateDanglingIndices(metadata);
verify(localAllocateDangledIndices).allocateDangled(any(), any());
}

View File

@ -29,9 +29,11 @@ import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.rest.yaml.ObjectPath;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@ -302,4 +304,34 @@ public final class XContentTestUtils {
}
return object.toXContentBuilder(xContent);
}
/**
 * Parses the JSON document from the given stream and wraps the resulting map
 * in a {@link JsonMapView} for convenient dotted-path access to nested values.
 */
public static JsonMapView createJsonMapView(InputStream inputStream) {
    return new JsonMapView(XContentHelper.convertToMap(JsonXContent.jsonXContent, inputStream, true));
}
/**
 * A convenience wrapper around a parsed JSON document that allows nested values to be
 * retrieved via a dot-separated path. Map entries are traversed by key and list entries
 * by integer index, e.g. {@code "nodes.0.name"}.
 */
public static class JsonMapView {
    private final Map<String, Object> map;

    /**
     * @param map the parsed JSON document to wrap, must not be null
     */
    public JsonMapView(Map<String, Object> map) {
        // Fail fast instead of failing later with a confusing traversal error
        if (map == null) {
            throw new NullPointerException("map must not be null");
        }
        this.map = map;
    }

    /**
     * Retrieves the value at the supplied dot-separated path, cast to the expected type.
     *
     * @param path dot-separated path, where each element is a map key or a list index
     * @param <T> the expected type of the value at the path
     * @return the value at the path, or null if the final path element is absent
     * @throws IllegalStateException if an intermediate path element cannot be traversed,
     *     either because the value there is a scalar or because an earlier key was absent
     */
    @SuppressWarnings("unchecked")
    public <T> T get(String path) {
        final String[] keys = path.split("\\.");
        Object context = map;
        for (String key : keys) {
            if (context instanceof Map) {
                context = ((Map<String, Object>) context).get(key);
            } else if (context instanceof List) {
                context = ((List<Object>) context).get(Integer.parseInt(key));
            } else {
                // context is either null (a previous key was missing) or a scalar value.
                // Include the failing key and full path so the error is actionable.
                throw new IllegalStateException(
                    "neither list nor map at key [" + key + "] of path [" + path + "]"
                );
            }
        }
        return (T) context;
    }
}
}