Improve Close Index Response (#39687)

This changes the `CloseIndexResponse` so that it reports closing result
for each index. Shard failures or exception are also reported per index,
and the global acknowledgment flag is computed from the index results
only.

The response looks like:
```
{
  "acknowledged" : true,
  "shards_acknowledged" : true,
  "indices" : {
    "docs" : {
      "closed" : true
    }
  }
}
```

The response reports shard failures like:
```
{
  "acknowledged" : false,
  "shards_acknowledged" : false,
  "indices" : {
    "docs-1" : {
      "closed" : true
    },
    "docs-2" : {
      "closed" : false,
      "shards" : {
        "1" : {
          "failures" : [
            {
              "shard" : 1,
              "index" : "docs-2",
              "status" : "BAD_REQUEST",
              "reason" : {
                "type" : "index_closed_exception",
                "reason" : "closed",
                "index_uuid" : "JFmQwr_aSPiZbkAH_KEF7A",
                "index" : "docs-2"
              }
            }
          ]
        }
      }
    },
    "docs-3" : {
      "closed" : true
    }
  }
}
```

Co-authored-by: Tanguy Leroux <tlrx.dev@gmail.com>
This commit is contained in:
Tanguy Leroux 2019-05-23 20:18:34 +02:00 committed by Nhat Nguyen
parent 63eccb16de
commit 6bec876682
10 changed files with 575 additions and 72 deletions

View File

@ -82,3 +82,40 @@
- is_true: acknowledged
- match: { acknowledged: true }
- match: { shards_acknowledged: true }
---
"Close index response with result per index":
- skip:
version: " - 7.99.99"
reason: "close index response reports result per index starting version 8.0.0"
- do:
indices.create:
index: index_1
body:
settings:
number_of_replicas: 0
- do:
indices.create:
index: index_2
body:
settings:
number_of_replicas: 0
- do:
indices.create:
index: index_3
body:
settings:
number_of_replicas: 0
- do:
indices.close:
index: "index_*"
- match: { acknowledged: true }
- match: { shards_acknowledged: true }
- match: { indices.index_1.closed: true }
- match: { indices.index_2.closed: true }
- match: { indices.index_3.closed: true }

View File

@ -18,20 +18,40 @@
*/
package org.elasticsearch.action.admin.indices.close;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.master.ShardsAcknowledgedResponse;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.Index;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import static java.util.Collections.emptyList;
import static java.util.Collections.unmodifiableList;
public class CloseIndexResponse extends ShardsAcknowledgedResponse {
private List<IndexResult> indices;
CloseIndexResponse() {
}
public CloseIndexResponse(final boolean acknowledged, final boolean shardsAcknowledged) {
public CloseIndexResponse(final boolean acknowledged, final boolean shardsAcknowledged, final List<IndexResult> indices) {
super(acknowledged, shardsAcknowledged);
this.indices = unmodifiableList(Objects.requireNonNull(indices));
}
public List<IndexResult> getIndices() {
return indices;
}
@Override
@ -40,6 +60,11 @@ public class CloseIndexResponse extends ShardsAcknowledgedResponse {
if (in.getVersion().onOrAfter(Version.V_7_2_0)) {
readShardsAcknowledged(in);
}
if (in.getVersion().onOrAfter(Version.V_7_3_0)) {
indices = unmodifiableList(in.readList(IndexResult::new));
} else {
indices = unmodifiableList(emptyList());
}
}
@Override
@ -48,5 +73,225 @@ public class CloseIndexResponse extends ShardsAcknowledgedResponse {
if (out.getVersion().onOrAfter(Version.V_7_2_0)) {
writeShardsAcknowledged(out);
}
if (out.getVersion().onOrAfter(Version.V_7_3_0)) {
out.writeList(indices);
}
}
protected void addCustomFields(final XContentBuilder builder, final Params params) throws IOException {
super.addCustomFields(builder, params);
builder.startObject("indices");
for (IndexResult index : indices) {
index.toXContent(builder, params);
}
builder.endObject();
}
@Override
public String toString() {
return Strings.toString(this);
}
public static class IndexResult implements Writeable, ToXContentFragment {
private final Index index;
private final @Nullable Exception exception;
private final @Nullable ShardResult[] shards;
public IndexResult(final Index index) {
this(index, null, null);
}
public IndexResult(final Index index, final Exception failure) {
this(index, Objects.requireNonNull(failure), null);
}
public IndexResult(final Index index, final ShardResult[] shards) {
this(index, null, Objects.requireNonNull(shards));
}
private IndexResult(final Index index, @Nullable final Exception exception, @Nullable final ShardResult[] shards) {
this.index = Objects.requireNonNull(index);
this.exception = exception;
this.shards = shards;
}
IndexResult(final StreamInput in) throws IOException {
this.index = new Index(in);
this.exception = in.readException();
this.shards = in.readOptionalArray(ShardResult::new, ShardResult[]::new);
}
@Override
public void writeTo(final StreamOutput out) throws IOException {
index.writeTo(out);
out.writeException(exception);
out.writeOptionalArray(shards);
}
public Index getIndex() {
return index;
}
public Exception getException() {
return exception;
}
public ShardResult[] getShards() {
return shards;
}
public boolean hasFailures() {
if (exception != null) {
return true;
}
if (shards != null) {
for (ShardResult shard : shards) {
if (shard.hasFailures()) {
return true;
}
}
}
return false;
}
@Override
public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
builder.startObject(index.getName());
{
if (hasFailures()) {
builder.field("closed", false);
if (exception != null) {
builder.startObject("exception");
ElasticsearchException.generateFailureXContent(builder, params, exception, true);
builder.endObject();
} else {
builder.startObject("failedShards");
for (ShardResult shard : shards) {
if (shard.hasFailures()) {
shard.toXContent(builder, params);
}
}
builder.endObject();
}
} else {
builder.field("closed", true);
}
}
return builder.endObject();
}
@Override
public String toString() {
return Strings.toString(this);
}
}
public static class ShardResult implements Writeable, ToXContentFragment {
private final int id;
private final ShardResult.Failure[] failures;
public ShardResult(final int id, final Failure[] failures) {
this.id = id;
this.failures = failures;
}
ShardResult(final StreamInput in) throws IOException {
this.id = in.readVInt();
this.failures = in.readOptionalArray(Failure::readFailure, ShardResult.Failure[]::new);
}
@Override
public void writeTo(final StreamOutput out) throws IOException {
out.writeVInt(id);
out.writeOptionalArray(failures);
}
public boolean hasFailures() {
return failures != null && failures.length > 0;
}
public int getId() {
return id;
}
public Failure[] getFailures() {
return failures;
}
@Override
public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
builder.startObject(String.valueOf(id));
{
builder.startArray("failures");
if (failures != null) {
for (Failure failure : failures) {
builder.startObject();
failure.toXContent(builder, params);
builder.endObject();
}
}
builder.endArray();
}
return builder.endObject();
}
@Override
public String toString() {
return Strings.toString(this);
}
public static class Failure extends DefaultShardOperationFailedException implements Writeable {
private @Nullable String nodeId;
private Failure() {
}
public Failure(final String index, final int shardId, final Throwable reason) {
this(index, shardId, reason, null);
}
public Failure(final String index, final int shardId, final Throwable reason, final String nodeId) {
super(index, shardId, reason);
this.nodeId = nodeId;
}
public String getNodeId() {
return nodeId;
}
@Override
public void readFrom(final StreamInput in) throws IOException {
super.readFrom(in);
nodeId = in.readOptionalString();
}
@Override
public void writeTo(final StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalString(nodeId);
}
@Override
public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
if (nodeId != null) {
builder.field("node", nodeId);
}
return super.toXContent(builder, params);
}
@Override
public String toString() {
return Strings.toString(this);
}
static Failure readFailure(final StreamInput in) throws IOException {
final Failure failure = new Failure();
failure.readFrom(in);
return failure;
}
}
}
}

View File

@ -40,6 +40,8 @@ import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.Collections;
/**
* Close index action
*/
@ -109,7 +111,7 @@ public class TransportCloseIndexAction extends TransportMasterNodeAction<CloseIn
final ActionListener<CloseIndexResponse> listener) throws Exception {
final Index[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, request);
if (concreteIndices == null || concreteIndices.length == 0) {
listener.onResponse(new CloseIndexResponse(true, false));
listener.onResponse(new CloseIndexResponse(true, false, Collections.emptyList()));
return;
}

View File

@ -29,10 +29,11 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.NotifyOnceListener;
import org.elasticsearch.action.admin.indices.close.CloseIndexClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.close.CloseIndexResponse;
import org.elasticsearch.action.admin.indices.close.CloseIndexResponse.IndexResult;
import org.elasticsearch.action.admin.indices.close.CloseIndexResponse.ShardResult;
import org.elasticsearch.action.admin.indices.close.TransportVerifyShardBeforeCloseAction;
import org.elasticsearch.action.admin.indices.open.OpenIndexClusterStateUpdateRequest;
import org.elasticsearch.action.support.ActiveShardsObserver;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
@ -52,6 +53,7 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.ValidationException;
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
@ -72,6 +74,8 @@ import org.elasticsearch.threadpool.ThreadPool;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
@ -144,27 +148,22 @@ public class MetaDataIndexStateService {
public void clusterStateProcessed(final String source, final ClusterState oldState, final ClusterState newState) {
if (oldState == newState) {
assert blockedIndices.isEmpty() : "List of blocked indices is not empty but cluster state wasn't changed";
listener.onResponse(new CloseIndexResponse(true, false));
listener.onResponse(new CloseIndexResponse(true, false, Collections.emptyList()));
} else {
assert blockedIndices.isEmpty() == false : "List of blocked indices is empty but cluster state was changed";
threadPool.executor(ThreadPool.Names.MANAGEMENT)
.execute(new WaitForClosedBlocksApplied(blockedIndices, request,
ActionListener.wrap(results ->
ActionListener.wrap(verifyResults ->
clusterService.submitStateUpdateTask("close-indices", new ClusterStateUpdateTask(Priority.URGENT) {
boolean acknowledged = true;
private final List<IndexResult> indices = new ArrayList<>();
@Override
public ClusterState execute(final ClusterState currentState) throws Exception {
final ClusterState updatedState = closeRoutingTable(currentState, blockedIndices, results);
for (Map.Entry<Index, AcknowledgedResponse> result : results.entrySet()) {
IndexMetaData updatedMetaData = updatedState.metaData().index(result.getKey());
if (updatedMetaData != null && updatedMetaData.getState() != IndexMetaData.State.CLOSE) {
acknowledged = false;
break;
}
}
return allocationService.reroute(updatedState, "indices closed");
Tuple<ClusterState, Collection<IndexResult>> closingResult =
closeRoutingTable(currentState, blockedIndices, verifyResults);
assert verifyResults.size() == closingResult.v2().size();
indices.addAll(closingResult.v2());
return allocationService.reroute(closingResult.v1(), "indices closed");
}
@Override
@ -176,27 +175,28 @@ public class MetaDataIndexStateService {
public void clusterStateProcessed(final String source,
final ClusterState oldState, final ClusterState newState) {
final String[] indices = results.entrySet().stream()
.filter(result -> result.getValue().isAcknowledged())
.map(result -> result.getKey().getName())
.filter(index -> newState.routingTable().hasIndex(index))
final boolean acknowledged = indices.stream().noneMatch(IndexResult::hasFailures);
final String[] waitForIndices = indices.stream()
.filter(result -> result.hasFailures() == false)
.filter(result -> newState.routingTable().hasIndex(result.getIndex()))
.map(result -> result.getIndex().getName())
.toArray(String[]::new);
if (indices.length > 0) {
activeShardsObserver.waitForActiveShards(indices, request.waitForActiveShards(),
if (waitForIndices.length > 0) {
activeShardsObserver.waitForActiveShards(waitForIndices, request.waitForActiveShards(),
request.ackTimeout(), shardsAcknowledged -> {
if (shardsAcknowledged == false) {
logger.debug("[{}] indices closed, but the operation timed out while waiting " +
"for enough shards to be started.", Arrays.toString(indices));
"for enough shards to be started.", Arrays.toString(waitForIndices));
}
// acknowledged maybe be false but some indices may have been correctly closed, so
// we maintain a kind of coherency by overriding the shardsAcknowledged value
// (see ShardsAcknowledgedResponse constructor)
boolean shardsAcked = acknowledged ? shardsAcknowledged : false;
listener.onResponse(new CloseIndexResponse(acknowledged, shardsAcked));
listener.onResponse(new CloseIndexResponse(acknowledged, shardsAcked, indices));
}, listener::onFailure);
} else {
listener.onResponse(new CloseIndexResponse(acknowledged, false));
listener.onResponse(new CloseIndexResponse(acknowledged, false, indices));
}
}
}),
@ -292,11 +292,11 @@ public class MetaDataIndexStateService {
private final Map<Index, ClusterBlock> blockedIndices;
private final CloseIndexClusterStateUpdateRequest request;
private final ActionListener<Map<Index, AcknowledgedResponse>> listener;
private final ActionListener<Map<Index, IndexResult>> listener;
private WaitForClosedBlocksApplied(final Map<Index, ClusterBlock> blockedIndices,
final CloseIndexClusterStateUpdateRequest request,
final ActionListener<Map<Index, AcknowledgedResponse>> listener) {
final ActionListener<Map<Index, IndexResult>> listener) {
if (blockedIndices == null || blockedIndices.isEmpty()) {
throw new IllegalArgumentException("Cannot wait for closed blocks to be applied, list of blocked indices is empty or null");
}
@ -312,7 +312,7 @@ public class MetaDataIndexStateService {
@Override
protected void doRun() throws Exception {
final Map<Index, AcknowledgedResponse> results = ConcurrentCollections.newConcurrentMap();
final Map<Index, IndexResult> results = ConcurrentCollections.newConcurrentMap();
final CountDown countDown = new CountDown(blockedIndices.size());
final ClusterState state = clusterService.state();
blockedIndices.forEach((index, block) -> {
@ -325,47 +325,51 @@ public class MetaDataIndexStateService {
});
}
private void waitForShardsReadyForClosing(final Index index, final ClusterBlock closingBlock,
final ClusterState state, final Consumer<AcknowledgedResponse> onResponse) {
private void waitForShardsReadyForClosing(final Index index,
final ClusterBlock closingBlock,
final ClusterState state,
final Consumer<IndexResult> onResponse) {
final IndexMetaData indexMetaData = state.metaData().index(index);
if (indexMetaData == null) {
logger.debug("index {} has been blocked before closing and is now deleted, ignoring", index);
onResponse.accept(new AcknowledgedResponse(true));
onResponse.accept(new IndexResult(index));
return;
}
final IndexRoutingTable indexRoutingTable = state.routingTable().index(index);
if (indexRoutingTable == null || indexMetaData.getState() == IndexMetaData.State.CLOSE) {
assert state.blocks().hasIndexBlock(index.getName(), INDEX_CLOSED_BLOCK);
logger.debug("index {} has been blocked before closing and is already closed, ignoring", index);
onResponse.accept(new AcknowledgedResponse(true));
onResponse.accept(new IndexResult(index));
return;
}
final ImmutableOpenIntMap<IndexShardRoutingTable> shards = indexRoutingTable.getShards();
final AtomicArray<AcknowledgedResponse> results = new AtomicArray<>(shards.size());
final AtomicArray<ShardResult> results = new AtomicArray<>(shards.size());
final CountDown countDown = new CountDown(shards.size());
for (IntObjectCursor<IndexShardRoutingTable> shard : shards) {
final IndexShardRoutingTable shardRoutingTable = shard.value;
final ShardId shardId = shardRoutingTable.shardId();
final int shardId = shardRoutingTable.shardId().id();
sendVerifyShardBeforeCloseRequest(shardRoutingTable, closingBlock, new NotifyOnceListener<ReplicationResponse>() {
@Override
public void innerOnResponse(final ReplicationResponse replicationResponse) {
ReplicationResponse.ShardInfo shardInfo = replicationResponse.getShardInfo();
results.setOnce(shardId.id(), new AcknowledgedResponse(shardInfo.getFailed() == 0));
ShardResult.Failure[] failures = Arrays.stream(replicationResponse.getShardInfo().getFailures())
.map(f -> new ShardResult.Failure(f.index(), f.shardId(), f.getCause(), f.nodeId()))
.toArray(ShardResult.Failure[]::new);
results.setOnce(shardId, new ShardResult(shardId, failures));
processIfFinished();
}
@Override
public void innerOnFailure(final Exception e) {
results.setOnce(shardId.id(), new AcknowledgedResponse(false));
ShardResult.Failure failure = new ShardResult.Failure(index.getName(), shardId, e);
results.setOnce(shardId, new ShardResult(shardId, new ShardResult.Failure[]{failure}));
processIfFinished();
}
private void processIfFinished() {
if (countDown.countDown()) {
final boolean acknowledged = results.asList().stream().allMatch(AcknowledgedResponse::isAcknowledged);
onResponse.accept(new AcknowledgedResponse(acknowledged));
onResponse.accept(new IndexResult(index, results.toArray(new ShardResult[results.length()])));
}
}
});
@ -396,9 +400,9 @@ public class MetaDataIndexStateService {
/**
* Step 3 - Move index states from OPEN to CLOSE in cluster state for indices that are ready for closing.
*/
static ClusterState closeRoutingTable(final ClusterState currentState,
final Map<Index, ClusterBlock> blockedIndices,
final Map<Index, AcknowledgedResponse> results) {
static Tuple<ClusterState, Collection<IndexResult>> closeRoutingTable(final ClusterState currentState,
final Map<Index, ClusterBlock> blockedIndices,
final Map<Index, IndexResult> verifyResult) {
// Remove the index routing table of closed indices if the cluster is in a mixed version
// that does not support the replication of closed indices
@ -409,9 +413,10 @@ public class MetaDataIndexStateService {
final RoutingTable.Builder routingTable = RoutingTable.builder(currentState.routingTable());
final Set<String> closedIndices = new HashSet<>();
for (Map.Entry<Index, AcknowledgedResponse> result : results.entrySet()) {
Map<Index, IndexResult> closingResults = new HashMap<>(verifyResult);
for (Map.Entry<Index, IndexResult> result : verifyResult.entrySet()) {
final Index index = result.getKey();
final boolean acknowledged = result.getValue().isAcknowledged();
final boolean acknowledged = result.getValue().hasFailures() == false;
try {
if (acknowledged == false) {
logger.debug("verification of shards before closing {} failed", index);
@ -424,7 +429,11 @@ public class MetaDataIndexStateService {
continue;
}
final ClusterBlock closingBlock = blockedIndices.get(index);
assert closingBlock != null;
if (currentState.blocks().hasIndexBlock(index.getName(), closingBlock) == false) {
// we should report error in this case as the index can be left as open.
closingResults.put(result.getKey(), new IndexResult(result.getKey(), new IllegalStateException(
"verification of shards before closing " + index + " succeeded but block has been removed in the meantime")));
logger.debug("verification of shards before closing {} succeeded but block has been removed in the meantime", index);
continue;
}
@ -450,9 +459,9 @@ public class MetaDataIndexStateService {
logger.debug("index {} has been deleted since it was blocked before closing, ignoring", index);
}
}
logger.info("completed closing of indices {}", closedIndices);
return ClusterState.builder(currentState).blocks(blocks).metaData(metadata).routingTable(routingTable.build()).build();
return Tuple.tuple(ClusterState.builder(currentState).blocks(blocks).metaData(metadata).routingTable(routingTable.build()).build(),
closingResults.values());
}
public void openIndex(final OpenIndexClusterStateUpdateRequest request,

View File

@ -19,15 +19,30 @@
package org.elasticsearch.action.admin.indices.close;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.transport.ActionNotFoundTransportException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import static org.elasticsearch.test.VersionUtils.randomVersionBetween;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
public class CloseIndexResponseTests extends ESTestCase {
@ -53,6 +68,7 @@ public class CloseIndexResponseTests extends ESTestCase {
final AcknowledgedResponse deserializedResponse = new AcknowledgedResponse();
try (StreamInput in = out.bytes().streamInput()) {
in.setVersion(out.getVersion());
deserializedResponse.readFrom(in);
}
assertThat(deserializedResponse.isAcknowledged(), equalTo(response.isAcknowledged()));
@ -71,16 +87,130 @@ public class CloseIndexResponseTests extends ESTestCase {
assertThat(deserializedResponse.isAcknowledged(), equalTo(response.isAcknowledged()));
}
}
{
final CloseIndexResponse response = randomResponse();
try (BytesStreamOutput out = new BytesStreamOutput()) {
Version version = randomVersionBetween(random(), Version.V_7_2_0, Version.CURRENT);
out.setVersion(version);
response.writeTo(out);
final CloseIndexResponse deserializedResponse = new CloseIndexResponse();
try (StreamInput in = out.bytes().streamInput()) {
in.setVersion(version);
deserializedResponse.readFrom(in);
}
assertThat(deserializedResponse.isAcknowledged(), equalTo(response.isAcknowledged()));
assertThat(deserializedResponse.isShardsAcknowledged(), equalTo(response.isShardsAcknowledged()));
if (version.onOrAfter(Version.V_7_3_0)) {
assertThat(deserializedResponse.getIndices(), hasSize(response.getIndices().size()));
} else {
assertThat(deserializedResponse.getIndices(), empty());
}
}
}
}
private CloseIndexResponse randomResponse() {
final boolean acknowledged = randomBoolean();
boolean acknowledged = true;
final String[] indicesNames = generateRandomStringArray(10, 10, false, true);
final List<CloseIndexResponse.IndexResult> indexResults = new ArrayList<>();
for (String indexName : indicesNames) {
final Index index = new Index(indexName, "_na_");
if (randomBoolean()) {
indexResults.add(new CloseIndexResponse.IndexResult(index));
} else {
if (randomBoolean()) {
acknowledged = false;
indexResults.add(new CloseIndexResponse.IndexResult(index, randomException(index, 0)));
} else {
final int nbShards = randomIntBetween(1, 5);
CloseIndexResponse.ShardResult[] shards = new CloseIndexResponse.ShardResult[nbShards];
for (int i = 0; i < nbShards; i++) {
CloseIndexResponse.ShardResult.Failure[] failures = null;
if (randomBoolean()) {
acknowledged = false;
failures = new CloseIndexResponse.ShardResult.Failure[randomIntBetween(1, 3)];
for (int j = 0; j < failures.length; j++) {
String nodeId = randomAlphaOfLength(5);
failures[j] = new CloseIndexResponse.ShardResult.Failure(indexName, i, randomException(index, i), nodeId);
}
}
shards[i] = new CloseIndexResponse.ShardResult(i, failures);
}
indexResults.add(new CloseIndexResponse.IndexResult(index, shards));
}
}
}
final boolean shardsAcknowledged = acknowledged ? randomBoolean() : false;
return new CloseIndexResponse(acknowledged, shardsAcknowledged);
return new CloseIndexResponse(acknowledged, shardsAcknowledged, indexResults);
}
private static ElasticsearchException randomException(final Index index, final int id) {
return randomFrom(
new IndexNotFoundException(index),
new ActionNotFoundTransportException("test"),
new NoShardAvailableActionException(new ShardId(index, id)));
}
private static void assertCloseIndexResponse(final CloseIndexResponse actual, final CloseIndexResponse expected) {
assertThat(actual.isAcknowledged(), equalTo(expected.isAcknowledged()));
assertThat(actual.isShardsAcknowledged(), equalTo(expected.isShardsAcknowledged()));
for (int i = 0; i < expected.getIndices().size(); i++) {
CloseIndexResponse.IndexResult expectedIndexResult = expected.getIndices().get(i);
CloseIndexResponse.IndexResult actualIndexResult = actual.getIndices().get(i);
assertThat(actualIndexResult.getIndex(), equalTo(expectedIndexResult.getIndex()));
assertThat(actualIndexResult.hasFailures(), equalTo(expectedIndexResult.hasFailures()));
if (expectedIndexResult.hasFailures() == false) {
assertThat(actualIndexResult.getException(), nullValue());
if (actualIndexResult.getShards() != null) {
assertThat(Arrays.stream(actualIndexResult.getShards())
.allMatch(shardResult -> shardResult.hasFailures() == false), is(true));
}
}
if (expectedIndexResult.getException() != null) {
assertThat(actualIndexResult.getShards(), nullValue());
assertThat(actualIndexResult.getException(), notNullValue());
assertThat(actualIndexResult.getException().getMessage(), equalTo(expectedIndexResult.getException().getMessage()));
assertThat(actualIndexResult.getException().getClass(), equalTo(expectedIndexResult.getException().getClass()));
assertArrayEquals(actualIndexResult.getException().getStackTrace(), expectedIndexResult.getException().getStackTrace());
} else {
assertThat(actualIndexResult.getException(), nullValue());
}
if (expectedIndexResult.getShards() != null) {
assertThat(actualIndexResult.getShards().length, equalTo(expectedIndexResult.getShards().length));
for (int j = 0; j < expectedIndexResult.getShards().length; j++) {
CloseIndexResponse.ShardResult expectedShardResult = expectedIndexResult.getShards()[j];
CloseIndexResponse.ShardResult actualShardResult = actualIndexResult.getShards()[j];
assertThat(actualShardResult.getId(), equalTo(expectedShardResult.getId()));
assertThat(actualShardResult.hasFailures(), equalTo(expectedShardResult.hasFailures()));
if (expectedShardResult.hasFailures()) {
assertThat(actualShardResult.getFailures().length, equalTo(expectedShardResult.getFailures().length));
for (int k = 0; k < expectedShardResult.getFailures().length; k++) {
CloseIndexResponse.ShardResult.Failure expectedFailure = expectedShardResult.getFailures()[k];
CloseIndexResponse.ShardResult.Failure actualFailure = actualShardResult.getFailures()[k];
assertThat(actualFailure.getNodeId(), equalTo(expectedFailure.getNodeId()));
assertThat(actualFailure.index(), equalTo(expectedFailure.index()));
assertThat(actualFailure.shardId(), equalTo(expectedFailure.shardId()));
assertThat(actualFailure.getCause().getMessage(), equalTo(expectedFailure.getCause().getMessage()));
assertThat(actualFailure.getCause().getClass(), equalTo(expectedFailure.getCause().getClass()));
assertArrayEquals(actualFailure.getCause().getStackTrace(), expectedFailure.getCause().getStackTrace());
}
} else {
assertThat(actualShardResult.getFailures(), nullValue());
}
}
} else {
assertThat(actualIndexResult.getShards(), nullValue());
}
}
}
}

View File

@ -21,7 +21,8 @@ package org.elasticsearch.cluster.metadata;
import com.google.common.collect.ImmutableList;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.admin.indices.close.CloseIndexResponse;
import org.elasticsearch.action.admin.indices.close.CloseIndexResponse.IndexResult;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.RestoreInProgress;
@ -51,6 +52,7 @@ import org.elasticsearch.snapshots.SnapshotInProgressException;
import org.elasticsearch.test.ESTestCase;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -70,6 +72,7 @@ import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting
import static org.elasticsearch.cluster.shards.ClusterShardLimitIT.ShardCounts.forDataNodeCount;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
@ -81,7 +84,7 @@ public class MetaDataIndexStateServiceTests extends ESTestCase {
public void testCloseRoutingTable() {
final Set<Index> nonBlockedIndices = new HashSet<>();
final Map<Index, ClusterBlock> blockedIndices = new HashMap<>();
final Map<Index, AcknowledgedResponse> results = new HashMap<>();
final Map<Index, IndexResult> results = new HashMap<>();
ClusterState state = ClusterState.builder(new ClusterName("testCloseRoutingTable")).build();
for (int i = 0; i < randomIntBetween(1, 25); i++) {
@ -93,12 +96,17 @@ public class MetaDataIndexStateServiceTests extends ESTestCase {
} else {
final ClusterBlock closingBlock = MetaDataIndexStateService.createIndexClosingBlock();
state = addBlockedIndex(indexName, randomIntBetween(1, 5), randomIntBetween(0, 5), state, closingBlock);
blockedIndices.put(state.metaData().index(indexName).getIndex(), closingBlock);
results.put(state.metaData().index(indexName).getIndex(), new AcknowledgedResponse(randomBoolean()));
final Index index = state.metaData().index(indexName).getIndex();
blockedIndices.put(index, closingBlock);
if (randomBoolean()) {
results.put(index, new CloseIndexResponse.IndexResult(index));
} else {
results.put(index, new CloseIndexResponse.IndexResult(index, new Exception("test")));
}
}
}
final ClusterState updatedState = MetaDataIndexStateService.closeRoutingTable(state, blockedIndices, results);
final ClusterState updatedState = MetaDataIndexStateService.closeRoutingTable(state, blockedIndices, results).v1();
assertThat(updatedState.metaData().indices().size(), equalTo(nonBlockedIndices.size() + blockedIndices.size()));
for (Index nonBlockedIndex : nonBlockedIndices) {
@ -106,7 +114,7 @@ public class MetaDataIndexStateServiceTests extends ESTestCase {
assertThat(updatedState.blocks().hasIndexBlockWithId(nonBlockedIndex.getName(), INDEX_CLOSED_BLOCK_ID), is(false));
}
for (Index blockedIndex : blockedIndices.keySet()) {
if (results.get(blockedIndex).isAcknowledged()) {
if (results.get(blockedIndex).hasFailures() == false) {
assertIsClosed(blockedIndex.getName(), updatedState);
} else {
assertIsOpened(blockedIndex.getName(), updatedState);
@ -118,7 +126,7 @@ public class MetaDataIndexStateServiceTests extends ESTestCase {
public void testCloseRoutingTableRemovesRoutingTable() {
final Set<Index> nonBlockedIndices = new HashSet<>();
final Map<Index, ClusterBlock> blockedIndices = new HashMap<>();
final Map<Index, AcknowledgedResponse> results = new HashMap<>();
final Map<Index, IndexResult> results = new HashMap<>();
final ClusterBlock closingBlock = MetaDataIndexStateService.createIndexClosingBlock();
ClusterState state = ClusterState.builder(new ClusterName("testCloseRoutingTableRemovesRoutingTable")).build();
@ -130,8 +138,13 @@ public class MetaDataIndexStateServiceTests extends ESTestCase {
nonBlockedIndices.add(state.metaData().index(indexName).getIndex());
} else {
state = addBlockedIndex(indexName, randomIntBetween(1, 5), randomIntBetween(0, 5), state, closingBlock);
blockedIndices.put(state.metaData().index(indexName).getIndex(), closingBlock);
results.put(state.metaData().index(indexName).getIndex(), new AcknowledgedResponse(randomBoolean()));
final Index index = state.metaData().index(indexName).getIndex();
blockedIndices.put(index, closingBlock);
if (randomBoolean()) {
results.put(index, new CloseIndexResponse.IndexResult(index));
} else {
results.put(index, new CloseIndexResponse.IndexResult(index, new Exception("test")));
}
}
}
@ -143,7 +156,7 @@ public class MetaDataIndexStateServiceTests extends ESTestCase {
new HashSet<>(Arrays.asList(DiscoveryNode.Role.values())), Version.V_7_2_0)))
.build();
state = MetaDataIndexStateService.closeRoutingTable(state, blockedIndices, results);
state = MetaDataIndexStateService.closeRoutingTable(state, blockedIndices, results).v1();
assertThat(state.metaData().indices().size(), equalTo(nonBlockedIndices.size() + blockedIndices.size()));
for (Index nonBlockedIndex : nonBlockedIndices) {
@ -151,7 +164,7 @@ public class MetaDataIndexStateServiceTests extends ESTestCase {
assertThat(state.blocks().hasIndexBlockWithId(nonBlockedIndex.getName(), INDEX_CLOSED_BLOCK_ID), is(false));
}
for (Index blockedIndex : blockedIndices.keySet()) {
if (results.get(blockedIndex).isAcknowledged()) {
if (results.get(blockedIndex).hasFailures() == false) {
IndexMetaData indexMetaData = state.metaData().index(blockedIndex);
assertThat(indexMetaData.getState(), is(IndexMetaData.State.CLOSE));
Settings indexSettings = indexMetaData.getSettings();
@ -330,6 +343,33 @@ public class MetaDataIndexStateServiceTests extends ESTestCase {
}
}
public void testCloseFailedIfBlockDisappeared() {
ClusterState state = ClusterState.builder(new ClusterName("failedIfBlockDisappeared")).build();
Map<Index, ClusterBlock> blockedIndices = new HashMap<>();
int numIndices = between(1, 10);
Set<Index> disappearedIndices = new HashSet<>();
Map<Index, IndexResult> verifyResults = new HashMap<>();
for (int i = 0; i < numIndices; i++) {
String indexName = "test-" + i;
state = addOpenedIndex(indexName, randomIntBetween(1, 3), randomIntBetween(0, 3), state);
Index index = state.metaData().index(indexName).getIndex();
state = MetaDataIndexStateService.addIndexClosedBlocks(new Index[]{index}, blockedIndices, state);
if (randomBoolean()) {
state = ClusterState.builder(state)
.blocks(ClusterBlocks.builder().blocks(state.blocks()).removeIndexBlocks(indexName).build())
.build();
disappearedIndices.add(index);
}
verifyResults.put(index, new IndexResult(index));
}
Collection<IndexResult> closingResults =
MetaDataIndexStateService.closeRoutingTable(state, blockedIndices, unmodifiableMap(verifyResults)).v2();
assertThat(closingResults, hasSize(numIndices));
Set<Index> failedIndices = closingResults.stream().filter(IndexResult::hasFailures)
.map(IndexResult::getIndex).collect(Collectors.toSet());
assertThat(failedIndices, equalTo(disappearedIndices));
}
public static ClusterState createClusterForShardLimitTest(int nodesInCluster, int openIndexShards, int openIndexReplicas,
int closedIndexShards, int closedIndexReplicas, Settings clusterSettings) {
ImmutableOpenMap.Builder<String, DiscoveryNode> dataNodes = ImmutableOpenMap.builder();

View File

@ -18,7 +18,7 @@
*/
package org.elasticsearch.cluster.metadata;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.admin.indices.close.CloseIndexResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.index.Index;
@ -43,7 +43,7 @@ public class MetaDataIndexStateServiceUtils {
*/
public static ClusterState closeRoutingTable(final ClusterState state,
final Map<Index, ClusterBlock> blockedIndices,
final Map<Index, AcknowledgedResponse> results) {
return MetaDataIndexStateService.closeRoutingTable(state, blockedIndices, results);
final Map<Index, CloseIndexResponse.IndexResult> results) {
return MetaDataIndexStateService.closeRoutingTable(state, blockedIndices, results).v1();
}
}

View File

@ -27,6 +27,7 @@ import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequest;
import org.elasticsearch.action.admin.cluster.reroute.TransportClusterRerouteAction;
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
import org.elasticsearch.action.admin.indices.close.CloseIndexResponse;
import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction;
import org.elasticsearch.action.admin.indices.close.TransportVerifyShardBeforeCloseAction;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
@ -40,7 +41,6 @@ import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.DestructiveOperations;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.action.support.master.TransportMasterNodeActionUtils;
@ -227,8 +227,8 @@ public class ClusterStateChanges {
final Map<Index, ClusterBlock> blockedIndices = new HashMap<>();
ClusterState newState = MetaDataIndexStateServiceUtils.addIndexClosedBlocks(concreteIndices, blockedIndices, state);
newState = MetaDataIndexStateServiceUtils.closeRoutingTable(newState, blockedIndices, blockedIndices.keySet().stream()
.collect(Collectors.toMap(Function.identity(), index -> new AcknowledgedResponse(true))));
newState = MetaDataIndexStateServiceUtils.closeRoutingTable(newState, blockedIndices,
blockedIndices.keySet().stream().collect(Collectors.toMap(Function.identity(), CloseIndexResponse.IndexResult::new)));
return allocationService.reroute(newState, "indices closed");
}

View File

@ -20,6 +20,7 @@ package org.elasticsearch.indices.state;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.admin.indices.close.CloseIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.close.CloseIndexResponse;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.client.Client;
@ -45,6 +46,7 @@ import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CountDownLatch;
@ -64,6 +66,7 @@ import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
public class CloseIndexIT extends ESIntegTestCase {
@ -115,7 +118,7 @@ public class CloseIndexIT extends ESIntegTestCase {
indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, nbDocs)
.mapToObj(i -> client().prepareIndex(indexName, "_doc", String.valueOf(i)).setSource("num", i)).collect(toList()));
assertBusy(() -> assertAcked(client().admin().indices().prepareClose(indexName)));
assertBusy(() -> closeIndices(indexName));
assertIndexIsClosed(indexName);
assertAcked(client().admin().indices().prepareOpen(indexName));
@ -130,13 +133,17 @@ public class CloseIndexIT extends ESIntegTestCase {
indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, randomIntBetween(1, 10))
.mapToObj(i -> client().prepareIndex(indexName, "_doc", String.valueOf(i)).setSource("num", i)).collect(toList()));
}
// First close should be acked
assertBusy(() -> assertAcked(client().admin().indices().prepareClose(indexName)));
// First close should be fully acked
assertBusy(() -> closeIndices(indexName));
assertIndexIsClosed(indexName);
// Second close should be acked too
final ActiveShardCount activeShardCount = randomFrom(ActiveShardCount.NONE, ActiveShardCount.DEFAULT, ActiveShardCount.ALL);
assertBusy(() -> assertAcked(client().admin().indices().prepareClose(indexName).setWaitForActiveShards(activeShardCount)));
assertBusy(() -> {
CloseIndexResponse response = client().admin().indices().prepareClose(indexName).setWaitForActiveShards(activeShardCount).get();
assertAcked(response);
assertTrue(response.getIndices().isEmpty());
});
assertIndexIsClosed(indexName);
}
@ -150,7 +157,7 @@ public class CloseIndexIT extends ESIntegTestCase {
assertThat(clusterState.metaData().indices().get(indexName).getState(), is(IndexMetaData.State.OPEN));
assertThat(clusterState.routingTable().allShards().stream().allMatch(ShardRouting::unassigned), is(true));
assertBusy(() -> assertAcked(client().admin().indices().prepareClose(indexName).setWaitForActiveShards(ActiveShardCount.NONE)));
assertBusy(() -> closeIndices(client().admin().indices().prepareClose(indexName).setWaitForActiveShards(ActiveShardCount.NONE)));
assertIndexIsClosed(indexName);
}
@ -198,7 +205,7 @@ public class CloseIndexIT extends ESIntegTestCase {
indexer.setAssertNoFailuresOnStop(false);
waitForDocs(randomIntBetween(10, 50), indexer);
assertBusy(() -> assertAcked(client().admin().indices().prepareClose(indexName)));
assertBusy(() -> closeIndices(indexName));
indexer.stop();
nbDocs += indexer.totalIndexedDocs();
@ -348,6 +355,9 @@ public class CloseIndexIT extends ESIntegTestCase {
assertThat(client().admin().cluster().prepareHealth(indexName).get().getStatus(), is(ClusterHealthStatus.GREEN));
assertTrue(closeIndexResponse.isAcknowledged());
assertTrue(closeIndexResponse.isShardsAcknowledged());
assertThat(closeIndexResponse.getIndices().get(0), notNullValue());
assertThat(closeIndexResponse.getIndices().get(0).hasFailures(), is(false));
assertThat(closeIndexResponse.getIndices().get(0).getIndex().getName(), equalTo(indexName));
assertIndexIsClosed(indexName);
}
@ -451,6 +461,36 @@ public class CloseIndexIT extends ESIntegTestCase {
}
}
private static void closeIndices(final String... indices) {
closeIndices(client().admin().indices().prepareClose(indices));
}
private static void closeIndices(final CloseIndexRequestBuilder requestBuilder) {
final CloseIndexResponse response = requestBuilder.get();
assertThat(response.isAcknowledged(), is(true));
assertThat(response.isShardsAcknowledged(), is(true));
final String[] indices = requestBuilder.request().indices();
if (indices != null) {
assertThat(response.getIndices().size(), equalTo(indices.length));
for (String index : indices) {
CloseIndexResponse.IndexResult indexResult = response.getIndices().stream()
.filter(result -> index.equals(result.getIndex().getName())).findFirst().get();
assertThat(indexResult, notNullValue());
assertThat(indexResult.hasFailures(), is(false));
assertThat(indexResult.getException(), nullValue());
assertThat(indexResult.getShards(), notNullValue());
Arrays.stream(indexResult.getShards()).forEach(shardResult -> {
assertThat(shardResult.hasFailures(), is(false));
assertThat(shardResult.getFailures(), notNullValue());
assertThat(shardResult.getFailures().length, equalTo(0));
});
}
} else {
assertThat(response.getIndices().size(), equalTo(0));
}
}
static void assertIndexIsClosed(final String... indices) {
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
for (String index : indices) {

View File

@ -44,7 +44,7 @@ public class CloseFollowerIndexStepTests extends AbstractStepTestCase<CloseFollo
assertThat(closeIndexRequest.indices()[0], equalTo("follower-index"));
@SuppressWarnings("unchecked")
ActionListener<CloseIndexResponse> listener = (ActionListener<CloseIndexResponse>) invocation.getArguments()[1];
listener.onResponse(new CloseIndexResponse(true, true));
listener.onResponse(new CloseIndexResponse(true, true, Collections.emptyList()));
return null;
}).when(indicesClient).close(Mockito.any(), Mockito.any());