Index creation waits for active shard copies before returning (#18985)

Before returning, index creation now waits for the configured number
of shard copies to be started. In the past, a client would create an
index and then potentially have to check the cluster health to wait
to execute write operations. With the cluster health semantics changing
so that index creation does not cause the cluster health to go RED,
this change enables waiting for the desired number of active shards
to be active before returning from index creation.

Relates #9126
This commit is contained in:
Ali Beyad 2016-07-15 11:19:27 -04:00 committed by GitHub
parent 7759c23272
commit d78f40fb1e
47 changed files with 1282 additions and 121 deletions

View File

@ -19,6 +19,8 @@
package org.elasticsearch.action; package org.elasticsearch.action;
import java.util.function.Consumer;
/** /**
* A listener for action responses or failures. * A listener for action responses or failures.
*/ */
@ -33,4 +35,31 @@ public interface ActionListener<Response> {
* A failure caused by an exception at some phase of the task. * A failure caused by an exception at some phase of the task.
*/ */
void onFailure(Exception e); void onFailure(Exception e);
/**
* Creates a listener that listens for a response (or failure) and executes the
* corresponding consumer when the response (or failure) is received.
*
* @param onResponse the consumer of the response, when the listener receives one
* @param onFailure the consumer of the failure, when the listener receives one
* @param <Response> the type of the response
* @return a listener that listens for responses and invokes the consumer when received
*/
static <Response> ActionListener<Response> wrap(Consumer<Response> onResponse, Consumer<Exception> onFailure) {
return new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
try {
onResponse.accept(response);
} catch (Exception e) {
onFailure(e);
}
}
@Override
public void onFailure(Exception e) {
onFailure.accept(e);
}
};
}
} }

View File

@ -20,6 +20,7 @@
package org.elasticsearch.action.admin.indices.create; package org.elasticsearch.action.admin.indices.create;
import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.cluster.ack.ClusterStateUpdateRequest; import org.elasticsearch.cluster.ack.ClusterStateUpdateRequest;
import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
@ -55,6 +56,8 @@ public class CreateIndexClusterStateUpdateRequest extends ClusterStateUpdateRequ
private final Set<ClusterBlock> blocks = new HashSet<>(); private final Set<ClusterBlock> blocks = new HashSet<>();
private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT;
public CreateIndexClusterStateUpdateRequest(TransportMessage originalMessage, String cause, String index, boolean updateAllTypes) { public CreateIndexClusterStateUpdateRequest(TransportMessage originalMessage, String cause, String index, boolean updateAllTypes) {
this.originalMessage = originalMessage; this.originalMessage = originalMessage;
@ -98,6 +101,11 @@ public class CreateIndexClusterStateUpdateRequest extends ClusterStateUpdateRequ
return this; return this;
} }
public CreateIndexClusterStateUpdateRequest waitForActiveShards(ActiveShardCount waitForActiveShards) {
this.waitForActiveShards = waitForActiveShards;
return this;
}
public TransportMessage originalMessage() { public TransportMessage originalMessage() {
return originalMessage; return originalMessage;
} }
@ -142,4 +150,8 @@ public class CreateIndexClusterStateUpdateRequest extends ClusterStateUpdateRequ
public boolean updateAllTypes() { public boolean updateAllTypes() {
return updateAllTypes; return updateAllTypes;
} }
public ActiveShardCount waitForActiveShards() {
return waitForActiveShards;
}
} }

View File

@ -25,6 +25,7 @@ import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
@ -77,6 +78,8 @@ public class CreateIndexRequest extends AcknowledgedRequest<CreateIndexRequest>
private boolean updateAllTypes = false; private boolean updateAllTypes = false;
private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT;
public CreateIndexRequest() { public CreateIndexRequest() {
} }
@ -440,6 +443,30 @@ public class CreateIndexRequest extends AcknowledgedRequest<CreateIndexRequest>
return this; return this;
} }
public ActiveShardCount waitForActiveShards() {
return waitForActiveShards;
}
/**
* Sets the number of shard copies that should be active for index creation to return.
* Defaults to {@link ActiveShardCount#DEFAULT}, which will wait for one shard copy
* (the primary) to become active. Set this value to {@link ActiveShardCount#ALL} to
* wait for all shards (primary and all replicas) to be active before returning.
* Otherwise, use {@link ActiveShardCount#from(int)} to set this value to any
* non-negative integer, up to the number of copies per shard (number of replicas + 1),
* to wait for the desired amount of shard copies to become active before returning.
* Index creation will only wait up until the timeout value for the number of shard copies
* to be active before returning. Check {@link CreateIndexResponse#isShardsAcked()} to
* determine if the requisite shard copies were all started before returning or timing out.
*
* @param waitForActiveShards number of active shard copies to wait on
*/
public CreateIndexRequest waitForActiveShards(ActiveShardCount waitForActiveShards) {
this.waitForActiveShards = waitForActiveShards;
return this;
}
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
super.readFrom(in); super.readFrom(in);
@ -462,6 +489,7 @@ public class CreateIndexRequest extends AcknowledgedRequest<CreateIndexRequest>
aliases.add(Alias.read(in)); aliases.add(Alias.read(in));
} }
updateAllTypes = in.readBoolean(); updateAllTypes = in.readBoolean();
waitForActiveShards = ActiveShardCount.readFrom(in);
} }
@Override @Override
@ -486,5 +514,6 @@ public class CreateIndexRequest extends AcknowledgedRequest<CreateIndexRequest>
alias.writeTo(out); alias.writeTo(out);
} }
out.writeBoolean(updateAllTypes); out.writeBoolean(updateAllTypes);
waitForActiveShards.writeTo(out);
} }
} }

View File

@ -20,6 +20,7 @@
package org.elasticsearch.action.admin.indices.create; package org.elasticsearch.action.admin.indices.create;
import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.master.AcknowledgedRequestBuilder; import org.elasticsearch.action.support.master.AcknowledgedRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
@ -249,4 +250,23 @@ public class CreateIndexRequestBuilder extends AcknowledgedRequestBuilder<Create
request.updateAllTypes(updateAllTypes); request.updateAllTypes(updateAllTypes);
return this; return this;
} }
/**
* Sets the number of shard copies that should be active for index creation to return.
* Defaults to {@link ActiveShardCount#DEFAULT}, which will wait for one shard copy
* (the primary) to become active. Set this value to {@link ActiveShardCount#ALL} to
* wait for all shards (primary and all replicas) to be active before returning.
* Otherwise, use {@link ActiveShardCount#from(int)} to set this value to any
* non-negative integer, up to the number of copies per shard (number of replicas + 1),
* to wait for the desired amount of shard copies to become active before returning.
* Index creation will only wait up until the timeout value for the number of shard copies
* to be active before returning. Check {@link CreateIndexResponse#isShardsAcked()} to
* determine if the requisite shard copies were all started before returning or timing out.
*
* @param waitForActiveShards number of active shard copies to wait on
*/
public CreateIndexRequestBuilder setWaitForActiveShards(ActiveShardCount waitForActiveShards) {
request.waitForActiveShards(waitForActiveShards);
return this;
}
} }

View File

@ -22,6 +22,7 @@ package org.elasticsearch.action.admin.indices.create;
import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException; import java.io.IOException;
@ -30,22 +31,41 @@ import java.io.IOException;
*/ */
public class CreateIndexResponse extends AcknowledgedResponse { public class CreateIndexResponse extends AcknowledgedResponse {
private boolean shardsAcked;
protected CreateIndexResponse() { protected CreateIndexResponse() {
} }
protected CreateIndexResponse(boolean acknowledged) { protected CreateIndexResponse(boolean acknowledged, boolean shardsAcked) {
super(acknowledged); super(acknowledged);
assert acknowledged || shardsAcked == false; // if its not acknowledged, then shards acked should be false too
this.shardsAcked = shardsAcked;
} }
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
super.readFrom(in); super.readFrom(in);
readAcknowledged(in); readAcknowledged(in);
shardsAcked = in.readBoolean();
} }
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out); super.writeTo(out);
writeAcknowledged(out); writeAcknowledged(out);
out.writeBoolean(shardsAcked);
}
/**
* Returns true if the requisite number of shards were started before
* returning from the index creation operation. If {@link #isAcknowledged()}
* is false, then this also returns false.
*/
public boolean isShardsAcked() {
return shardsAcked;
}
public void addCustomFields(XContentBuilder builder) throws IOException {
builder.field("shards_acknowledged", isShardsAcked());
} }
} }

View File

@ -23,7 +23,6 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@ -31,7 +30,6 @@ import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
@ -77,24 +75,12 @@ public class TransportCreateIndexAction extends TransportMasterNodeAction<Create
final CreateIndexClusterStateUpdateRequest updateRequest = new CreateIndexClusterStateUpdateRequest(request, cause, indexName, request.updateAllTypes()) final CreateIndexClusterStateUpdateRequest updateRequest = new CreateIndexClusterStateUpdateRequest(request, cause, indexName, request.updateAllTypes())
.ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout()) .ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout())
.settings(request.settings()).mappings(request.mappings()) .settings(request.settings()).mappings(request.mappings())
.aliases(request.aliases()).customs(request.customs()); .aliases(request.aliases()).customs(request.customs())
.waitForActiveShards(request.waitForActiveShards());
createIndexService.createIndex(updateRequest, new ActionListener<ClusterStateUpdateResponse>() { createIndexService.createIndex(updateRequest, ActionListener.wrap(response ->
listener.onResponse(new CreateIndexResponse(response.isAcknowledged(), response.isShardsAcked())),
@Override listener::onFailure));
public void onResponse(ClusterStateUpdateResponse response) {
listener.onResponse(new CreateIndexResponse(response.isAcknowledged()));
}
@Override
public void onFailure(Exception t) {
if (t instanceof IndexAlreadyExistsException) {
logger.trace("[{}] failed to create", t, request.index());
} else {
logger.debug("[{}] failed to create", t, request.index());
}
listener.onFailure(t);
}
});
} }
} }

View File

@ -22,6 +22,7 @@ import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.common.ParseField; import org.elasticsearch.common.ParseField;
@ -206,4 +207,22 @@ public class RolloverRequest extends AcknowledgedRequest<RolloverRequest> implem
} }
} }
/**
* Sets the number of shard copies that should be active for creation of the
* new rollover index to return. Defaults to {@link ActiveShardCount#DEFAULT}, which will
* wait for one shard copy (the primary) to become active. Set this value to
* {@link ActiveShardCount#ALL} to wait for all shards (primary and all replicas) to be active
* before returning. Otherwise, use {@link ActiveShardCount#from(int)} to set this value to any
* non-negative integer, up to the number of copies per shard (number of replicas + 1),
* to wait for the desired amount of shard copies to become active before returning.
* Index creation will only wait up until the timeout value for the number of shard copies
* to be active before returning. Check {@link RolloverResponse#isShardsAcked()} to
* determine if the requisite shard copies were all started before returning or timing out.
*
* @param waitForActiveShards number of active shard copies to wait on
*/
public void setWaitForActiveShards(ActiveShardCount waitForActiveShards) {
this.createIndexRequest.waitForActiveShards(waitForActiveShards);
}
} }

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.admin.indices.rollover; package org.elasticsearch.action.admin.indices.rollover;
import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -70,4 +71,23 @@ public class RolloverRequestBuilder extends MasterNodeOperationRequestBuilder<Ro
this.request.getCreateIndexRequest().mapping(type, source); this.request.getCreateIndexRequest().mapping(type, source);
return this; return this;
} }
/**
* Sets the number of shard copies that should be active for creation of the
* new rollover index to return. Defaults to {@link ActiveShardCount#DEFAULT}, which will
* wait for one shard copy (the primary) to become active. Set this value to
* {@link ActiveShardCount#ALL} to wait for all shards (primary and all replicas) to be active
* before returning. Otherwise, use {@link ActiveShardCount#from(int)} to set this value to any
* non-negative integer, up to the number of copies per shard (number of replicas + 1),
* to wait for the desired amount of shard copies to become active before returning.
* Index creation will only wait up until the timeout value for the number of shard copies
* to be active before returning. Check {@link RolloverResponse#isShardsAcked()} to
* determine if the requisite shard copies were all started before returning or timing out.
*
* @param waitForActiveShards number of active shard copies to wait on
*/
public RolloverRequestBuilder waitForActiveShards(ActiveShardCount waitForActiveShards) {
this.request.setWaitForActiveShards(waitForActiveShards);
return this;
}
} }

View File

@ -39,22 +39,28 @@ public final class RolloverResponse extends ActionResponse implements ToXContent
private static final String DRY_RUN = "dry_run"; private static final String DRY_RUN = "dry_run";
private static final String ROLLED_OVER = "rolled_over"; private static final String ROLLED_OVER = "rolled_over";
private static final String CONDITIONS = "conditions"; private static final String CONDITIONS = "conditions";
private static final String ACKNOWLEDGED = "acknowledged";
private static final String SHARDS_ACKED = "shards_acknowledged";
private String oldIndex; private String oldIndex;
private String newIndex; private String newIndex;
private Set<Map.Entry<String, Boolean>> conditionStatus; private Set<Map.Entry<String, Boolean>> conditionStatus;
private boolean dryRun; private boolean dryRun;
private boolean rolledOver; private boolean rolledOver;
private boolean acknowledged;
private boolean shardsAcked;
RolloverResponse() { RolloverResponse() {
} }
RolloverResponse(String oldIndex, String newIndex, Set<Condition.Result> conditionResults, RolloverResponse(String oldIndex, String newIndex, Set<Condition.Result> conditionResults,
boolean dryRun, boolean rolledOver) { boolean dryRun, boolean rolledOver, boolean acknowledged, boolean shardsAcked) {
this.oldIndex = oldIndex; this.oldIndex = oldIndex;
this.newIndex = newIndex; this.newIndex = newIndex;
this.dryRun = dryRun; this.dryRun = dryRun;
this.rolledOver = rolledOver; this.rolledOver = rolledOver;
this.acknowledged = acknowledged;
this.shardsAcked = shardsAcked;
this.conditionStatus = conditionResults.stream() this.conditionStatus = conditionResults.stream()
.map(result -> new AbstractMap.SimpleEntry<>(result.condition.toString(), result.matched)) .map(result -> new AbstractMap.SimpleEntry<>(result.condition.toString(), result.matched))
.collect(Collectors.toSet()); .collect(Collectors.toSet());
@ -89,12 +95,31 @@ public final class RolloverResponse extends ActionResponse implements ToXContent
} }
/** /**
* Returns if the rollover was not simulated and the conditions were met * Returns true if the rollover was not simulated and the conditions were met
*/ */
public boolean isRolledOver() { public boolean isRolledOver() {
return rolledOver; return rolledOver;
} }
/**
* Returns true if the creation of the new rollover index and switching of the
* alias to the newly created index was successful, and returns false otherwise.
* If {@link #isDryRun()} is true, then this will also return false. If this
* returns false, then {@link #isShardsAcked()} will also return false.
*/
public boolean isAcknowledged() {
return acknowledged;
}
/**
* Returns true if the requisite number of shards were started in the newly
* created rollover index before returning. If {@link #isAcknowledged()} is
* false, then this will also return false.
*/
public boolean isShardsAcked() {
return shardsAcked;
}
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
super.readFrom(in); super.readFrom(in);
@ -110,6 +135,8 @@ public final class RolloverResponse extends ActionResponse implements ToXContent
conditionStatus = conditions; conditionStatus = conditions;
dryRun = in.readBoolean(); dryRun = in.readBoolean();
rolledOver = in.readBoolean(); rolledOver = in.readBoolean();
acknowledged = in.readBoolean();
shardsAcked = in.readBoolean();
} }
@Override @Override
@ -124,6 +151,8 @@ public final class RolloverResponse extends ActionResponse implements ToXContent
} }
out.writeBoolean(dryRun); out.writeBoolean(dryRun);
out.writeBoolean(rolledOver); out.writeBoolean(rolledOver);
out.writeBoolean(acknowledged);
out.writeBoolean(shardsAcked);
} }
@Override @Override
@ -132,6 +161,8 @@ public final class RolloverResponse extends ActionResponse implements ToXContent
builder.field(NEW_INDEX, newIndex); builder.field(NEW_INDEX, newIndex);
builder.field(ROLLED_OVER, rolledOver); builder.field(ROLLED_OVER, rolledOver);
builder.field(DRY_RUN, dryRun); builder.field(DRY_RUN, dryRun);
builder.field(ACKNOWLEDGED, acknowledged);
builder.field(SHARDS_ACKED, shardsAcked);
builder.startObject(CONDITIONS); builder.startObject(CONDITIONS);
for (Map.Entry<String, Boolean> entry : conditionStatus) { for (Map.Entry<String, Boolean> entry : conditionStatus) {
builder.field(entry.getKey(), entry.getValue()); builder.field(entry.getKey(), entry.getValue());

View File

@ -25,11 +25,12 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpda
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.ActiveShardsObserver;
import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.AliasAction; import org.elasticsearch.cluster.metadata.AliasAction;
@ -58,6 +59,7 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
private static final Pattern INDEX_NAME_PATTERN = Pattern.compile("^.*-(\\d)+$"); private static final Pattern INDEX_NAME_PATTERN = Pattern.compile("^.*-(\\d)+$");
private final MetaDataCreateIndexService createIndexService; private final MetaDataCreateIndexService createIndexService;
private final MetaDataIndexAliasesService indexAliasesService; private final MetaDataIndexAliasesService indexAliasesService;
private final ActiveShardsObserver activeShardsObserver;
private final Client client; private final Client client;
@Inject @Inject
@ -70,6 +72,7 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
this.createIndexService = createIndexService; this.createIndexService = createIndexService;
this.indexAliasesService = indexAliasesService; this.indexAliasesService = indexAliasesService;
this.client = client; this.client = client;
this.activeShardsObserver = new ActiveShardsObserver(settings, clusterService, threadPool);
} }
@Override @Override
@ -110,42 +113,34 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
: generateRolloverIndexName(sourceIndexName); : generateRolloverIndexName(sourceIndexName);
if (rolloverRequest.isDryRun()) { if (rolloverRequest.isDryRun()) {
listener.onResponse( listener.onResponse(
new RolloverResponse(sourceIndexName, rolloverIndexName, conditionResults, true, false)); new RolloverResponse(sourceIndexName, rolloverIndexName, conditionResults, true, false, false, false));
return; return;
} }
if (conditionResults.size() == 0 || conditionResults.stream().anyMatch(result -> result.matched)) { if (conditionResults.size() == 0 || conditionResults.stream().anyMatch(result -> result.matched)) {
createIndexService.createIndex(prepareCreateIndexRequest(rolloverIndexName, rolloverRequest), CreateIndexClusterStateUpdateRequest updateRequest = prepareCreateIndexRequest(rolloverIndexName, rolloverRequest);
new ActionListener<ClusterStateUpdateResponse>() { createIndexService.createIndex(updateRequest, ActionListener.wrap(createIndexClusterStateUpdateResponse -> {
@Override // switch the alias to point to the newly created index
public void onResponse(ClusterStateUpdateResponse response) { indexAliasesService.indicesAliases(
// switch the alias to point to the newly created index prepareRolloverAliasesUpdateRequest(sourceIndexName, rolloverIndexName,
indexAliasesService.indicesAliases( rolloverRequest),
prepareRolloverAliasesUpdateRequest(sourceIndexName, rolloverIndexName, ActionListener.wrap(aliasClusterStateUpdateResponse -> {
rolloverRequest), if (aliasClusterStateUpdateResponse.isAcknowledged()) {
new ActionListener<ClusterStateUpdateResponse>() { activeShardsObserver.waitForActiveShards(rolloverIndexName,
@Override rolloverRequest.getCreateIndexRequest().waitForActiveShards(),
public void onResponse(ClusterStateUpdateResponse clusterStateUpdateResponse) { rolloverRequest.masterNodeTimeout(),
listener.onResponse( isShardsAcked -> listener.onResponse(new RolloverResponse(sourceIndexName, rolloverIndexName,
new RolloverResponse(sourceIndexName, rolloverIndexName, conditionResults, false, true, true, isShardsAcked)),
conditionResults, false, true)); listener::onFailure);
} } else {
listener.onResponse(new RolloverResponse(sourceIndexName, rolloverIndexName, conditionResults,
@Override false, true, false, false));
public void onFailure(Exception e) { }
listener.onFailure(e); }, listener::onFailure));
} }, listener::onFailure));
});
}
@Override
public void onFailure(Exception t) {
listener.onFailure(t);
}
});
} else { } else {
// conditions not met // conditions not met
listener.onResponse( listener.onResponse(
new RolloverResponse(sourceIndexName, sourceIndexName, conditionResults, false, false) new RolloverResponse(sourceIndexName, sourceIndexName, conditionResults, false, false, false, false)
); );
} }
} }
@ -216,6 +211,7 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
.masterNodeTimeout(createIndexRequest.masterNodeTimeout()) .masterNodeTimeout(createIndexRequest.masterNodeTimeout())
.settings(createIndexRequest.settings()) .settings(createIndexRequest.settings())
.aliases(createIndexRequest.aliases()) .aliases(createIndexRequest.aliases())
.waitForActiveShards(ActiveShardCount.NONE) // not waiting for shards here, will wait on the alias switch operation
.mappings(createIndexRequest.mappings()); .mappings(createIndexRequest.mappings());
} }

View File

@ -22,6 +22,7 @@ import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.common.ParseField; import org.elasticsearch.common.ParseField;
@ -36,7 +37,6 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import java.io.IOException; import java.io.IOException;
import java.util.Map;
import java.util.Objects; import java.util.Objects;
import static org.elasticsearch.action.ValidateActions.addValidationError; import static org.elasticsearch.action.ValidateActions.addValidationError;
@ -126,6 +126,24 @@ public class ShrinkRequest extends AcknowledgedRequest<ShrinkRequest> implements
return sourceIndex; return sourceIndex;
} }
/**
* Sets the number of shard copies that should be active for creation of the
* new shrunken index to return. Defaults to {@link ActiveShardCount#DEFAULT}, which will
* wait for one shard copy (the primary) to become active. Set this value to
* {@link ActiveShardCount#ALL} to wait for all shards (primary and all replicas) to be active
* before returning. Otherwise, use {@link ActiveShardCount#from(int)} to set this value to any
* non-negative integer, up to the number of copies per shard (number of replicas + 1),
* to wait for the desired amount of shard copies to become active before returning.
* Index creation will only wait up until the timeout value for the number of shard copies
* to be active before returning. Check {@link ShrinkResponse#isShardsAcked()} to
* determine if the requisite shard copies were all started before returning or timing out.
*
* @param waitForActiveShards number of active shard copies to wait on
*/
public void setWaitForActiveShards(ActiveShardCount waitForActiveShards) {
this.getShrinkIndexRequest().waitForActiveShards(waitForActiveShards);
}
public void source(BytesReference source) { public void source(BytesReference source) {
XContentType xContentType = XContentFactory.xContentType(source); XContentType xContentType = XContentFactory.xContentType(source);
if (xContentType != null) { if (xContentType != null) {

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.admin.indices.shrink; package org.elasticsearch.action.admin.indices.shrink;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.master.AcknowledgedRequestBuilder; import org.elasticsearch.action.support.master.AcknowledgedRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -44,4 +45,23 @@ public class ShrinkRequestBuilder extends AcknowledgedRequestBuilder<ShrinkReque
this.request.getShrinkIndexRequest().settings(settings); this.request.getShrinkIndexRequest().settings(settings);
return this; return this;
} }
/**
* Sets the number of shard copies that should be active for creation of the
* new shrunken index to return. Defaults to {@link ActiveShardCount#DEFAULT}, which will
* wait for one shard copy (the primary) to become active. Set this value to
* {@link ActiveShardCount#ALL} to wait for all shards (primary and all replicas) to be active
* before returning. Otherwise, use {@link ActiveShardCount#from(int)} to set this value to any
* non-negative integer, up to the number of copies per shard (number of replicas + 1),
* to wait for the desired amount of shard copies to become active before returning.
* Index creation will only wait up until the timeout value for the number of shard copies
* to be active before returning. Check {@link ShrinkResponse#isShardsAcked()} to
* determine if the requisite shard copies were all started before returning or timing out.
*
* @param waitForActiveShards number of active shard copies to wait on
*/
public ShrinkRequestBuilder setWaitForActiveShards(ActiveShardCount waitForActiveShards) {
this.request.setWaitForActiveShards(waitForActiveShards);
return this;
}
} }

View File

@ -25,7 +25,7 @@ public final class ShrinkResponse extends CreateIndexResponse {
ShrinkResponse() { ShrinkResponse() {
} }
ShrinkResponse(boolean acknowledged) { ShrinkResponse(boolean acknowledged, boolean shardsAcked) {
super(acknowledged); super(acknowledged, shardsAcked);
} }
} }

View File

@ -29,7 +29,6 @@ import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
@ -40,7 +39,6 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.DocsStats; import org.elasticsearch.index.shard.DocsStats;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
@ -93,22 +91,8 @@ public class TransportShrinkAction extends TransportMasterNodeAction<ShrinkReque
IndexShardStats shard = indicesStatsResponse.getIndex(sourceIndex).getIndexShards().get(i); IndexShardStats shard = indicesStatsResponse.getIndex(sourceIndex).getIndexShards().get(i);
return shard == null ? null : shard.getPrimary().getDocs(); return shard == null ? null : shard.getPrimary().getDocs();
}, indexNameExpressionResolver); }, indexNameExpressionResolver);
createIndexService.createIndex(updateRequest, new ActionListener<ClusterStateUpdateResponse>() { createIndexService.createIndex(updateRequest, ActionListener.wrap(response ->
@Override listener.onResponse(new ShrinkResponse(response.isAcknowledged(), response.isShardsAcked())), listener::onFailure));
public void onResponse(ClusterStateUpdateResponse response) {
listener.onResponse(new ShrinkResponse(response.isAcknowledged()));
}
@Override
public void onFailure(Exception t) {
if (t instanceof IndexAlreadyExistsException) {
logger.trace("[{}] failed to create shrink index", t, updateRequest.index());
} else {
logger.debug("[{}] failed to create shrink index", t, updateRequest.index());
}
listener.onFailure(t);
}
});
} }
@Override @Override
@ -162,6 +146,7 @@ public class TransportShrinkAction extends TransportMasterNodeAction<ShrinkReque
.settings(targetIndex.settings()) .settings(targetIndex.settings())
.aliases(targetIndex.aliases()) .aliases(targetIndex.aliases())
.customs(targetIndex.customs()) .customs(targetIndex.customs())
.waitForActiveShards(targetIndex.waitForActiveShards())
.shrinkFrom(metaData.getIndex()); .shrinkFrom(metaData.getIndex());
} }

View File

@ -0,0 +1,211 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.support;
import com.carrotsearch.hppc.cursors.IntObjectCursor;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import java.io.IOException;
/**
* A class whose instances represent a value for counting the number
* of active shard copies for a given shard in an index.
*/
public final class ActiveShardCount implements Writeable {
private static final int ACTIVE_SHARD_COUNT_DEFAULT = -2;
private static final int ALL_ACTIVE_SHARDS = -1;
public static final ActiveShardCount DEFAULT = new ActiveShardCount(ACTIVE_SHARD_COUNT_DEFAULT);
public static final ActiveShardCount ALL = new ActiveShardCount(ALL_ACTIVE_SHARDS);
public static final ActiveShardCount NONE = new ActiveShardCount(0);
public static final ActiveShardCount ONE = new ActiveShardCount(1);
private final int value;
private ActiveShardCount(final int value) {
this.value = value;
}
/**
* Get an ActiveShardCount instance for the given value. The value is first validated to ensure
* it is a valid shard count and throws an IllegalArgumentException if validation fails. Valid
* values are any non-negative number. Directly use {@link ActiveShardCount#DEFAULT} for the
* default value (which is one shard copy) or {@link ActiveShardCount#ALL} to specify all the shards.
*/
public static ActiveShardCount from(final int value) {
if (value < 0) {
throw new IllegalArgumentException("shard count cannot be a negative value");
}
return get(value);
}
private static ActiveShardCount get(final int value) {
switch (validateValue(value)) {
case ACTIVE_SHARD_COUNT_DEFAULT:
return DEFAULT;
case ALL_ACTIVE_SHARDS:
return ALL;
case 1:
return ONE;
case 0:
return NONE;
default:
return new ActiveShardCount(value);
}
}
@Override
public void writeTo(final StreamOutput out) throws IOException {
out.writeInt(value);
}
public static ActiveShardCount readFrom(final StreamInput in) throws IOException {
return get(in.readInt());
}
private static int validateValue(final int value) {
if (value < 0 && value != ACTIVE_SHARD_COUNT_DEFAULT && value != ALL_ACTIVE_SHARDS) {
throw new IllegalArgumentException("Invalid ActiveShardCount[" + value + "]");
}
return value;
}
/**
* Resolve this instance to an actual integer value for the number of active shard counts.
* If {@link ActiveShardCount#ALL} is specified, then the given {@link IndexMetaData} is
* used to determine what the actual active shard count should be. The default value indicates
* one active shard.
*/
public int resolve(final IndexMetaData indexMetaData) {
if (this == ActiveShardCount.DEFAULT) {
return 1;
} else if (this == ActiveShardCount.ALL) {
return indexMetaData.getNumberOfReplicas() + 1;
} else {
return value;
}
}
/**
* Parses the active shard count from the given string. Valid values are "all" for
* all shard copies, null for the default value (which defaults to one shard copy),
* or a numeric value greater than or equal to 0. Any other input will throw an
* IllegalArgumentException.
*/
public static ActiveShardCount parseString(final String str) {
if (str == null) {
return ActiveShardCount.DEFAULT;
} else if (str.equals("all")) {
return ActiveShardCount.ALL;
} else {
int val;
try {
val = Integer.parseInt(str);
} catch (NumberFormatException e) {
throw new IllegalArgumentException("cannot parse ActiveShardCount[" + str + "]", e);
}
return ActiveShardCount.from(val);
}
}
/**
* Returns true iff the given cluster state's routing table contains enough active
* shards to meet the required shard count represented by this instance.
*/
public boolean enoughShardsActive(final ClusterState clusterState, final String indexName) {
if (this == ActiveShardCount.NONE) {
// not waiting for any active shards
return true;
}
final IndexMetaData indexMetaData = clusterState.metaData().index(indexName);
if (indexMetaData == null) {
// its possible the index was deleted while waiting for active shard copies,
// in this case, we'll just consider it that we have enough active shard copies
// and we can stop waiting
return true;
}
final IndexRoutingTable indexRoutingTable = clusterState.routingTable().index(indexName);
assert indexRoutingTable != null;
if (indexRoutingTable.allPrimaryShardsActive() == false) {
// all primary shards aren't active yet
return false;
}
for (final IntObjectCursor<IndexShardRoutingTable> shardRouting : indexRoutingTable.getShards()) {
if (enoughShardsActive(shardRouting.value, indexMetaData) == false) {
// not enough active shard copies yet
return false;
}
}
return true;
}
/**
* Returns true iff the active shard count in the shard routing table is enough
* to meet the required shard count represented by this instance.
*/
public boolean enoughShardsActive(final IndexShardRoutingTable shardRoutingTable, final IndexMetaData indexMetaData) {
if (shardRoutingTable.activeShards().size() < resolve(indexMetaData)) {
// not enough active shard copies yet
return false;
}
return true;
}
@Override
public int hashCode() {
return Integer.hashCode(value);
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
@SuppressWarnings("unchecked") ActiveShardCount that = (ActiveShardCount) o;
return value == that.value;
}
@Override
public String toString() {
final String valStr;
switch (value) {
case ALL_ACTIVE_SHARDS:
valStr = "ALL";
break;
case ACTIVE_SHARD_COUNT_DEFAULT:
valStr = "DEFAULT";
break;
default:
valStr = Integer.toString(value);
}
return "ActiveShardCount[" + valStr + "]";
}
}

View File

@ -0,0 +1,105 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.support;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.function.Consumer;
/**
* This class provides primitives for waiting for a configured number of shards
* to become active before sending a response on an {@link ActionListener}.
*/
public class ActiveShardsObserver extends AbstractComponent {
private final ClusterService clusterService;
private final ThreadPool threadPool;
public ActiveShardsObserver(final Settings settings, final ClusterService clusterService, final ThreadPool threadPool) {
super(settings);
this.clusterService = clusterService;
this.threadPool = threadPool;
}
/**
* Waits on the specified number of active shards to be started before executing the
*
* @param indexName the index to wait for active shards on
* @param activeShardCount the number of active shards to wait on before returning
* @param timeout the timeout value
* @param onResult a function that is executed in response to the requisite shards becoming active or a timeout (whichever comes first)
* @param onFailure a function that is executed in response to an error occurring during waiting for the active shards
*/
public void waitForActiveShards(final String indexName,
final ActiveShardCount activeShardCount,
final TimeValue timeout,
final Consumer<Boolean> onResult,
final Consumer<Exception> onFailure) {
// wait for the configured number of active shards to be allocated before executing the result consumer
if (activeShardCount == ActiveShardCount.NONE) {
// not waiting, so just run whatever we were to run when the waiting is
onResult.accept(true);
return;
}
final ClusterStateObserver observer = new ClusterStateObserver(clusterService, logger, threadPool.getThreadContext());
if (activeShardCount.enoughShardsActive(observer.observedState(), indexName)) {
onResult.accept(true);
} else {
final ClusterStateObserver.ChangePredicate shardsAllocatedPredicate =
new ClusterStateObserver.ValidationPredicate() {
@Override
protected boolean validate(final ClusterState newState) {
return activeShardCount.enoughShardsActive(newState, indexName);
}
};
final ClusterStateObserver.Listener observerListener = new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
onResult.accept(true);
}
@Override
public void onClusterServiceClose() {
logger.debug("[{}] cluster service closed while waiting for enough shards to be started.", indexName);
onFailure.accept(new NodeClosedException(clusterService.localNode()));
}
@Override
public void onTimeout(TimeValue timeout) {
onResult.accept(false);
}
};
observer.waitForNextChange(observerListener, shardsAllocatedPredicate, timeout);
}
}
}

View File

@ -0,0 +1,40 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.ack;
/**
* A cluster state update response with specific fields for index creation.
*/
public class CreateIndexClusterStateUpdateResponse extends ClusterStateUpdateResponse {
private final boolean shardsAcked;
public CreateIndexClusterStateUpdateResponse(boolean acknowledged, boolean shardsAcked) {
super(acknowledged);
this.shardsAcked = shardsAcked;
}
/**
* Returns whether the requisite number of shard copies started before the completion of the operation.
*/
public boolean isShardsAcked() {
return shardsAcked;
}
}

View File

@ -27,9 +27,11 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
import org.elasticsearch.action.support.ActiveShardsObserver;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.ack.CreateIndexClusterStateUpdateResponse;
import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.block.ClusterBlocks;
@ -68,6 +70,7 @@ import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.indices.IndexCreationException; import org.elasticsearch.indices.IndexCreationException;
import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.InvalidIndexNameException; import org.elasticsearch.indices.InvalidIndexNameException;
import org.elasticsearch.threadpool.ThreadPool;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.DateTimeZone; import org.joda.time.DateTimeZone;
@ -108,13 +111,15 @@ public class MetaDataCreateIndexService extends AbstractComponent {
private final Environment env; private final Environment env;
private final NodeServicesProvider nodeServicesProvider; private final NodeServicesProvider nodeServicesProvider;
private final IndexScopedSettings indexScopedSettings; private final IndexScopedSettings indexScopedSettings;
private final ActiveShardsObserver activeShardsObserver;
@Inject @Inject
public MetaDataCreateIndexService(Settings settings, ClusterService clusterService, public MetaDataCreateIndexService(Settings settings, ClusterService clusterService,
IndicesService indicesService, AllocationService allocationService, IndicesService indicesService, AllocationService allocationService,
AliasValidator aliasValidator, AliasValidator aliasValidator,
Set<IndexTemplateFilter> indexTemplateFilters, Environment env, NodeServicesProvider nodeServicesProvider, IndexScopedSettings indexScopedSettings) { Set<IndexTemplateFilter> indexTemplateFilters, Environment env,
NodeServicesProvider nodeServicesProvider, IndexScopedSettings indexScopedSettings,
ThreadPool threadPool) {
super(settings); super(settings);
this.clusterService = clusterService; this.clusterService = clusterService;
this.indicesService = indicesService; this.indicesService = indicesService;
@ -135,6 +140,7 @@ public class MetaDataCreateIndexService extends AbstractComponent {
} }
this.indexTemplateFilter = new IndexTemplateFilter.Compound(templateFilters); this.indexTemplateFilter = new IndexTemplateFilter.Compound(templateFilters);
} }
this.activeShardsObserver = new ActiveShardsObserver(settings, clusterService, threadPool);
} }
public void validateIndexName(String index, ClusterState state) { public void validateIndexName(String index, ClusterState state) {
@ -176,7 +182,38 @@ public class MetaDataCreateIndexService extends AbstractComponent {
} }
} }
public void createIndex(final CreateIndexClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener) { /**
* Creates an index in the cluster state and waits for the specified number of shard copies to
* become active (as specified in {@link CreateIndexClusterStateUpdateRequest#waitForActiveShards()})
* before sending the response on the listener. If the index creation was successfully applied on
* the cluster state, then {@link CreateIndexClusterStateUpdateResponse#isAcknowledged()} will return
* true, otherwise it will return false and no waiting will occur for started shards
* ({@link CreateIndexClusterStateUpdateResponse#isShardsAcked()} will also be false). If the index
* creation in the cluster state was successful and the requisite shard copies were started before
* the timeout, then {@link CreateIndexClusterStateUpdateResponse#isShardsAcked()} will
* return true, otherwise if the operation timed out, then it will return false.
*
* @param request the index creation cluster state update request
* @param listener the listener on which to send the index creation cluster state update response
*/
public void createIndex(final CreateIndexClusterStateUpdateRequest request,
final ActionListener<CreateIndexClusterStateUpdateResponse> listener) {
onlyCreateIndex(request, ActionListener.wrap(response -> {
if (response.isAcknowledged()) {
activeShardsObserver.waitForActiveShards(request.index(), request.waitForActiveShards(), request.ackTimeout(),
shardsAcked -> {
logger.debug("[{}] index created, but the operation timed out while waiting for " +
"enough shards to be started.", request.index());
listener.onResponse(new CreateIndexClusterStateUpdateResponse(response.isAcknowledged(), shardsAcked));
}, listener::onFailure);
} else {
listener.onResponse(new CreateIndexClusterStateUpdateResponse(false, false));
}
}, listener::onFailure));
}
private void onlyCreateIndex(final CreateIndexClusterStateUpdateRequest request,
final ActionListener<ClusterStateUpdateResponse> listener) {
Settings.Builder updatedSettingsBuilder = Settings.builder(); Settings.Builder updatedSettingsBuilder = Settings.builder();
updatedSettingsBuilder.put(request.settings()).normalizePrefix(IndexMetaData.INDEX_SETTING_PREFIX); updatedSettingsBuilder.put(request.settings()).normalizePrefix(IndexMetaData.INDEX_SETTING_PREFIX);
indexScopedSettings.validate(updatedSettingsBuilder); indexScopedSettings.validate(updatedSettingsBuilder);
@ -308,6 +345,11 @@ public class MetaDataCreateIndexService extends AbstractComponent {
.setRoutingNumShards(routingNumShards); .setRoutingNumShards(routingNumShards);
// Set up everything, now locally create the index to see that things are ok, and apply // Set up everything, now locally create the index to see that things are ok, and apply
final IndexMetaData tmpImd = tmpImdBuilder.settings(actualIndexSettings).build(); final IndexMetaData tmpImd = tmpImdBuilder.settings(actualIndexSettings).build();
if (request.waitForActiveShards().resolve(tmpImd) > tmpImd.getNumberOfReplicas() + 1) {
throw new IllegalArgumentException("invalid wait_for_active_shards[" + request.waitForActiveShards() +
"]: cannot be greater than number of shard copies [" +
(tmpImd.getNumberOfReplicas() + 1) + "]");
}
// create the index here (on the master) to validate it can be created, as well as adding the mapping // create the index here (on the master) to validate it can be created, as well as adding the mapping
final IndexService indexService = indicesService.createIndex(nodeServicesProvider, tmpImd, Collections.emptyList()); final IndexService indexService = indicesService.createIndex(nodeServicesProvider, tmpImd, Collections.emptyList());
createdIndex = indexService.index(); createdIndex = indexService.index();
@ -408,6 +450,16 @@ public class MetaDataCreateIndexService extends AbstractComponent {
} }
} }
} }
@Override
public void onFailure(String source, Exception e) {
if (e instanceof IndexAlreadyExistsException) {
logger.trace("[{}] failed to create", e, request.index());
} else {
logger.debug("[{}] failed to create", e, request.index());
}
super.onFailure(source, e);
}
}); });
} }

View File

@ -21,6 +21,7 @@ package org.elasticsearch.rest.action.admin.indices;
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest; import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.BaseRestHandler;
@ -51,6 +52,7 @@ public class RestRolloverIndexAction extends BaseRestHandler {
rolloverIndexRequest.dryRun(request.paramAsBoolean("dry_run", false)); rolloverIndexRequest.dryRun(request.paramAsBoolean("dry_run", false));
rolloverIndexRequest.timeout(request.paramAsTime("timeout", rolloverIndexRequest.timeout())); rolloverIndexRequest.timeout(request.paramAsTime("timeout", rolloverIndexRequest.timeout()));
rolloverIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", rolloverIndexRequest.masterNodeTimeout())); rolloverIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", rolloverIndexRequest.masterNodeTimeout()));
rolloverIndexRequest.setWaitForActiveShards(ActiveShardCount.parseString(request.param("wait_for_active_shards")));
client.admin().indices().rolloverIndex(rolloverIndexRequest, new RestToXContentListener<>(channel)); client.admin().indices().rolloverIndex(rolloverIndexRequest, new RestToXContentListener<>(channel));
} }
} }

View File

@ -21,14 +21,19 @@ package org.elasticsearch.rest.action.admin.indices;
import org.elasticsearch.action.admin.indices.shrink.ShrinkRequest; import org.elasticsearch.action.admin.indices.shrink.ShrinkRequest;
import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.action.admin.indices.shrink.ShrinkResponse;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.support.AcknowledgedRestListener; import org.elasticsearch.rest.action.support.AcknowledgedRestListener;
import java.io.IOException;
/** /**
* *
*/ */
@ -56,6 +61,12 @@ public class RestShrinkIndexAction extends BaseRestHandler {
} }
shrinkIndexRequest.timeout(request.paramAsTime("timeout", shrinkIndexRequest.timeout())); shrinkIndexRequest.timeout(request.paramAsTime("timeout", shrinkIndexRequest.timeout()));
shrinkIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", shrinkIndexRequest.masterNodeTimeout())); shrinkIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", shrinkIndexRequest.masterNodeTimeout()));
client.admin().indices().shrinkIndex(shrinkIndexRequest, new AcknowledgedRestListener<>(channel)); shrinkIndexRequest.setWaitForActiveShards(ActiveShardCount.parseString(request.param("wait_for_active_shards")));
client.admin().indices().shrinkIndex(shrinkIndexRequest, new AcknowledgedRestListener<ShrinkResponse>(channel) {
@Override
public void addCustomFields(XContentBuilder builder, ShrinkResponse response) throws IOException {
response.addCustomFields(builder);
}
});
} }
} }

View File

@ -22,14 +22,18 @@ package org.elasticsearch.rest.action.admin.indices.create;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.support.AcknowledgedRestListener; import org.elasticsearch.rest.action.support.AcknowledgedRestListener;
import java.io.IOException;
/** /**
* *
*/ */
@ -52,6 +56,12 @@ public class RestCreateIndexAction extends BaseRestHandler {
createIndexRequest.updateAllTypes(request.paramAsBoolean("update_all_types", false)); createIndexRequest.updateAllTypes(request.paramAsBoolean("update_all_types", false));
createIndexRequest.timeout(request.paramAsTime("timeout", createIndexRequest.timeout())); createIndexRequest.timeout(request.paramAsTime("timeout", createIndexRequest.timeout()));
createIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", createIndexRequest.masterNodeTimeout())); createIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", createIndexRequest.masterNodeTimeout()));
client.admin().indices().create(createIndexRequest, new AcknowledgedRestListener<CreateIndexResponse>(channel)); createIndexRequest.waitForActiveShards(ActiveShardCount.parseString(request.param("wait_for_active_shards")));
client.admin().indices().create(createIndexRequest, new AcknowledgedRestListener<CreateIndexResponse>(channel) {
@Override
public void addCustomFields(XContentBuilder builder, CreateIndexResponse response) throws IOException {
response.addCustomFields(builder);
}
});
} }
} }

View File

@ -21,6 +21,7 @@ package org.elasticsearch.action.admin.cluster.allocation;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse; import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.client.Requests; import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
@ -29,10 +30,10 @@ import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThan;
@ -54,11 +55,11 @@ public final class ClusterAllocationExplainIT extends ESIntegTestCase {
}); });
logger.info("--> creating 'test' index"); logger.info("--> creating 'test' index");
prepareCreate("test").setSettings(Settings.builder() assertAcked(prepareCreate("test").setSettings(Settings.builder()
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "1m") .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "1m")
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 5) .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 5)
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1)).get(); .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1))
ensureGreen("test"); .setWaitForActiveShards(ActiveShardCount.ALL).get());
logger.info("--> stopping a random node"); logger.info("--> stopping a random node");
assertTrue(internalCluster().stopRandomDataNode()); assertTrue(internalCluster().stopRandomDataNode());
@ -89,6 +90,7 @@ public final class ClusterAllocationExplainIT extends ESIntegTestCase {
.setSettings(Settings.builder() .setSettings(Settings.builder()
.put("index.number_of_shards", 5) .put("index.number_of_shards", 5)
.put("index.number_of_replicas", 1)) .put("index.number_of_replicas", 1))
.setWaitForActiveShards(ActiveShardCount.ALL) // wait on all shards
.get(); .get();
client().admin().indices().prepareCreate("only-baz") client().admin().indices().prepareCreate("only-baz")
@ -96,6 +98,7 @@ public final class ClusterAllocationExplainIT extends ESIntegTestCase {
.put("index.routing.allocation.include.bar", "baz") .put("index.routing.allocation.include.bar", "baz")
.put("index.number_of_shards", 5) .put("index.number_of_shards", 5)
.put("index.number_of_replicas", 1)) .put("index.number_of_replicas", 1))
.setWaitForActiveShards(ActiveShardCount.ALL)
.get(); .get();
client().admin().indices().prepareCreate("only-foo") client().admin().indices().prepareCreate("only-foo")
@ -105,9 +108,6 @@ public final class ClusterAllocationExplainIT extends ESIntegTestCase {
.put("index.number_of_replicas", 1)) .put("index.number_of_replicas", 1))
.get(); .get();
ensureGreen("anywhere", "only-baz");
ensureYellow("only-foo");
ClusterAllocationExplainResponse resp = client().admin().cluster().prepareAllocationExplain() ClusterAllocationExplainResponse resp = client().admin().cluster().prepareAllocationExplain()
.setIndex("only-foo") .setIndex("only-foo")
.setShard(0) .setShard(0)

View File

@ -25,6 +25,7 @@ import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
@ -289,7 +290,7 @@ public class CreateIndexIT extends ESIntegTestCase {
public void testRestartIndexCreationAfterFullClusterRestart() throws Exception { public void testRestartIndexCreationAfterFullClusterRestart() throws Exception {
client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder().put("cluster.routing.allocation.enable", client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder().put("cluster.routing.allocation.enable",
"none")).get(); "none")).get();
client().admin().indices().prepareCreate("test").setSettings(indexSettings()).get(); client().admin().indices().prepareCreate("test").setWaitForActiveShards(ActiveShardCount.NONE).setSettings(indexSettings()).get();
internalCluster().fullRestart(); internalCluster().fullRestart();
ensureGreen("test"); ensureGreen("test");
} }

View File

@ -22,6 +22,7 @@ package org.elasticsearch.action.admin.indices.rollover;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesClusterStateUpdateRequest; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.cluster.metadata.AliasAction; import org.elasticsearch.cluster.metadata.AliasAction;
import org.elasticsearch.cluster.metadata.AliasMetaData; import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
@ -166,6 +167,8 @@ public class TransportRolloverActionTests extends ESTestCase {
String alias = randomAsciiOfLength(10); String alias = randomAsciiOfLength(10);
String rolloverIndex = randomAsciiOfLength(10); String rolloverIndex = randomAsciiOfLength(10);
final RolloverRequest rolloverRequest = new RolloverRequest(alias, randomAsciiOfLength(10)); final RolloverRequest rolloverRequest = new RolloverRequest(alias, randomAsciiOfLength(10));
final ActiveShardCount activeShardCount = randomBoolean() ? ActiveShardCount.ALL : ActiveShardCount.ONE;
rolloverRequest.setWaitForActiveShards(activeShardCount);
final Settings settings = Settings.builder() final Settings settings = Settings.builder()
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID()) .put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID())

View File

@ -22,6 +22,7 @@ package org.elasticsearch.action.admin.indices.shrink;
import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriter;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.EmptyClusterInfoService; import org.elasticsearch.cluster.EmptyClusterInfoService;
@ -130,6 +131,8 @@ public class TransportShrinkActionTests extends ESTestCase {
int numSourceShards = clusterState.metaData().index(indexName).getNumberOfShards(); int numSourceShards = clusterState.metaData().index(indexName).getNumberOfShards();
DocsStats stats = new DocsStats(randomIntBetween(0, (IndexWriter.MAX_DOCS) / numSourceShards), randomIntBetween(1, 1000)); DocsStats stats = new DocsStats(randomIntBetween(0, (IndexWriter.MAX_DOCS) / numSourceShards), randomIntBetween(1, 1000));
ShrinkRequest target = new ShrinkRequest("target", indexName); ShrinkRequest target = new ShrinkRequest("target", indexName);
final ActiveShardCount activeShardCount = randomBoolean() ? ActiveShardCount.ALL : ActiveShardCount.ONE;
target.setWaitForActiveShards(activeShardCount);
CreateIndexClusterStateUpdateRequest request = TransportShrinkAction.prepareCreateIndexRequest( CreateIndexClusterStateUpdateRequest request = TransportShrinkAction.prepareCreateIndexRequest(
target, clusterState, (i) -> stats, target, clusterState, (i) -> stats,
new IndexNameExpressionResolver(Settings.EMPTY)); new IndexNameExpressionResolver(Settings.EMPTY));
@ -137,6 +140,7 @@ public class TransportShrinkActionTests extends ESTestCase {
assertEquals(indexName, request.shrinkFrom().getName()); assertEquals(indexName, request.shrinkFrom().getName());
assertEquals("1", request.settings().get("index.number_of_shards")); assertEquals("1", request.settings().get("index.number_of_shards"));
assertEquals("shrink_index", request.cause()); assertEquals("shrink_index", request.cause());
assertEquals(request.waitForActiveShards(), activeShardCount);
} }
private DiscoveryNode newNode(String nodeId) { private DiscoveryNode newNode(String nodeId) {

View File

@ -160,7 +160,7 @@ public class MetaDataIndexTemplateServiceTests extends ESSingleNodeTestCase {
null, null,
new HashSet<>(), new HashSet<>(),
null, null,
null, null); null, null, null);
MetaDataIndexTemplateService service = new MetaDataIndexTemplateService(Settings.EMPTY, null, createIndexService, new AliasValidator(Settings.EMPTY), null, null); MetaDataIndexTemplateService service = new MetaDataIndexTemplateService(Settings.EMPTY, null, createIndexService, new AliasValidator(Settings.EMPTY), null, null);
final List<Throwable> throwables = new ArrayList<>(); final List<Throwable> throwables = new ArrayList<>();
@ -191,6 +191,7 @@ public class MetaDataIndexTemplateServiceTests extends ESSingleNodeTestCase {
new HashSet<>(), new HashSet<>(),
null, null,
nodeServicesProvider, nodeServicesProvider,
null,
null); null);
MetaDataIndexTemplateService service = new MetaDataIndexTemplateService( MetaDataIndexTemplateService service = new MetaDataIndexTemplateService(
Settings.EMPTY, clusterService, createIndexService, new AliasValidator(Settings.EMPTY), indicesService, nodeServicesProvider); Settings.EMPTY, clusterService, createIndexService, new AliasValidator(Settings.EMPTY), indicesService, nodeServicesProvider);

View File

@ -0,0 +1,305 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.support;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.nio.ByteBuffer;
import static org.hamcrest.Matchers.equalTo;
/**
* Tests for the {@link ActiveShardCount} class
*/
public class ActiveShardCountTests extends ESTestCase {
public void testFromIntValue() {
assertSame(ActiveShardCount.from(0), ActiveShardCount.NONE);
final int value = randomIntBetween(1, 50);
IndexMetaData indexMetaData = IndexMetaData.builder("test")
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(0)
.build();
assertEquals(ActiveShardCount.from(value).resolve(indexMetaData), value);
expectThrows(IllegalArgumentException.class, () -> ActiveShardCount.from(randomIntBetween(-10, -1)));
}
public void testResolve() {
// one shard
IndexMetaData indexMetaData = IndexMetaData.builder("test")
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(0)
.build();
assertThat(ActiveShardCount.ALL.resolve(indexMetaData), equalTo(1));
assertThat(ActiveShardCount.DEFAULT.resolve(indexMetaData), equalTo(1));
assertThat(ActiveShardCount.NONE.resolve(indexMetaData), equalTo(0));
final int value = randomIntBetween(2, 20);
assertThat(ActiveShardCount.from(value).resolve(indexMetaData), equalTo(value));
// more than one shard
final int numNewShards = randomIntBetween(1, 20);
indexMetaData = IndexMetaData.builder("test")
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(numNewShards)
.build();
assertThat(ActiveShardCount.ALL.resolve(indexMetaData), equalTo(numNewShards + 1));
assertThat(ActiveShardCount.DEFAULT.resolve(indexMetaData), equalTo(1));
assertThat(ActiveShardCount.NONE.resolve(indexMetaData), equalTo(0));
assertThat(ActiveShardCount.from(value).resolve(indexMetaData), equalTo(value));
}
public void testSerialization() throws IOException {
doWriteRead(ActiveShardCount.ALL);
doWriteRead(ActiveShardCount.DEFAULT);
doWriteRead(ActiveShardCount.NONE);
doWriteRead(ActiveShardCount.from(randomIntBetween(1, 50)));
}
public void testParseString() {
assertSame(ActiveShardCount.parseString("all"), ActiveShardCount.ALL);
assertSame(ActiveShardCount.parseString(null), ActiveShardCount.DEFAULT);
assertSame(ActiveShardCount.parseString("0"), ActiveShardCount.NONE);
int value = randomIntBetween(1, 50);
assertEquals(ActiveShardCount.parseString(value + ""), ActiveShardCount.from(value));
expectThrows(IllegalArgumentException.class, () -> ActiveShardCount.parseString(randomAsciiOfLengthBetween(4, 8)));
expectThrows(IllegalArgumentException.class, () -> ActiveShardCount.parseString("-1")); // magic numbers not exposed through API
expectThrows(IllegalArgumentException.class, () -> ActiveShardCount.parseString("-2"));
expectThrows(IllegalArgumentException.class, () -> ActiveShardCount.parseString(randomIntBetween(-10, -3) + ""));
}
private void doWriteRead(ActiveShardCount activeShardCount) throws IOException {
final BytesStreamOutput out = new BytesStreamOutput();
activeShardCount.writeTo(out);
final ByteBufferStreamInput in = new ByteBufferStreamInput(ByteBuffer.wrap(out.bytes().toBytesRef().bytes));
ActiveShardCount readActiveShardCount = ActiveShardCount.readFrom(in);
if (activeShardCount == ActiveShardCount.DEFAULT
|| activeShardCount == ActiveShardCount.ALL
|| activeShardCount == ActiveShardCount.NONE) {
assertSame(activeShardCount, readActiveShardCount);
} else {
assertEquals(activeShardCount, readActiveShardCount);
}
}
public void testEnoughShardsActiveZero() {
final String indexName = "test-idx";
final int numberOfShards = randomIntBetween(1, 5);
final int numberOfReplicas = randomIntBetween(4, 7);
final ActiveShardCount waitForActiveShards = ActiveShardCount.from(0);
ClusterState clusterState = initializeWithNewIndex(indexName, numberOfShards, numberOfReplicas);
assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName));
clusterState = startPrimaries(clusterState, indexName);
assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName));
clusterState = startLessThanWaitOnShards(clusterState, indexName, waitForActiveShards);
assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName));
clusterState = startWaitOnShards(clusterState, indexName, waitForActiveShards);
assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName));
clusterState = startAllShards(clusterState, indexName);
assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName));
}
public void testEnoughShardsActiveLevelOne() {
runTestForOneActiveShard(ActiveShardCount.ONE);
}
public void testEnoughShardsActiveLevelDefault() {
// default is 1
runTestForOneActiveShard(ActiveShardCount.DEFAULT);
}
public void testEnoughShardsActiveRandom() {
final String indexName = "test-idx";
final int numberOfShards = randomIntBetween(1, 5);
final int numberOfReplicas = randomIntBetween(4, 7);
final ActiveShardCount waitForActiveShards = ActiveShardCount.from(randomIntBetween(2, numberOfReplicas));
ClusterState clusterState = initializeWithNewIndex(indexName, numberOfShards, numberOfReplicas);
assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName));
clusterState = startPrimaries(clusterState, indexName);
assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName));
clusterState = startLessThanWaitOnShards(clusterState, indexName, waitForActiveShards);
assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName));
clusterState = startWaitOnShards(clusterState, indexName, waitForActiveShards);
assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName));
clusterState = startAllShards(clusterState, indexName);
assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName));
}
public void testEnoughShardsActiveLevelAll() {
final String indexName = "test-idx";
final int numberOfShards = randomIntBetween(1, 5);
final int numberOfReplicas = randomIntBetween(4, 7);
// both values should represent "all"
final ActiveShardCount waitForActiveShards = randomBoolean() ? ActiveShardCount.from(numberOfReplicas + 1) : ActiveShardCount.ALL;
ClusterState clusterState = initializeWithNewIndex(indexName, numberOfShards, numberOfReplicas);
assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName));
clusterState = startPrimaries(clusterState, indexName);
assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName));
clusterState = startLessThanWaitOnShards(clusterState, indexName, waitForActiveShards);
assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName));
clusterState = startAllShards(clusterState, indexName);
assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName));
}
private void runTestForOneActiveShard(final ActiveShardCount activeShardCount) {
final String indexName = "test-idx";
final int numberOfShards = randomIntBetween(1, 5);
final int numberOfReplicas = randomIntBetween(4, 7);
assert activeShardCount == ActiveShardCount.ONE || activeShardCount == ActiveShardCount.DEFAULT;
final ActiveShardCount waitForActiveShards = activeShardCount;
ClusterState clusterState = initializeWithNewIndex(indexName, numberOfShards, numberOfReplicas);
assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName));
clusterState = startPrimaries(clusterState, indexName);
assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName));
clusterState = startLessThanWaitOnShards(clusterState, indexName, waitForActiveShards);
assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName));
clusterState = startWaitOnShards(clusterState, indexName, waitForActiveShards);
assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName));
clusterState = startAllShards(clusterState, indexName);
assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName));
}
private ClusterState initializeWithNewIndex(final String indexName, final int numShards, final int numReplicas) {
// initial index creation and new routing table info
final IndexMetaData indexMetaData = IndexMetaData.builder(indexName)
.settings(settings(Version.CURRENT)
.put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID()))
.numberOfShards(numShards)
.numberOfReplicas(numReplicas)
.build();
final MetaData metaData = MetaData.builder().put(indexMetaData, true).build();
final RoutingTable routingTable = RoutingTable.builder().addAsNew(indexMetaData).build();
return ClusterState.builder(new ClusterName("test_cluster")).metaData(metaData).routingTable(routingTable).build();
}
private ClusterState startPrimaries(final ClusterState clusterState, final String indexName) {
RoutingTable routingTable = clusterState.routingTable();
IndexRoutingTable indexRoutingTable = routingTable.index(indexName);
IndexRoutingTable.Builder newIndexRoutingTable = IndexRoutingTable.builder(indexRoutingTable.getIndex());
for (final ObjectCursor<IndexShardRoutingTable> shardEntry : indexRoutingTable.getShards().values()) {
final IndexShardRoutingTable shardRoutingTable = shardEntry.value;
for (ShardRouting shardRouting : shardRoutingTable.getShards()) {
if (shardRouting.primary()) {
shardRouting = shardRouting.initialize(randomAsciiOfLength(8), null, shardRouting.getExpectedShardSize())
.moveToStarted();
}
newIndexRoutingTable.addShard(shardRouting);
}
}
routingTable = RoutingTable.builder(routingTable).add(newIndexRoutingTable).build();
return ClusterState.builder(clusterState).routingTable(routingTable).build();
}
private ClusterState startLessThanWaitOnShards(final ClusterState clusterState, final String indexName,
final ActiveShardCount waitForActiveShards) {
RoutingTable routingTable = clusterState.routingTable();
IndexRoutingTable indexRoutingTable = routingTable.index(indexName);
IndexRoutingTable.Builder newIndexRoutingTable = IndexRoutingTable.builder(indexRoutingTable.getIndex());
for (final ObjectCursor<IndexShardRoutingTable> shardEntry : indexRoutingTable.getShards().values()) {
final IndexShardRoutingTable shardRoutingTable = shardEntry.value;
assert shardRoutingTable.getSize() > 2;
// want less than half, and primary is already started
int numToStart = waitForActiveShards.resolve(clusterState.metaData().index(indexName)) - 2;
for (ShardRouting shardRouting : shardRoutingTable.getShards()) {
if (shardRouting.primary()) {
assertTrue(shardRouting.active());
} else {
if (numToStart > 0) {
shardRouting = shardRouting.initialize(randomAsciiOfLength(8), null, shardRouting.getExpectedShardSize())
.moveToStarted();
numToStart--;
}
}
newIndexRoutingTable.addShard(shardRouting);
}
}
routingTable = RoutingTable.builder(routingTable).add(newIndexRoutingTable).build();
return ClusterState.builder(clusterState).routingTable(routingTable).build();
}
private ClusterState startWaitOnShards(final ClusterState clusterState, final String indexName,
final ActiveShardCount waitForActiveShards) {
RoutingTable routingTable = clusterState.routingTable();
IndexRoutingTable indexRoutingTable = routingTable.index(indexName);
IndexRoutingTable.Builder newIndexRoutingTable = IndexRoutingTable.builder(indexRoutingTable.getIndex());
for (final ObjectCursor<IndexShardRoutingTable> shardEntry : indexRoutingTable.getShards().values()) {
final IndexShardRoutingTable shardRoutingTable = shardEntry.value;
assert shardRoutingTable.getSize() > 2;
int numToStart = waitForActiveShards.resolve(clusterState.metaData().index(indexName)) - 1; // primary is already started
for (ShardRouting shardRouting : shardRoutingTable.getShards()) {
if (shardRouting.primary()) {
assertTrue(shardRouting.active());
} else {
if (shardRouting.active() == false) {
if (numToStart > 0) {
shardRouting = shardRouting.initialize(randomAsciiOfLength(8), null, shardRouting.getExpectedShardSize())
.moveToStarted();
numToStart--;
}
} else {
numToStart--;
}
}
newIndexRoutingTable.addShard(shardRouting);
}
}
routingTable = RoutingTable.builder(routingTable).add(newIndexRoutingTable).build();
return ClusterState.builder(clusterState).routingTable(routingTable).build();
}
private ClusterState startAllShards(final ClusterState clusterState, final String indexName) {
RoutingTable routingTable = clusterState.routingTable();
IndexRoutingTable indexRoutingTable = routingTable.index(indexName);
IndexRoutingTable.Builder newIndexRoutingTable = IndexRoutingTable.builder(indexRoutingTable.getIndex());
for (final ObjectCursor<IndexShardRoutingTable> shardEntry : indexRoutingTable.getShards().values()) {
final IndexShardRoutingTable shardRoutingTable = shardEntry.value;
for (ShardRouting shardRouting : shardRoutingTable.getShards()) {
if (shardRouting.primary()) {
assertTrue(shardRouting.active());
} else {
if (shardRouting.active() == false) {
shardRouting = shardRouting.initialize(randomAsciiOfLength(8), null, shardRouting.getExpectedShardSize())
.moveToStarted();
}
}
newIndexRoutingTable.addShard(shardRouting);
}
}
routingTable = RoutingTable.builder(routingTable).add(newIndexRoutingTable).build();
return ClusterState.builder(clusterState).routingTable(routingTable).build();
}
}

View File

@ -0,0 +1,155 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.support;
import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESIntegTestCase;
import static org.elasticsearch.cluster.metadata.IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING;
import static org.elasticsearch.cluster.metadata.IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
/**
* Tests that the index creation operation waits for the appropriate
* number of active shards to be started before returning.
*/
public class ActiveShardsObserverIT extends ESIntegTestCase {
public void testCreateIndexNoActiveShardsTimesOut() throws Exception {
final String indexName = "test-idx";
Settings.Builder settingsBuilder = Settings.builder()
.put(indexSettings())
.put(INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), randomIntBetween(1, 5))
.put(INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0);
if (internalCluster().getNodeNames().length > 0) {
String exclude = String.join(",", internalCluster().getNodeNames());
settingsBuilder.put("index.routing.allocation.exclude._name", exclude);
}
Settings settings = settingsBuilder.build();
assertFalse(prepareCreate(indexName)
.setSettings(settings)
.setWaitForActiveShards(randomBoolean() ? ActiveShardCount.from(1) : ActiveShardCount.ALL)
.setTimeout("100ms")
.get()
.isShardsAcked());
}
public void testCreateIndexNoActiveShardsNoWaiting() throws Exception {
final String indexName = "test-idx";
Settings.Builder settingsBuilder = Settings.builder()
.put(indexSettings())
.put(INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), randomIntBetween(1, 5))
.put(INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0);
if (internalCluster().getNodeNames().length > 0) {
String exclude = String.join(",", internalCluster().getNodeNames());
settingsBuilder.put("index.routing.allocation.exclude._name", exclude);
}
Settings settings = settingsBuilder.build();
CreateIndexResponse response = prepareCreate(indexName)
.setSettings(settings)
.setWaitForActiveShards(ActiveShardCount.from(0))
.get();
assertTrue(response.isAcknowledged());
}
public void testCreateIndexNotEnoughActiveShardsTimesOut() throws Exception {
final String indexName = "test-idx";
final int numDataNodes = internalCluster().numDataNodes();
final int numReplicas = numDataNodes + randomInt(4);
Settings settings = Settings.builder()
.put(indexSettings())
.put(INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), randomIntBetween(1, 7))
.put(INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), numReplicas)
.build();
assertFalse(prepareCreate(indexName)
.setSettings(settings)
.setWaitForActiveShards(ActiveShardCount.from(randomIntBetween(numDataNodes + 1, numReplicas + 1)))
.setTimeout("100ms")
.get()
.isShardsAcked());
}
public void testCreateIndexEnoughActiveShards() throws Exception {
final String indexName = "test-idx";
Settings settings = Settings.builder()
.put(indexSettings())
.put(INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), randomIntBetween(1, 7))
.put(INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), internalCluster().numDataNodes() + randomIntBetween(0, 3))
.build();
ActiveShardCount waitForActiveShards = ActiveShardCount.from(randomIntBetween(0, internalCluster().numDataNodes()));
assertAcked(prepareCreate(indexName).setSettings(settings).setWaitForActiveShards(waitForActiveShards).get());
}
public void testCreateIndexWaitsForAllActiveShards() throws Exception {
final String indexName = "test-idx";
// not enough data nodes, index creation times out
final int numReplicas = internalCluster().numDataNodes() + randomInt(4);
Settings settings = Settings.builder()
.put(indexSettings())
.put(INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), randomIntBetween(1, 5))
.put(INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), numReplicas)
.build();
assertFalse(prepareCreate(indexName)
.setSettings(settings)
.setWaitForActiveShards(ActiveShardCount.ALL)
.setTimeout("100ms")
.get()
.isShardsAcked());
if (client().admin().indices().prepareExists(indexName).get().isExists()) {
assertAcked(client().admin().indices().prepareDelete(indexName));
}
// enough data nodes, all shards are active
settings = Settings.builder()
.put(indexSettings())
.put(INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), randomIntBetween(1, 7))
.put(INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), internalCluster().numDataNodes() - 1)
.build();
assertAcked(prepareCreate(indexName).setSettings(settings).setWaitForActiveShards(ActiveShardCount.ALL).get());
}
public void testCreateIndexStopsWaitingWhenIndexDeleted() throws Exception {
final String indexName = "test-idx";
Settings settings = Settings.builder()
.put(indexSettings())
.put(INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), randomIntBetween(1, 5))
.put(INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), internalCluster().numDataNodes() - 1)
.build();
logger.info("--> start the index creation process");
ListenableActionFuture<CreateIndexResponse> responseListener =
prepareCreate(indexName)
.setSettings(settings)
.setWaitForActiveShards(ActiveShardCount.ALL)
.execute();
logger.info("--> wait until the cluster state contains the new index");
assertBusy(() -> assertTrue(client().admin().cluster().prepareState().get().getState().metaData().hasIndex(indexName)));
logger.info("--> delete the index");
assertAcked(client().admin().indices().prepareDelete(indexName));
logger.info("--> ensure the create index request completes");
assertAcked(responseListener.get());
}
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.cluster;
import org.elasticsearch.action.UnavailableShardsException; import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.client.Requests; import org.elasticsearch.client.Requests;
import org.elasticsearch.common.Priority; import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -40,7 +41,7 @@ import static org.hamcrest.Matchers.equalTo;
public class SimpleDataNodesIT extends ESIntegTestCase { public class SimpleDataNodesIT extends ESIntegTestCase {
public void testDataNodes() throws Exception { public void testDataNodes() throws Exception {
internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), false).build()); internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), false).build());
client().admin().indices().create(createIndexRequest("test")).actionGet(); client().admin().indices().create(createIndexRequest("test").waitForActiveShards(ActiveShardCount.NONE)).actionGet();
try { try {
client().index(Requests.indexRequest("test").type("type1").id("1").source(source("1", "test")).timeout(timeValueSeconds(1))).actionGet(); client().index(Requests.indexRequest("test").type("type1").id("1").source(source("1", "test")).timeout(timeValueSeconds(1))).actionGet();
fail("no allocation should happen"); fail("no allocation should happen");

View File

@ -22,6 +22,7 @@ package org.elasticsearch.cluster.allocation;
import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.IOUtils;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteResponse; import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteResponse;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.health.ClusterHealthStatus;
@ -91,7 +92,7 @@ public class ClusterRerouteIT extends ESIntegTestCase {
final String node_2 = nodesIds.get(1); final String node_2 = nodesIds.get(1);
logger.info("--> create an index with 1 shard, 1 replica, nothing should allocate"); logger.info("--> create an index with 1 shard, 1 replica, nothing should allocate");
client().admin().indices().prepareCreate("test") client().admin().indices().prepareCreate("test").setWaitForActiveShards(ActiveShardCount.NONE)
.setSettings(Settings.builder().put("index.number_of_shards", 1)) .setSettings(Settings.builder().put("index.number_of_shards", 1))
.execute().actionGet(); .execute().actionGet();
@ -203,7 +204,7 @@ public class ClusterRerouteIT extends ESIntegTestCase {
assertThat(healthResponse.isTimedOut(), equalTo(false)); assertThat(healthResponse.isTimedOut(), equalTo(false));
logger.info("--> create an index with 1 shard, 1 replica, nothing should allocate"); logger.info("--> create an index with 1 shard, 1 replica, nothing should allocate");
client().admin().indices().prepareCreate("test") client().admin().indices().prepareCreate("test").setWaitForActiveShards(ActiveShardCount.NONE)
.setSettings(Settings.builder().put("index.number_of_shards", 1)) .setSettings(Settings.builder().put("index.number_of_shards", 1))
.execute().actionGet(); .execute().actionGet();
@ -253,14 +254,13 @@ public class ClusterRerouteIT extends ESIntegTestCase {
assertThat(state.getRoutingNodes().unassigned().size(), equalTo(1)); assertThat(state.getRoutingNodes().unassigned().size(), equalTo(1));
assertThat(state.getRoutingNodes().node(state.nodes().resolveNode(node_1).getId()).iterator().next().state(), equalTo(ShardRoutingState.INITIALIZING)); assertThat(state.getRoutingNodes().node(state.nodes().resolveNode(node_1).getId()).iterator().next().state(), equalTo(ShardRoutingState.INITIALIZING));
healthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForYellowStatus().execute().actionGet();
assertThat(healthResponse.isTimedOut(), equalTo(false));
logger.info("--> get the state, verify shard 1 primary allocated"); logger.info("--> get the state, verify shard 1 primary allocated");
state = client().admin().cluster().prepareState().execute().actionGet().getState(); final String nodeToCheck = node_1;
assertThat(state.getRoutingNodes().unassigned().size(), equalTo(1)); assertBusy(() -> {
assertThat(state.getRoutingNodes().node(state.nodes().resolveNode(node_1).getId()).iterator().next().state(), equalTo(ShardRoutingState.STARTED)); ClusterState clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();
String nodeId = clusterState.nodes().resolveNode(nodeToCheck).getId();
assertThat(clusterState.getRoutingNodes().node(nodeId).iterator().next().state(), equalTo(ShardRoutingState.STARTED));
});
} }
public void testRerouteExplain() { public void testRerouteExplain() {

View File

@ -216,6 +216,7 @@ public class MetaDataCreateIndexServiceTests extends ESTestCase {
new HashSet<>(), new HashSet<>(),
null, null,
null, null,
null,
null); null);
} }
} }

View File

@ -22,6 +22,7 @@ package org.elasticsearch.cluster.routing;
import com.carrotsearch.hppc.cursors.IntObjectCursor; import com.carrotsearch.hppc.cursors.IntObjectCursor;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequestBuilder; import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequestBuilder;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse; import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand; import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand; import org.elasticsearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand;
@ -157,7 +158,8 @@ public class PrimaryAllocationIT extends ESIntegTestCase {
createStaleReplicaScenario(); createStaleReplicaScenario();
logger.info("--> explicitly promote old primary shard"); logger.info("--> explicitly promote old primary shard");
ImmutableOpenIntMap<List<IndicesShardStoresResponse.StoreStatus>> storeStatuses = client().admin().indices().prepareShardStores("test").get().getStoreStatuses().get("test"); final String idxName = "test";
ImmutableOpenIntMap<List<IndicesShardStoresResponse.StoreStatus>> storeStatuses = client().admin().indices().prepareShardStores(idxName).get().getStoreStatuses().get(idxName);
ClusterRerouteRequestBuilder rerouteBuilder = client().admin().cluster().prepareReroute(); ClusterRerouteRequestBuilder rerouteBuilder = client().admin().cluster().prepareReroute();
for (IntObjectCursor<List<IndicesShardStoresResponse.StoreStatus>> shardStoreStatuses : storeStatuses) { for (IntObjectCursor<List<IndicesShardStoresResponse.StoreStatus>> shardStoreStatuses : storeStatuses) {
int shardId = shardStoreStatuses.key; int shardId = shardStoreStatuses.key;
@ -165,22 +167,30 @@ public class PrimaryAllocationIT extends ESIntegTestCase {
logger.info("--> adding allocation command for shard {}", shardId); logger.info("--> adding allocation command for shard {}", shardId);
// force allocation based on node id // force allocation based on node id
if (useStaleReplica) { if (useStaleReplica) {
rerouteBuilder.add(new AllocateStalePrimaryAllocationCommand("test", shardId, storeStatus.getNode().getId(), true)); rerouteBuilder.add(new AllocateStalePrimaryAllocationCommand(idxName, shardId, storeStatus.getNode().getId(), true));
} else { } else {
rerouteBuilder.add(new AllocateEmptyPrimaryAllocationCommand("test", shardId, storeStatus.getNode().getId(), true)); rerouteBuilder.add(new AllocateEmptyPrimaryAllocationCommand(idxName, shardId, storeStatus.getNode().getId(), true));
} }
} }
rerouteBuilder.get(); rerouteBuilder.get();
logger.info("--> check that the stale primary shard gets allocated and that documents are available"); logger.info("--> check that the stale primary shard gets allocated and that documents are available");
ensureYellow("test"); ensureYellow(idxName);
assertHitCount(client().prepareSearch("test").setSize(0).setQuery(matchAllQuery()).get(), useStaleReplica ? 1L : 0L); if (useStaleReplica == false) {
// When invoking AllocateEmptyPrimaryAllocationCommand, due to the UnassignedInfo.Reason being changed to INDEX_CREATION,
// its possible that the shard has not completed initialization, even though the cluster health is yellow, so the
// search can throw an "all shards failed" exception. We will wait until the shard initialization has completed before
// verifying the search hit count.
assertBusy(() -> assertTrue(clusterService().state().routingTable().index(idxName).allPrimaryShardsActive()));
}
assertHitCount(client().prepareSearch(idxName).setSize(0).setQuery(matchAllQuery()).get(), useStaleReplica ? 1L : 0L);
} }
public void testForcePrimaryShardIfAllocationDecidersSayNoAfterIndexCreation() throws ExecutionException, InterruptedException { public void testForcePrimaryShardIfAllocationDecidersSayNoAfterIndexCreation() throws ExecutionException, InterruptedException {
String node = internalCluster().startNode(); String node = internalCluster().startNode();
client().admin().indices().prepareCreate("test").setSettings(Settings.builder() client().admin().indices().prepareCreate("test").setWaitForActiveShards(ActiveShardCount.NONE).setSettings(Settings.builder()
.put("index.routing.allocation.exclude._name", node) .put("index.routing.allocation.exclude._name", node)
.put("index.number_of_shards", 1).put("index.number_of_replicas", 0)).get(); .put("index.number_of_shards", 1).put("index.number_of_replicas", 0)).get();

View File

@ -34,7 +34,6 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation; import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index; import org.elasticsearch.index.Index;
@ -351,7 +350,7 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
for (AllocationStatus allocationStatus : AllocationStatus.values()) { for (AllocationStatus allocationStatus : AllocationStatus.values()) {
BytesStreamOutput out = new BytesStreamOutput(); BytesStreamOutput out = new BytesStreamOutput();
allocationStatus.writeTo(out); allocationStatus.writeTo(out);
ByteBufferStreamInput in = new ByteBufferStreamInput(ByteBuffer.wrap(out.bytes().toBytes())); ByteBufferStreamInput in = new ByteBufferStreamInput(ByteBuffer.wrap(out.bytes().toBytesRef().bytes));
AllocationStatus readStatus = AllocationStatus.readFrom(in); AllocationStatus readStatus = AllocationStatus.readFrom(in);
assertThat(readStatus, equalTo(allocationStatus)); assertThat(readStatus, equalTo(allocationStatus));
} }

View File

@ -24,6 +24,7 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests; import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
@ -205,7 +206,7 @@ public class GatewayIndexStateIT extends ESIntegTestCase {
internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), false).build()); internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), false).build());
logger.info("--> create an index"); logger.info("--> create an index");
client().admin().indices().prepareCreate("test").execute().actionGet(); client().admin().indices().prepareCreate("test").setWaitForActiveShards(ActiveShardCount.NONE).execute().actionGet();
logger.info("--> closing master node"); logger.info("--> closing master node");
internalCluster().closeNonSharedNodes(false); internalCluster().closeNonSharedNodes(false);

View File

@ -158,7 +158,7 @@ public class ClusterStateChanges {
allocationService, IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, indicesService, nodeServicesProvider); allocationService, IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, indicesService, nodeServicesProvider);
MetaDataCreateIndexService createIndexService = new MetaDataCreateIndexService(settings, clusterService, indicesService, MetaDataCreateIndexService createIndexService = new MetaDataCreateIndexService(settings, clusterService, indicesService,
allocationService, new AliasValidator(settings), Collections.emptySet(), environment, allocationService, new AliasValidator(settings), Collections.emptySet(), environment,
nodeServicesProvider, IndexScopedSettings.DEFAULT_SCOPED_SETTINGS); nodeServicesProvider, IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, threadPool);
transportCloseIndexAction = new TransportCloseIndexAction(settings, transportService, clusterService, threadPool, transportCloseIndexAction = new TransportCloseIndexAction(settings, transportService, clusterService, threadPool,
indexStateService, clusterSettings, actionFilters, indexNameExpressionResolver, destructiveOperations); indexStateService, clusterSettings, actionFilters, indexNameExpressionResolver, destructiveOperations);

View File

@ -26,6 +26,7 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.open.OpenIndexRequest; import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
@ -140,7 +141,7 @@ public class IndicesClusterStateServiceRandomUpdatesTests extends AbstractIndice
CreateIndexRequest request = new CreateIndexRequest(name, Settings.builder() CreateIndexRequest request = new CreateIndexRequest(name, Settings.builder()
.put(SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 3)) .put(SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 3))
.put(SETTING_NUMBER_OF_REPLICAS, randomInt(2)) .put(SETTING_NUMBER_OF_REPLICAS, randomInt(2))
.build()); .build()).waitForActiveShards(ActiveShardCount.NONE);
state = cluster.createIndex(state, request); state = cluster.createIndex(state, request);
assertTrue(state.metaData().hasIndex(name)); assertTrue(state.metaData().hasIndex(name));
} }

View File

@ -23,6 +23,7 @@ import org.elasticsearch.action.admin.indices.flush.FlushResponse;
import org.elasticsearch.action.admin.indices.flush.SyncedFlushResponse; import org.elasticsearch.action.admin.indices.flush.SyncedFlushResponse;
import org.elasticsearch.action.admin.indices.stats.IndexStats; import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRouting;
@ -213,7 +214,7 @@ public class FlushIT extends ESIntegTestCase {
public void testUnallocatedShardsDoesNotHang() throws InterruptedException { public void testUnallocatedShardsDoesNotHang() throws InterruptedException {
// create an index but disallow allocation // create an index but disallow allocation
prepareCreate("test").setSettings(Settings.builder().put("index.routing.allocation.include._name", "nonexistent")).get(); prepareCreate("test").setWaitForActiveShards(ActiveShardCount.NONE).setSettings(Settings.builder().put("index.routing.allocation.include._name", "nonexistent")).get();
// this should not hang but instead immediately return with empty result set // this should not hang but instead immediately return with empty result set
List<ShardsSyncedFlushResult> shardsResult = client().admin().indices().prepareSyncedFlush("test").get().getShardsResultPerIndex().get("test"); List<ShardsSyncedFlushResult> shardsResult = client().admin().indices().prepareSyncedFlush("test").get().getShardsResultPerIndex().get("test");

View File

@ -24,6 +24,7 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.close.CloseIndexResponse; import org.elasticsearch.action.admin.indices.close.CloseIndexResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.open.OpenIndexResponse; import org.elasticsearch.action.admin.indices.open.OpenIndexResponse;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.ShardRoutingState;
@ -98,7 +99,7 @@ public class SimpleIndexStateIT extends ESIntegTestCase {
public void testFastCloseAfterCreateContinuesCreateAfterOpen() { public void testFastCloseAfterCreateContinuesCreateAfterOpen() {
logger.info("--> creating test index that cannot be allocated"); logger.info("--> creating test index that cannot be allocated");
client().admin().indices().prepareCreate("test").setSettings(Settings.builder() client().admin().indices().prepareCreate("test").setWaitForActiveShards(ActiveShardCount.NONE).setSettings(Settings.builder()
.put("index.routing.allocation.include.tag", "no_such_node").build()).get(); .put("index.routing.allocation.include.tag", "no_such_node").build()).get();
ClusterHealthResponse health = client().admin().cluster().prepareHealth("test").setWaitForNodes(">=2").get(); ClusterHealthResponse health = client().admin().cluster().prepareHealth("test").setWaitForNodes(">=2").get();

View File

@ -29,7 +29,9 @@ import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus; import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse; import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
@ -45,6 +47,7 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.zen.ZenDiscovery; import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.discovery.zen.elect.ElectMasterService; import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.store.IndexStore; import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.indices.ttl.IndicesTTLService; import org.elasticsearch.indices.ttl.IndicesTTLService;
@ -443,9 +446,9 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
logger.info("--> create an index that will have no allocated shards"); logger.info("--> create an index that will have no allocated shards");
assertAcked(prepareCreate("test-idx-none", 1, Settings.builder().put("number_of_shards", 6) assertAcked(prepareCreate("test-idx-none", 1, Settings.builder().put("number_of_shards", 6)
.put("index.routing.allocation.include.tag", "nowhere") .put("index.routing.allocation.include.tag", "nowhere")
.put("number_of_replicas", 0))); .put("number_of_replicas", 0)).setWaitForActiveShards(ActiveShardCount.NONE).get());
assertTrue(client().admin().indices().prepareExists("test-idx-none").get().isExists());
logger.info("--> create repository");
logger.info("--> creating repository"); logger.info("--> creating repository");
PutRepositoryResponse putRepositoryResponse = client().admin().cluster().preparePutRepository("test-repo") PutRepositoryResponse putRepositoryResponse = client().admin().cluster().preparePutRepository("test-repo")
.setType("fs").setSettings(Settings.builder().put("location", randomRepoPath())).execute().actionGet(); .setType("fs").setSettings(Settings.builder().put("location", randomRepoPath())).execute().actionGet();

View File

@ -39,6 +39,7 @@ import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse; import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse;
import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterStateUpdateTask;
@ -755,7 +756,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
.put("location", randomRepoPath()))); .put("location", randomRepoPath())));
logger.info("--> creating index that cannot be allocated"); logger.info("--> creating index that cannot be allocated");
prepareCreate("test-idx", 2, Settings.builder().put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + ".tag", "nowhere").put("index.number_of_shards", 3)).get(); prepareCreate("test-idx", 2, Settings.builder().put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + ".tag", "nowhere").put("index.number_of_shards", 3)).setWaitForActiveShards(ActiveShardCount.NONE).get();
logger.info("--> snapshot"); logger.info("--> snapshot");
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx").get(); CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx").get();

View File

@ -13,6 +13,10 @@
} }
}, },
"params": { "params": {
"wait_for_active_shards": {
"type" : "string",
"description" : "Set the number of active shards to wait for before the operation returns."
},
"timeout": { "timeout": {
"type" : "time", "type" : "time",
"description" : "Explicit operation timeout" "description" : "Explicit operation timeout"

View File

@ -25,6 +25,10 @@
"master_timeout": { "master_timeout": {
"type" : "time", "type" : "time",
"description" : "Specify timeout for connection to master" "description" : "Specify timeout for connection to master"
},
"wait_for_active_shards": {
"type" : "string",
"description" : "Set the number of active shards to wait for on the newly created rollover index before the operation returns."
} }
} }
}, },

View File

@ -25,6 +25,10 @@
"master_timeout": { "master_timeout": {
"type" : "time", "type" : "time",
"description" : "Specify timeout for connection to master" "description" : "Specify timeout for connection to master"
},
"wait_for_active_shards": {
"type" : "string",
"description" : "Set the number of active shards to wait for on the shrunken index before the operation returns."
} }
} }
}, },

View File

@ -30,6 +30,35 @@
- match: { test_index.settings.index.number_of_replicas: "0"} - match: { test_index.settings.index.number_of_replicas: "0"}
---
"Create index with too large wait_for_active_shards":
- do:
indices.create:
index: test_index
timeout: 100ms
master_timeout: 100ms
wait_for_active_shards: 6
body:
settings:
number_of_replicas: 5
- match: { shards_acknowledged: false }
---
"Create index with wait_for_active_shards set to all":
- do:
indices.create:
index: test_index
wait_for_active_shards: all
body:
settings:
number_of_replicas: "0"
- match: { acknowledged: true }
- match: { shards_acknowledged: true }
--- ---
"Create index with aliases": "Create index with aliases":

View File

@ -4,6 +4,7 @@
- do: - do:
indices.create: indices.create:
index: logs-1 index: logs-1
wait_for_active_shards: 1
body: body:
aliases: aliases:
logs_index: {} logs_index: {}
@ -30,11 +31,12 @@
# perform alias rollover # perform alias rollover
- do: - do:
indices.rollover: indices.rollover:
alias: "logs_search" alias: "logs_search"
body: wait_for_active_shards: 1
conditions: body:
max_docs: 1 conditions:
max_docs: 1
- match: { old_index: logs-1 } - match: { old_index: logs-1 }
- match: { new_index: logs-2 } - match: { new_index: logs-2 }

View File

@ -6,6 +6,7 @@
- do: - do:
indices.create: indices.create:
index: source index: source
wait_for_active_shards: 1
body: body:
settings: settings:
number_of_replicas: "0" number_of_replicas: "0"
@ -53,6 +54,7 @@
indices.shrink: indices.shrink:
index: "source" index: "source"
target: "target" target: "target"
wait_for_active_shards: 1
body: body:
settings: settings:
index.number_of_replicas: 0 index.number_of_replicas: 0

View File

@ -30,6 +30,7 @@ import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequestBuilder; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequestBuilder;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.alias.exists.AliasesExistResponse; import org.elasticsearch.action.admin.indices.alias.exists.AliasesExistResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse; import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse;
@ -123,6 +124,17 @@ public class ElasticsearchAssertions {
assertVersionSerializable(response); assertVersionSerializable(response);
} }
/**
* Assert that an index creation was fully acknowledged, meaning that both the index creation cluster
* state update was successful and that the requisite number of shard copies were started before returning.
*/
public static void assertAcked(CreateIndexResponse response) {
assertThat(response.getClass().getSimpleName() + " failed - not acked", response.isAcknowledged(), equalTo(true));
assertVersionSerializable(response);
assertTrue(response.getClass().getSimpleName() + " failed - index creation acked but not all shards were started",
response.isShardsAcked());
}
/** /**
* Executes the request and fails if the request has not been blocked. * Executes the request and fails if the request has not been blocked.
* *