diff --git a/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexClusterStateUpdateRequest.java b/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexClusterStateUpdateRequest.java new file mode 100644 index 00000000000..3286ecdd43e --- /dev/null +++ b/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexClusterStateUpdateRequest.java @@ -0,0 +1,111 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.create; + +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.elasticsearch.cluster.ack.ClusterStateUpdateRequest; +import org.elasticsearch.cluster.block.ClusterBlock; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; + +import java.util.Map; +import java.util.Set; + +import static com.google.common.collect.Maps.newHashMap; + +/** + * Cluster state update request that allows to create an index + */ +public class CreateIndexClusterStateUpdateRequest extends ClusterStateUpdateRequest { + + final String cause; + final String index; + + private IndexMetaData.State state = IndexMetaData.State.OPEN; + + private Settings settings = ImmutableSettings.Builder.EMPTY_SETTINGS; + + private Map mappings = Maps.newHashMap(); + + private Map customs = newHashMap(); + + private Set blocks = Sets.newHashSet(); + + + CreateIndexClusterStateUpdateRequest(String cause, String index) { + this.cause = cause; + this.index = index; + } + + public CreateIndexClusterStateUpdateRequest settings(Settings settings) { + this.settings = settings; + return this; + } + + public CreateIndexClusterStateUpdateRequest mappings(Map mappings) { + this.mappings.putAll(mappings); + return this; + } + + public CreateIndexClusterStateUpdateRequest customs(Map customs) { + this.customs.putAll(customs); + return this; + } + + public CreateIndexClusterStateUpdateRequest blocks(Set blocks) { + this.blocks.addAll(blocks); + return this; + } + + public CreateIndexClusterStateUpdateRequest state(IndexMetaData.State state) { + this.state = state; + return this; + } + + public String cause() { + return cause; + } + + public String index() { + return index; + } + + public IndexMetaData.State state() { + return state; + } + + public Settings settings() { + return settings; + } + + public Map mappings() { + return mappings; + } + + public Map customs() { + return customs; + } + + public Set blocks() { + return blocks; + } +} diff --git a/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequest.java b/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequest.java index 9814efc7f8f..f8760f8bea7 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequest.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequest.java @@ -26,7 +26,6 @@ import org.elasticsearch.ElasticSearchParseException; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; import org.elasticsearch.action.support.master.AcknowledgedRequest; -import org.elasticsearch.action.support.master.MasterNodeOperationRequest; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; @@ -35,7 +34,6 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; @@ -48,7 +46,6 @@ import static org.elasticsearch.action.ValidateActions.addValidationError; import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS; import static org.elasticsearch.common.settings.ImmutableSettings.readSettingsFromStream; import static org.elasticsearch.common.settings.ImmutableSettings.writeSettingsToStream; -import static org.elasticsearch.common.unit.TimeValue.readTimeValue; /** * A request to create an index. Best created with {@link org.elasticsearch.client.Requests#createIndexRequest(String)}. @@ -59,7 +56,7 @@ import static org.elasticsearch.common.unit.TimeValue.readTimeValue; * @see org.elasticsearch.client.Requests#createIndexRequest(String) * @see CreateIndexResponse */ -public class CreateIndexRequest extends MasterNodeOperationRequest { +public class CreateIndexRequest extends AcknowledgedRequest { private String cause = ""; @@ -71,8 +68,6 @@ public class CreateIndexRequest extends MasterNodeOperationRequest customs = newHashMap(); - private TimeValue timeout = AcknowledgedRequest.DEFAULT_ACK_TIMEOUT; - CreateIndexRequest() { } @@ -173,6 +168,7 @@ public class CreateIndexRequest extends MasterNodeOperationRequest source) { boolean found = false; for (Map.Entry entry : source.entrySet()) { @@ -338,38 +336,13 @@ public class CreateIndexRequest extends MasterNodeOperationRequest10s. - */ - public TimeValue timeout() { - return timeout; - } - - /** - * Timeout to wait for the index creation to be acknowledged by current cluster nodes. Defaults - * to 10s. - */ - public CreateIndexRequest timeout(TimeValue timeout) { - this.timeout = timeout; - return this; - } - - /** - * Timeout to wait for the index creation to be acknowledged by current cluster nodes. Defaults - * to 10s. - */ - public CreateIndexRequest timeout(String timeout) { - return timeout(TimeValue.parseTimeValue(timeout, null)); - } - @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); cause = in.readString(); index = in.readString(); settings = readSettingsFromStream(in); - timeout = readTimeValue(in); + readTimeout(in); int size = in.readVInt(); for (int i = 0; i < size; i++) { mappings.put(in.readString(), in.readString()); @@ -388,7 +361,7 @@ public class CreateIndexRequest extends MasterNodeOperationRequest entry : mappings.entrySet()) { out.writeString(entry.getKey()); diff --git a/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequestBuilder.java b/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequestBuilder.java index 2dcf7d4994d..429e2c0c340 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequestBuilder.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequestBuilder.java @@ -20,21 +20,20 @@ package org.elasticsearch.action.admin.indices.create; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; +import org.elasticsearch.action.support.master.AcknowledgedRequestBuilder; import org.elasticsearch.client.IndicesAdminClient; import org.elasticsearch.client.internal.InternalIndicesAdminClient; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import java.util.Map; /** - * + * Builder for a create index request */ -public class CreateIndexRequestBuilder extends MasterNodeOperationRequestBuilder { +public class CreateIndexRequestBuilder extends AcknowledgedRequestBuilder { public CreateIndexRequestBuilder(IndicesAdminClient indicesClient) { super((InternalIndicesAdminClient) indicesClient, new CreateIndexRequest()); @@ -200,24 +199,6 @@ public class CreateIndexRequestBuilder extends MasterNodeOperationRequestBuilder return this; } - /** - * Timeout to wait for the index creation to be acknowledged by current cluster nodes. Defaults - * to 10s. - */ - public CreateIndexRequestBuilder setTimeout(TimeValue timeout) { - request.timeout(timeout); - return this; - } - - /** - * Timeout to wait for the index creation to be acknowledged by current cluster nodes. Defaults - * to 10s. - */ - public CreateIndexRequestBuilder setTimeout(String timeout) { - request.timeout(timeout); - return this; - } - @Override protected void doExecute(ActionListener listener) { ((IndicesAdminClient) client).create(request, listener); diff --git a/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexResponse.java b/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexResponse.java index 7052861e8c5..c110a491574 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexResponse.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexResponse.java @@ -19,7 +19,7 @@ package org.elasticsearch.action.admin.indices.create; -import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -28,34 +28,24 @@ import java.io.IOException; /** * A response for a create index action. */ -public class CreateIndexResponse extends ActionResponse { - - private boolean acknowledged; +public class CreateIndexResponse extends AcknowledgedResponse { CreateIndexResponse() { } CreateIndexResponse(boolean acknowledged) { - this.acknowledged = acknowledged; - } - - /** - * Has the index creation been acknowledged by all current cluster nodes within the - * provided {@link CreateIndexRequest#timeout(org.elasticsearch.common.unit.TimeValue)}. - */ - public boolean isAcknowledged() { - return acknowledged; + super(acknowledged); } @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - acknowledged = in.readBoolean(); + readAcknowledged(in, null); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeBoolean(acknowledged); + writeAcknowledged(out, null); } } diff --git a/src/main/java/org/elasticsearch/action/admin/indices/create/TransportCreateIndexAction.java b/src/main/java/org/elasticsearch/action/admin/indices/create/TransportCreateIndexAction.java index 1185016adf7..081ff8223e0 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/create/TransportCreateIndexAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/create/TransportCreateIndexAction.java @@ -24,6 +24,8 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ack.ClusterStateUpdateListener; +import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService; @@ -80,26 +82,27 @@ public class TransportCreateIndexAction extends TransportMasterNodeOperationActi cause = "api"; } - createIndexService.createIndex(new MetaDataCreateIndexService.Request(cause, request.index()).settings(request.settings()) - .mappings(request.mappings()) - .customs(request.customs()) - .timeout(request.timeout()) - .masterTimeout(request.masterNodeTimeout()), - new MetaDataCreateIndexService.Listener() { - @Override - public void onResponse(MetaDataCreateIndexService.Response response) { - listener.onResponse(new CreateIndexResponse(response.acknowledged())); - } + CreateIndexClusterStateUpdateRequest updateRequest = new CreateIndexClusterStateUpdateRequest(cause, request.index()) + .ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout()) + .settings(request.settings()).mappings(request.mappings()) + .customs(request.customs()); - @Override - public void onFailure(Throwable t) { - if (t instanceof IndexAlreadyExistsException) { - logger.trace("[{}] failed to create", t, request.index()); - } else { - logger.debug("[{}] failed to create", t, request.index()); - } - listener.onFailure(t); - } - }); + createIndexService.createIndex(updateRequest, new ClusterStateUpdateListener() { + + @Override + public void onResponse(ClusterStateUpdateResponse response) { + listener.onResponse(new CreateIndexResponse(response.isAcknowledged())); + } + + @Override + public void onFailure(Throwable t) { + if (t instanceof IndexAlreadyExistsException) { + logger.trace("[{}] failed to create", t, request.index()); + } else { + logger.debug("[{}] failed to create", t, request.index()); + } + listener.onFailure(t); + } + }); } } diff --git a/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/src/main/java/org/elasticsearch/cluster/ClusterModule.java index 5edfa236e04..ae123dd45c7 100644 --- a/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -71,7 +71,6 @@ public class ClusterModule extends AbstractModule implements SpawnModules { bind(RoutingService.class).asEagerSingleton(); bind(ShardStateAction.class).asEagerSingleton(); - bind(NodeIndexCreatedAction.class).asEagerSingleton(); bind(NodeIndexDeletedAction.class).asEagerSingleton(); bind(NodeMappingRefreshAction.class).asEagerSingleton(); bind(MappingUpdatedAction.class).asEagerSingleton(); diff --git a/src/main/java/org/elasticsearch/cluster/action/index/NodeIndexCreatedAction.java b/src/main/java/org/elasticsearch/cluster/action/index/NodeIndexCreatedAction.java deleted file mode 100644 index 71c64970c6b..00000000000 --- a/src/main/java/org/elasticsearch/cluster/action/index/NodeIndexCreatedAction.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to ElasticSearch and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. ElasticSearch licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.cluster.action.index; - -import org.elasticsearch.ElasticSearchException; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.common.component.AbstractComponent; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.*; - -import java.io.IOException; -import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; - -/** - * - */ -public class NodeIndexCreatedAction extends AbstractComponent { - - private final ThreadPool threadPool; - private final TransportService transportService; - private final List listeners = new CopyOnWriteArrayList(); - - @Inject - public NodeIndexCreatedAction(Settings settings, ThreadPool threadPool, TransportService transportService) { - super(settings); - this.threadPool = threadPool; - this.transportService = transportService; - transportService.registerHandler(NodeIndexCreatedTransportHandler.ACTION, new NodeIndexCreatedTransportHandler()); - } - - public void add(Listener listener) { - listeners.add(listener); - } - - public void remove(Listener listener) { - listeners.remove(listener); - } - - public void nodeIndexCreated(final ClusterState clusterState, final String index, final String nodeId) throws ElasticSearchException { - DiscoveryNodes nodes = clusterState.nodes(); - if (nodes.localNodeMaster()) { - threadPool.generic().execute(new Runnable() { - @Override - public void run() { - innerNodeIndexCreated(index, nodeId); - } - }); - } else { - transportService.sendRequest(clusterState.nodes().masterNode(), - NodeIndexCreatedTransportHandler.ACTION, new NodeIndexCreatedMessage(index, nodeId), EmptyTransportResponseHandler.INSTANCE_SAME); - } - } - - private void innerNodeIndexCreated(String index, String nodeId) { - for (Listener listener : listeners) { - listener.onNodeIndexCreated(index, nodeId); - } - } - - public static interface Listener { - void onNodeIndexCreated(String index, String nodeId); - } - - private class NodeIndexCreatedTransportHandler extends BaseTransportRequestHandler { - - static final String ACTION = "cluster/nodeIndexCreated"; - - @Override - public NodeIndexCreatedMessage newInstance() { - return new NodeIndexCreatedMessage(); - } - - @Override - public void messageReceived(NodeIndexCreatedMessage message, TransportChannel channel) throws Exception { - innerNodeIndexCreated(message.index, message.nodeId); - channel.sendResponse(TransportResponse.Empty.INSTANCE); - } - - @Override - public String executor() { - return ThreadPool.Names.SAME; - } - } - - static class NodeIndexCreatedMessage extends TransportRequest { - String index; - String nodeId; - - NodeIndexCreatedMessage() { - } - - NodeIndexCreatedMessage(String index, String nodeId) { - this.index = index; - this.nodeId = nodeId; - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeString(index); - out.writeString(nodeId); - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - index = in.readString(); - nodeId = in.readString(); - } - } -} diff --git a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java index a394166439d..cca1cf3c952 100644 --- a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java +++ b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java @@ -24,21 +24,23 @@ import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import com.google.common.base.Charsets; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.common.collect.Sets; import org.apache.lucene.util.CollectionUtil; import org.apache.lucene.util.IOUtils; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.Version; -import org.elasticsearch.action.support.master.MasterNodeOperationRequest; +import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest; +import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.TimeoutClusterStateUpdateTask; -import org.elasticsearch.cluster.action.index.NodeIndexCreatedAction; +import org.elasticsearch.cluster.ack.ClusterStateUpdateListener; +import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlocks; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Priority; import org.elasticsearch.common.Strings; import org.elasticsearch.common.component.AbstractComponent; @@ -66,21 +68,19 @@ import org.elasticsearch.threadpool.ThreadPool; import java.io.File; import java.io.FileInputStream; -import java.io.IOException; import java.io.InputStreamReader; -import java.util.*; -import java.util.concurrent.ScheduledFuture; +import java.util.Comparator; +import java.util.List; +import java.util.Locale; +import java.util.Map; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import static com.google.common.collect.Maps.newHashMap; import static org.elasticsearch.cluster.metadata.IndexMetaData.*; import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; /** - * + * Service responsible for submitting create index requests */ public class MetaDataCreateIndexService extends AbstractComponent { @@ -89,29 +89,27 @@ public class MetaDataCreateIndexService extends AbstractComponent { private final ClusterService clusterService; private final IndicesService indicesService; private final AllocationService allocationService; - private final NodeIndexCreatedAction nodeIndexCreatedAction; private final MetaDataService metaDataService; private final Version version; private final String riverIndexName; @Inject public MetaDataCreateIndexService(Settings settings, Environment environment, ThreadPool threadPool, ClusterService clusterService, IndicesService indicesService, - AllocationService allocationService, NodeIndexCreatedAction nodeIndexCreatedAction, MetaDataService metaDataService, Version version, @RiverIndexName String riverIndexName) { + AllocationService allocationService, MetaDataService metaDataService, Version version, @RiverIndexName String riverIndexName) { super(settings); this.environment = environment; this.threadPool = threadPool; this.clusterService = clusterService; this.indicesService = indicesService; this.allocationService = allocationService; - this.nodeIndexCreatedAction = nodeIndexCreatedAction; this.metaDataService = metaDataService; this.version = version; this.riverIndexName = riverIndexName; } - public void createIndex(final Request request, final Listener userListener) { + public void createIndex(final CreateIndexClusterStateUpdateRequest request, final ClusterStateUpdateListener listener) { ImmutableSettings.Builder updatedSettingsBuilder = ImmutableSettings.settingsBuilder(); - for (Map.Entry entry : request.settings.getAsMap().entrySet()) { + for (Map.Entry entry : request.settings().getAsMap().entrySet()) { if (!entry.getKey().startsWith("index.")) { updatedSettingsBuilder.put("index." + entry.getKey(), entry.getValue()); } else { @@ -122,11 +120,11 @@ public class MetaDataCreateIndexService extends AbstractComponent { // we lock here, and not within the cluster service callback since we don't want to // block the whole cluster state handling - final Semaphore mdLock = metaDataService.indexMetaDataLock(request.index); + final Semaphore mdLock = metaDataService.indexMetaDataLock(request.index()); // quick check to see if we can acquire a lock, otherwise spawn to a thread pool if (mdLock.tryAcquire()) { - createIndex(request, userListener, mdLock); + createIndex(request, listener, mdLock); return; } @@ -134,16 +132,17 @@ public class MetaDataCreateIndexService extends AbstractComponent { @Override public void run() { try { - if (!mdLock.tryAcquire(request.masterTimeout.nanos(), TimeUnit.NANOSECONDS)) { - userListener.onFailure(new ProcessClusterEventTimeoutException(request.masterTimeout, "acquire index lock")); + if (!mdLock.tryAcquire(request.masterNodeTimeout().nanos(), TimeUnit.NANOSECONDS)) { + listener.onFailure(new ProcessClusterEventTimeoutException(request.masterNodeTimeout(), "acquire index lock")); return; } } catch (InterruptedException e) { - userListener.onFailure(e); + Thread.interrupted(); + listener.onFailure(e); return; } - createIndex(request, userListener, mdLock); + createIndex(request, listener, mdLock); } }); } @@ -172,17 +171,39 @@ public class MetaDataCreateIndexService extends AbstractComponent { } } - private void createIndex(final Request request, final Listener userListener, Semaphore mdLock) { - final CreateIndexListener listener = new CreateIndexListener(mdLock, request, userListener); - clusterService.submitStateUpdateTask("create-index [" + request.index + "], cause [" + request.cause + "]", Priority.URGENT, new TimeoutClusterStateUpdateTask() { + private void createIndex(final CreateIndexClusterStateUpdateRequest request, final ClusterStateUpdateListener listener, final Semaphore mdLock) { + clusterService.submitStateUpdateTask("create-index [" + request.index() + "], cause [" + request.cause() + "]", Priority.URGENT, new AckedClusterStateUpdateTask() { + + @Override + public boolean mustAck(DiscoveryNode discoveryNode) { + return true; + } + + @Override + public void onAllNodesAcked(@Nullable Throwable t) { + mdLock.release(); + listener.onResponse(new ClusterStateUpdateResponse(true)); + } + + @Override + public void onAckTimeout() { + mdLock.release(); + listener.onResponse(new ClusterStateUpdateResponse(false)); + } + + @Override + public TimeValue ackTimeout() { + return request.ackTimeout(); + } @Override public TimeValue timeout() { - return request.masterTimeout; + return request.masterNodeTimeout(); } @Override public void onFailure(String source, Throwable t) { + mdLock.release(); listener.onFailure(t); } @@ -202,11 +223,11 @@ public class MetaDataCreateIndexService extends AbstractComponent { // add the request mapping Map> mappings = Maps.newHashMap(); - for (Map.Entry entry : request.mappings.entrySet()) { + for (Map.Entry entry : request.mappings().entrySet()) { mappings.put(entry.getKey(), parseMapping(entry.getValue())); } - for (Map.Entry entry : request.customs.entrySet()) { + for (Map.Entry entry : request.customs().entrySet()) { customs.put(entry.getKey(), entry.getValue()); } @@ -237,7 +258,7 @@ public class MetaDataCreateIndexService extends AbstractComponent { File mappingsDir = new File(environment.configFile(), "mappings"); if (mappingsDir.exists() && mappingsDir.isDirectory()) { // first index level - File indexMappingsDir = new File(mappingsDir, request.index); + File indexMappingsDir = new File(mappingsDir, request.index()); if (indexMappingsDir.exists() && indexMappingsDir.isDirectory()) { addMappings(mappings, indexMappingsDir); } @@ -255,17 +276,17 @@ public class MetaDataCreateIndexService extends AbstractComponent { indexSettingsBuilder.put(templates.get(i).settings()); } // now, put the request settings, so they override templates - indexSettingsBuilder.put(request.settings); + indexSettingsBuilder.put(request.settings()); if (indexSettingsBuilder.get(SETTING_NUMBER_OF_SHARDS) == null) { - if (request.index.equals(riverIndexName)) { + if (request.index().equals(riverIndexName)) { indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, settings.getAsInt(SETTING_NUMBER_OF_SHARDS, 1)); } else { indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, settings.getAsInt(SETTING_NUMBER_OF_SHARDS, 5)); } } if (indexSettingsBuilder.get(SETTING_NUMBER_OF_REPLICAS) == null) { - if (request.index.equals(riverIndexName)) { + if (request.index().equals(riverIndexName)) { indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS, settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, 1)); } else { indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS, settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, 1)); @@ -286,10 +307,10 @@ public class MetaDataCreateIndexService extends AbstractComponent { // Set up everything, now locally create the index to see that things are ok, and apply // create the index here (on the master) to validate it can be created, as well as adding the mapping - indicesService.createIndex(request.index, actualIndexSettings, clusterService.localNode().id()); + indicesService.createIndex(request.index(), actualIndexSettings, clusterService.localNode().id()); indexCreated = true; // now add the mappings - IndexService indexService = indicesService.indexServiceSafe(request.index); + IndexService indexService = indicesService.indexServiceSafe(request.index()); MapperService mapperService = indexService.mapperService(); // first, add the default mapping if (mappings.containsKey(MapperService.DEFAULT_MAPPING)) { @@ -319,14 +340,14 @@ public class MetaDataCreateIndexService extends AbstractComponent { mappingsMetaData.put(mapper.type(), mappingMd); } - final IndexMetaData.Builder indexMetaDataBuilder = IndexMetaData.builder(request.index).settings(actualIndexSettings); + final IndexMetaData.Builder indexMetaDataBuilder = IndexMetaData.builder(request.index()).settings(actualIndexSettings); for (MappingMetaData mappingMd : mappingsMetaData.values()) { indexMetaDataBuilder.putMapping(mappingMd); } for (Map.Entry customEntry : customs.entrySet()) { indexMetaDataBuilder.putCustom(customEntry.getKey(), customEntry.getValue()); } - indexMetaDataBuilder.state(request.state); + indexMetaDataBuilder.state(request.state()); final IndexMetaData indexMetaData; try { indexMetaData = indexMetaDataBuilder.build(); @@ -339,57 +360,31 @@ public class MetaDataCreateIndexService extends AbstractComponent { .put(indexMetaData, false) .build(); - logger.info("[{}] creating index, cause [{}], shards [{}]/[{}], mappings {}", request.index, request.cause, indexMetaData.numberOfShards(), indexMetaData.numberOfReplicas(), mappings.keySet()); + logger.info("[{}] creating index, cause [{}], shards [{}]/[{}], mappings {}", request.index(), request.cause(), indexMetaData.numberOfShards(), indexMetaData.numberOfReplicas(), mappings.keySet()); ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()); - if (!request.blocks.isEmpty()) { - for (ClusterBlock block : request.blocks) { - blocks.addIndexBlock(request.index, block); + if (!request.blocks().isEmpty()) { + for (ClusterBlock block : request.blocks()) { + blocks.addIndexBlock(request.index(), block); } } - if (request.state == State.CLOSE) { - blocks.addIndexBlock(request.index, MetaDataIndexStateService.INDEX_CLOSED_BLOCK); + if (request.state() == State.CLOSE) { + blocks.addIndexBlock(request.index(), MetaDataIndexStateService.INDEX_CLOSED_BLOCK); } ClusterState updatedState = ClusterState.builder(currentState).blocks(blocks).metaData(newMetaData).build(); - if (request.state == State.OPEN) { + if (request.state() == State.OPEN) { RoutingTable.Builder routingTableBuilder = RoutingTable.builder(updatedState.routingTable()) - .addAsNew(updatedState.metaData().index(request.index)); + .addAsNew(updatedState.metaData().index(request.index())); RoutingAllocation.Result routingResult = allocationService.reroute(ClusterState.builder(updatedState).routingTable(routingTableBuilder).build()); updatedState = ClusterState.builder(updatedState).routingResult(routingResult).build(); } - - // we wait for events from all nodes that the index has been added to the metadata - final AtomicInteger counter = new AtomicInteger(currentState.nodes().size()); - - final NodeIndexCreatedAction.Listener nodeIndexCreatedListener = new NodeIndexCreatedAction.Listener() { - @Override - public void onNodeIndexCreated(String index, String nodeId) { - if (index.equals(request.index)) { - if (counter.decrementAndGet() == 0) { - listener.onResponse(new Response(true, indexMetaData)); - nodeIndexCreatedAction.remove(this); - } - } - } - }; - - nodeIndexCreatedAction.add(nodeIndexCreatedListener); - - listener.future = threadPool.schedule(request.timeout, ThreadPool.Names.SAME, new Runnable() { - @Override - public void run() { - listener.onResponse(new Response(false, indexMetaData)); - nodeIndexCreatedAction.remove(nodeIndexCreatedListener); - } - }); - return updatedState; } finally { if (indexCreated) { // Index was already partially created - need to clean up - indicesService.removeIndex(request.index, failureReason != null ? failureReason : "failed to create index"); + indicesService.removeIndex(request.index(), failureReason != null ? failureReason : "failed to create index"); } } } @@ -400,44 +395,6 @@ public class MetaDataCreateIndexService extends AbstractComponent { }); } - class CreateIndexListener implements Listener { - - private final AtomicBoolean notified = new AtomicBoolean(); - private final Semaphore mdLock; - private final Request request; - private final Listener listener; - volatile ScheduledFuture future; - - private CreateIndexListener(Semaphore mdLock, Request request, Listener listener) { - this.mdLock = mdLock; - this.request = request; - this.listener = listener; - } - - @Override - public void onResponse(final Response response) { - if (notified.compareAndSet(false, true)) { - mdLock.release(); - if (future != null) { - future.cancel(false); - } - listener.onResponse(response); - } - } - - @Override - public void onFailure(Throwable t) { - if (notified.compareAndSet(false, true)) { - mdLock.release(); - if (future != null) { - future.cancel(false); - } - listener.onFailure(t); - } - } - } - - private Map parseMapping(String mappingSource) throws Exception { return XContentFactory.xContent(mappingSource).createParser(mappingSource).mapAndClose(); } @@ -463,11 +420,11 @@ public class MetaDataCreateIndexService extends AbstractComponent { } } - private List findTemplates(Request request, ClusterState state) { + private List findTemplates(CreateIndexClusterStateUpdateRequest request, ClusterState state) { List templates = Lists.newArrayList(); for (ObjectCursor cursor : state.metaData().templates().values()) { IndexTemplateMetaData template = cursor.value; - if (Regex.simpleMatch(template.template(), request.index)) { + if (Regex.simpleMatch(template.template(), request.index())) { templates.add(template); } } @@ -483,11 +440,11 @@ public class MetaDataCreateIndexService extends AbstractComponent { byte[] templatesData = Streams.copyToByteArray(templatesFile); parser = XContentHelper.createParser(templatesData, 0, templatesData.length); IndexTemplateMetaData template = IndexTemplateMetaData.Builder.fromXContent(parser); - if (Regex.simpleMatch(template.template(), request.index)) { + if (Regex.simpleMatch(template.template(), request.index())) { templates.add(template); } } catch (Exception e) { - logger.warn("[{}] failed to read template [{}] from config", e, request.index, templatesFile.getAbsolutePath()); + logger.warn("[{}] failed to read template [{}] from config", e, request.index(), templatesFile.getAbsolutePath()); } finally { IOUtils.closeWhileHandlingException(parser); } @@ -504,106 +461,7 @@ public class MetaDataCreateIndexService extends AbstractComponent { return templates; } - private void validate(Request request, ClusterState state) throws ElasticSearchException { - validateIndexName(request.index, state); - } - - public static interface Listener { - - void onResponse(Response response); - - void onFailure(Throwable t); - } - - public static class Request { - - final String cause; - final String index; - - State state = State.OPEN; - - Settings settings = ImmutableSettings.Builder.EMPTY_SETTINGS; - - Map mappings = Maps.newHashMap(); - - Map customs = newHashMap(); - - - TimeValue timeout = TimeValue.timeValueSeconds(5); - TimeValue masterTimeout = MasterNodeOperationRequest.DEFAULT_MASTER_NODE_TIMEOUT; - - Set blocks = Sets.newHashSet(); - - public Request(String cause, String index) { - this.cause = cause; - this.index = index; - } - - public Request settings(Settings settings) { - this.settings = settings; - return this; - } - - public Request mappings(Map mappings) { - this.mappings.putAll(mappings); - return this; - } - - public Request mappingsMetaData(Map mappings) throws IOException { - for (Map.Entry entry : mappings.entrySet()) { - this.mappings.put(entry.getKey(), entry.getValue().source().string()); - } - return this; - } - - public Request mappingsCompressed(Map mappings) throws IOException { - for (Map.Entry entry : mappings.entrySet()) { - this.mappings.put(entry.getKey(), entry.getValue().string()); - } - return this; - } - - public Request customs(Map customs) { - this.customs.putAll(customs); - return this; - } - - public Request blocks(Set blocks) { - this.blocks.addAll(blocks); - return this; - } - - public Request state(State state) { - this.state = state; - return this; - } - - public Request timeout(TimeValue timeout) { - this.timeout = timeout; - return this; - } - - public Request masterTimeout(TimeValue masterTimeout) { - this.masterTimeout = masterTimeout; - return this; - } - } - - public static class Response { - private final boolean acknowledged; - private final IndexMetaData indexMetaData; - - public Response(boolean acknowledged, IndexMetaData indexMetaData) { - this.acknowledged = acknowledged; - this.indexMetaData = indexMetaData; - } - - public boolean acknowledged() { - return acknowledged; - } - - public IndexMetaData indexMetaData() { - return indexMetaData; - } + private void validate(CreateIndexClusterStateUpdateRequest request, ClusterState state) throws ElasticSearchException { + validateIndexName(request.index(), state); } } diff --git a/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index d559b94fe9b..14f2805dc00 100644 --- a/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -28,7 +28,6 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterStateListener; -import org.elasticsearch.cluster.action.index.NodeIndexCreatedAction; import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction; import org.elasticsearch.cluster.action.index.NodeMappingRefreshAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; @@ -86,7 +85,6 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent() { - @Override - public void onResponse(CreateIndexResponse response) { - try { - XContentBuilder builder = RestXContentBuilder.restContentBuilder(request); - builder.startObject() - .field("ok", true) - .field("acknowledged", response.isAcknowledged()) - .endObject(); - channel.sendResponse(new XContentRestResponse(request, OK, builder)); - } catch (Throwable e) { - onFailure(e); - } - } - - @Override - public void onFailure(Throwable e) { - try { - channel.sendResponse(new XContentThrowableRestResponse(request, e)); - } catch (IOException e1) { - logger.error("Failed to send failure response", e1); - } - } - }); + client.admin().indices().create(createIndexRequest, new AcknowledgedRestResponseActionListener(request, channel, logger)); } } diff --git a/src/test/java/org/elasticsearch/cluster/ack/AckTests.java b/src/test/java/org/elasticsearch/cluster/ack/AckTests.java index 3f017a278ea..cb51e4c356b 100644 --- a/src/test/java/org/elasticsearch/cluster/ack/AckTests.java +++ b/src/test/java/org/elasticsearch/cluster/ack/AckTests.java @@ -28,6 +28,7 @@ import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResp import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse; import org.elasticsearch.action.admin.indices.close.CloseIndexResponse; +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingResponse; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse; @@ -469,6 +470,28 @@ public class AckTests extends ElasticsearchIntegrationTest { assertThat(putMappingResponse.isAcknowledged(), equalTo(false)); } + @Test + public void testCreateIndexAcknowledgement() { + createIndex("test"); + + for (Client client : clients()) { + assertThat(getLocalClusterState(client).metaData().indices().containsKey("test"), equalTo(true)); + } + + //let's wait for green, otherwise there can be issues with after test checks (mock directory wrapper etc.) + //but we do want to check that the new index is on all nodes cluster state even before green + ensureGreen(); + } + + @Test + public void testCreateIndexNoAcknowledgement() { + CreateIndexResponse createIndexResponse = client().admin().indices().prepareCreate("test").setTimeout("0s").get(); + assertThat(createIndexResponse.isAcknowledged(), equalTo(false)); + + //let's wait for green, otherwise there can be issues with after test checks (mock directory wrapper etc.) + ensureGreen(); + } + private static ClusterState getLocalClusterState(Client client) { return client.admin().cluster().prepareState().setLocal(true).get().getState(); } diff --git a/src/test/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java b/src/test/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java index f54bcb82c9d..24023f486ce 100644 --- a/src/test/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java +++ b/src/test/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java @@ -26,12 +26,8 @@ import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ShardOperationFailedException; -import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; -import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder; import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; -import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequestBuilder; -import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse; import org.elasticsearch.action.count.CountResponse; import org.elasticsearch.action.percolate.PercolateResponse; import org.elasticsearch.action.search.SearchRequest; @@ -81,20 +77,11 @@ public class ElasticsearchAssertions { assertAcked(builder.get()); } - public static void assertAcked(CreateIndexRequestBuilder builder) { - assertAcked(builder.get()); - } - public static void assertAcked(DeleteIndexResponse response) { assertThat("Delete Index failed - not acked", response.isAcknowledged(), equalTo(true)); assertVersionSerializable(response); } - public static void assertAcked(CreateIndexResponse response) { - assertThat("Create Index failed - not acked", response.isAcknowledged(), equalTo(true)); - assertVersionSerializable(response); - } - public static String formatShardStatus(BroadcastOperationResponse response) { String msg = " Total shards: " + response.getTotalShards() + " Successful shards: " + response.getSuccessfulShards() + " & " + response.getFailedShards() + " shard failures:";