Refactored create index api to make use of the new recently introduced generic ack mechanism

Closes #4421
This commit is contained in:
Luca Cavanna 2013-10-29 19:35:54 +01:00
parent bb275166f1
commit 6e4d33bb4d
12 changed files with 245 additions and 492 deletions

View File

@ -0,0 +1,111 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.create;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.elasticsearch.cluster.ack.ClusterStateUpdateRequest;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import java.util.Map;
import java.util.Set;
import static com.google.common.collect.Maps.newHashMap;
/**
* Cluster state update request that allows to create an index
*/
public class CreateIndexClusterStateUpdateRequest extends ClusterStateUpdateRequest<CreateIndexClusterStateUpdateRequest> {
final String cause;
final String index;
private IndexMetaData.State state = IndexMetaData.State.OPEN;
private Settings settings = ImmutableSettings.Builder.EMPTY_SETTINGS;
private Map<String, String> mappings = Maps.newHashMap();
private Map<String, IndexMetaData.Custom> customs = newHashMap();
private Set<ClusterBlock> blocks = Sets.newHashSet();
CreateIndexClusterStateUpdateRequest(String cause, String index) {
this.cause = cause;
this.index = index;
}
public CreateIndexClusterStateUpdateRequest settings(Settings settings) {
this.settings = settings;
return this;
}
public CreateIndexClusterStateUpdateRequest mappings(Map<String, String> mappings) {
this.mappings.putAll(mappings);
return this;
}
public CreateIndexClusterStateUpdateRequest customs(Map<String, IndexMetaData.Custom> customs) {
this.customs.putAll(customs);
return this;
}
public CreateIndexClusterStateUpdateRequest blocks(Set<ClusterBlock> blocks) {
this.blocks.addAll(blocks);
return this;
}
public CreateIndexClusterStateUpdateRequest state(IndexMetaData.State state) {
this.state = state;
return this;
}
public String cause() {
return cause;
}
public String index() {
return index;
}
public IndexMetaData.State state() {
return state;
}
public Settings settings() {
return settings;
}
public Map<String, String> mappings() {
return mappings;
}
public Map<String, IndexMetaData.Custom> customs() {
return customs;
}
public Set<ClusterBlock> blocks() {
return blocks;
}
}

View File

@ -26,7 +26,6 @@ import org.elasticsearch.ElasticSearchParseException;
import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
@ -35,7 +34,6 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
@ -48,7 +46,6 @@ import static org.elasticsearch.action.ValidateActions.addValidationError;
import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS; import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS;
import static org.elasticsearch.common.settings.ImmutableSettings.readSettingsFromStream; import static org.elasticsearch.common.settings.ImmutableSettings.readSettingsFromStream;
import static org.elasticsearch.common.settings.ImmutableSettings.writeSettingsToStream; import static org.elasticsearch.common.settings.ImmutableSettings.writeSettingsToStream;
import static org.elasticsearch.common.unit.TimeValue.readTimeValue;
/** /**
* A request to create an index. Best created with {@link org.elasticsearch.client.Requests#createIndexRequest(String)}. * A request to create an index. Best created with {@link org.elasticsearch.client.Requests#createIndexRequest(String)}.
@ -59,7 +56,7 @@ import static org.elasticsearch.common.unit.TimeValue.readTimeValue;
* @see org.elasticsearch.client.Requests#createIndexRequest(String) * @see org.elasticsearch.client.Requests#createIndexRequest(String)
* @see CreateIndexResponse * @see CreateIndexResponse
*/ */
public class CreateIndexRequest extends MasterNodeOperationRequest<CreateIndexRequest> { public class CreateIndexRequest extends AcknowledgedRequest<CreateIndexRequest> {
private String cause = ""; private String cause = "";
@ -71,8 +68,6 @@ public class CreateIndexRequest extends MasterNodeOperationRequest<CreateIndexRe
private Map<String, IndexMetaData.Custom> customs = newHashMap(); private Map<String, IndexMetaData.Custom> customs = newHashMap();
private TimeValue timeout = AcknowledgedRequest.DEFAULT_ACK_TIMEOUT;
CreateIndexRequest() { CreateIndexRequest() {
} }
@ -173,6 +168,7 @@ public class CreateIndexRequest extends MasterNodeOperationRequest<CreateIndexRe
/** /**
* The settings to crete the index with (either json/yaml/properties format) * The settings to crete the index with (either json/yaml/properties format)
*/ */
@SuppressWarnings("unchecked")
public CreateIndexRequest settings(Map source) { public CreateIndexRequest settings(Map source) {
try { try {
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON); XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON);
@ -224,6 +220,7 @@ public class CreateIndexRequest extends MasterNodeOperationRequest<CreateIndexRe
* @param type The mapping type * @param type The mapping type
* @param source The mapping source * @param source The mapping source
*/ */
@SuppressWarnings("unchecked")
public CreateIndexRequest mapping(String type, Map source) { public CreateIndexRequest mapping(String type, Map source) {
// wrap it in a type map if its not // wrap it in a type map if its not
if (source.size() != 1 || !source.containsKey(type)) { if (source.size() != 1 || !source.containsKey(type)) {
@ -292,6 +289,7 @@ public class CreateIndexRequest extends MasterNodeOperationRequest<CreateIndexRe
/** /**
* Sets the settings and mappings as a single source. * Sets the settings and mappings as a single source.
*/ */
@SuppressWarnings("unchecked")
public CreateIndexRequest source(Map<String, Object> source) { public CreateIndexRequest source(Map<String, Object> source) {
boolean found = false; boolean found = false;
for (Map.Entry<String, Object> entry : source.entrySet()) { for (Map.Entry<String, Object> entry : source.entrySet()) {
@ -338,38 +336,13 @@ public class CreateIndexRequest extends MasterNodeOperationRequest<CreateIndexRe
return this.customs; return this.customs;
} }
/**
* Timeout to wait for the index creation to be acknowledged by current cluster nodes. Defaults
* to <tt>10s</tt>.
*/
public TimeValue timeout() {
return timeout;
}
/**
* Timeout to wait for the index creation to be acknowledged by current cluster nodes. Defaults
* to <tt>10s</tt>.
*/
public CreateIndexRequest timeout(TimeValue timeout) {
this.timeout = timeout;
return this;
}
/**
* Timeout to wait for the index creation to be acknowledged by current cluster nodes. Defaults
* to <tt>10s</tt>.
*/
public CreateIndexRequest timeout(String timeout) {
return timeout(TimeValue.parseTimeValue(timeout, null));
}
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
super.readFrom(in); super.readFrom(in);
cause = in.readString(); cause = in.readString();
index = in.readString(); index = in.readString();
settings = readSettingsFromStream(in); settings = readSettingsFromStream(in);
timeout = readTimeValue(in); readTimeout(in);
int size = in.readVInt(); int size = in.readVInt();
for (int i = 0; i < size; i++) { for (int i = 0; i < size; i++) {
mappings.put(in.readString(), in.readString()); mappings.put(in.readString(), in.readString());
@ -388,7 +361,7 @@ public class CreateIndexRequest extends MasterNodeOperationRequest<CreateIndexRe
out.writeString(cause); out.writeString(cause);
out.writeString(index); out.writeString(index);
writeSettingsToStream(settings, out); writeSettingsToStream(settings, out);
timeout.writeTo(out); writeTimeout(out);
out.writeVInt(mappings.size()); out.writeVInt(mappings.size());
for (Map.Entry<String, String> entry : mappings.entrySet()) { for (Map.Entry<String, String> entry : mappings.entrySet()) {
out.writeString(entry.getKey()); out.writeString(entry.getKey());

View File

@ -20,21 +20,20 @@
package org.elasticsearch.action.admin.indices.create; package org.elasticsearch.action.admin.indices.create;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; import org.elasticsearch.action.support.master.AcknowledgedRequestBuilder;
import org.elasticsearch.client.IndicesAdminClient; import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.client.internal.InternalIndicesAdminClient; import org.elasticsearch.client.internal.InternalIndicesAdminClient;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import java.util.Map; import java.util.Map;
/** /**
* * Builder for a create index request
*/ */
public class CreateIndexRequestBuilder extends MasterNodeOperationRequestBuilder<CreateIndexRequest, CreateIndexResponse, CreateIndexRequestBuilder> { public class CreateIndexRequestBuilder extends AcknowledgedRequestBuilder<CreateIndexRequest, CreateIndexResponse, CreateIndexRequestBuilder> {
public CreateIndexRequestBuilder(IndicesAdminClient indicesClient) { public CreateIndexRequestBuilder(IndicesAdminClient indicesClient) {
super((InternalIndicesAdminClient) indicesClient, new CreateIndexRequest()); super((InternalIndicesAdminClient) indicesClient, new CreateIndexRequest());
@ -200,24 +199,6 @@ public class CreateIndexRequestBuilder extends MasterNodeOperationRequestBuilder
return this; return this;
} }
/**
* Timeout to wait for the index creation to be acknowledged by current cluster nodes. Defaults
* to <tt>10s</tt>.
*/
public CreateIndexRequestBuilder setTimeout(TimeValue timeout) {
request.timeout(timeout);
return this;
}
/**
* Timeout to wait for the index creation to be acknowledged by current cluster nodes. Defaults
* to <tt>10s</tt>.
*/
public CreateIndexRequestBuilder setTimeout(String timeout) {
request.timeout(timeout);
return this;
}
@Override @Override
protected void doExecute(ActionListener<CreateIndexResponse> listener) { protected void doExecute(ActionListener<CreateIndexResponse> listener) {
((IndicesAdminClient) client).create(request, listener); ((IndicesAdminClient) client).create(request, listener);

View File

@ -19,7 +19,7 @@
package org.elasticsearch.action.admin.indices.create; package org.elasticsearch.action.admin.indices.create;
import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
@ -28,34 +28,24 @@ import java.io.IOException;
/** /**
* A response for a create index action. * A response for a create index action.
*/ */
public class CreateIndexResponse extends ActionResponse { public class CreateIndexResponse extends AcknowledgedResponse {
private boolean acknowledged;
CreateIndexResponse() { CreateIndexResponse() {
} }
CreateIndexResponse(boolean acknowledged) { CreateIndexResponse(boolean acknowledged) {
this.acknowledged = acknowledged; super(acknowledged);
}
/**
* Has the index creation been acknowledged by all current cluster nodes within the
* provided {@link CreateIndexRequest#timeout(org.elasticsearch.common.unit.TimeValue)}.
*/
public boolean isAcknowledged() {
return acknowledged;
} }
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
super.readFrom(in); super.readFrom(in);
acknowledged = in.readBoolean(); readAcknowledged(in, null);
} }
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out); super.writeTo(out);
out.writeBoolean(acknowledged); writeAcknowledged(out, null);
} }
} }

View File

@ -24,6 +24,8 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction; import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ack.ClusterStateUpdateListener;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService; import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
@ -80,15 +82,16 @@ public class TransportCreateIndexAction extends TransportMasterNodeOperationActi
cause = "api"; cause = "api";
} }
createIndexService.createIndex(new MetaDataCreateIndexService.Request(cause, request.index()).settings(request.settings()) CreateIndexClusterStateUpdateRequest updateRequest = new CreateIndexClusterStateUpdateRequest(cause, request.index())
.mappings(request.mappings()) .ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout())
.customs(request.customs()) .settings(request.settings()).mappings(request.mappings())
.timeout(request.timeout()) .customs(request.customs());
.masterTimeout(request.masterNodeTimeout()),
new MetaDataCreateIndexService.Listener() { createIndexService.createIndex(updateRequest, new ClusterStateUpdateListener() {
@Override @Override
public void onResponse(MetaDataCreateIndexService.Response response) { public void onResponse(ClusterStateUpdateResponse response) {
listener.onResponse(new CreateIndexResponse(response.acknowledged())); listener.onResponse(new CreateIndexResponse(response.isAcknowledged()));
} }
@Override @Override

View File

@ -71,7 +71,6 @@ public class ClusterModule extends AbstractModule implements SpawnModules {
bind(RoutingService.class).asEagerSingleton(); bind(RoutingService.class).asEagerSingleton();
bind(ShardStateAction.class).asEagerSingleton(); bind(ShardStateAction.class).asEagerSingleton();
bind(NodeIndexCreatedAction.class).asEagerSingleton();
bind(NodeIndexDeletedAction.class).asEagerSingleton(); bind(NodeIndexDeletedAction.class).asEagerSingleton();
bind(NodeMappingRefreshAction.class).asEagerSingleton(); bind(NodeMappingRefreshAction.class).asEagerSingleton();
bind(MappingUpdatedAction.class).asEagerSingleton(); bind(MappingUpdatedAction.class).asEagerSingleton();

View File

@ -1,134 +0,0 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.action.index;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
/**
*
*/
public class NodeIndexCreatedAction extends AbstractComponent {
private final ThreadPool threadPool;
private final TransportService transportService;
private final List<Listener> listeners = new CopyOnWriteArrayList<Listener>();
@Inject
public NodeIndexCreatedAction(Settings settings, ThreadPool threadPool, TransportService transportService) {
super(settings);
this.threadPool = threadPool;
this.transportService = transportService;
transportService.registerHandler(NodeIndexCreatedTransportHandler.ACTION, new NodeIndexCreatedTransportHandler());
}
public void add(Listener listener) {
listeners.add(listener);
}
public void remove(Listener listener) {
listeners.remove(listener);
}
public void nodeIndexCreated(final ClusterState clusterState, final String index, final String nodeId) throws ElasticSearchException {
DiscoveryNodes nodes = clusterState.nodes();
if (nodes.localNodeMaster()) {
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
innerNodeIndexCreated(index, nodeId);
}
});
} else {
transportService.sendRequest(clusterState.nodes().masterNode(),
NodeIndexCreatedTransportHandler.ACTION, new NodeIndexCreatedMessage(index, nodeId), EmptyTransportResponseHandler.INSTANCE_SAME);
}
}
private void innerNodeIndexCreated(String index, String nodeId) {
for (Listener listener : listeners) {
listener.onNodeIndexCreated(index, nodeId);
}
}
public static interface Listener {
void onNodeIndexCreated(String index, String nodeId);
}
private class NodeIndexCreatedTransportHandler extends BaseTransportRequestHandler<NodeIndexCreatedMessage> {
static final String ACTION = "cluster/nodeIndexCreated";
@Override
public NodeIndexCreatedMessage newInstance() {
return new NodeIndexCreatedMessage();
}
@Override
public void messageReceived(NodeIndexCreatedMessage message, TransportChannel channel) throws Exception {
innerNodeIndexCreated(message.index, message.nodeId);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
}
static class NodeIndexCreatedMessage extends TransportRequest {
String index;
String nodeId;
NodeIndexCreatedMessage() {
}
NodeIndexCreatedMessage(String index, String nodeId) {
this.index = index;
this.nodeId = nodeId;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(index);
out.writeString(nodeId);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
index = in.readString();
nodeId = in.readString();
}
}
}

View File

@ -24,21 +24,23 @@ import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import com.google.common.base.Charsets; import com.google.common.base.Charsets;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.lucene.util.CollectionUtil; import org.apache.lucene.util.CollectionUtil;
import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.support.master.MasterNodeOperationRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.TimeoutClusterStateUpdateTask; import org.elasticsearch.cluster.ack.ClusterStateUpdateListener;
import org.elasticsearch.cluster.action.index.NodeIndexCreatedAction; import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority; import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
@ -66,21 +68,19 @@ import org.elasticsearch.threadpool.ThreadPool;
import java.io.File; import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.util.*; import java.util.Comparator;
import java.util.concurrent.ScheduledFuture; import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static com.google.common.collect.Maps.newHashMap;
import static org.elasticsearch.cluster.metadata.IndexMetaData.*; import static org.elasticsearch.cluster.metadata.IndexMetaData.*;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
/** /**
* * Service responsible for submitting create index requests
*/ */
public class MetaDataCreateIndexService extends AbstractComponent { public class MetaDataCreateIndexService extends AbstractComponent {
@ -89,29 +89,27 @@ public class MetaDataCreateIndexService extends AbstractComponent {
private final ClusterService clusterService; private final ClusterService clusterService;
private final IndicesService indicesService; private final IndicesService indicesService;
private final AllocationService allocationService; private final AllocationService allocationService;
private final NodeIndexCreatedAction nodeIndexCreatedAction;
private final MetaDataService metaDataService; private final MetaDataService metaDataService;
private final Version version; private final Version version;
private final String riverIndexName; private final String riverIndexName;
@Inject @Inject
public MetaDataCreateIndexService(Settings settings, Environment environment, ThreadPool threadPool, ClusterService clusterService, IndicesService indicesService, public MetaDataCreateIndexService(Settings settings, Environment environment, ThreadPool threadPool, ClusterService clusterService, IndicesService indicesService,
AllocationService allocationService, NodeIndexCreatedAction nodeIndexCreatedAction, MetaDataService metaDataService, Version version, @RiverIndexName String riverIndexName) { AllocationService allocationService, MetaDataService metaDataService, Version version, @RiverIndexName String riverIndexName) {
super(settings); super(settings);
this.environment = environment; this.environment = environment;
this.threadPool = threadPool; this.threadPool = threadPool;
this.clusterService = clusterService; this.clusterService = clusterService;
this.indicesService = indicesService; this.indicesService = indicesService;
this.allocationService = allocationService; this.allocationService = allocationService;
this.nodeIndexCreatedAction = nodeIndexCreatedAction;
this.metaDataService = metaDataService; this.metaDataService = metaDataService;
this.version = version; this.version = version;
this.riverIndexName = riverIndexName; this.riverIndexName = riverIndexName;
} }
public void createIndex(final Request request, final Listener userListener) { public void createIndex(final CreateIndexClusterStateUpdateRequest request, final ClusterStateUpdateListener listener) {
ImmutableSettings.Builder updatedSettingsBuilder = ImmutableSettings.settingsBuilder(); ImmutableSettings.Builder updatedSettingsBuilder = ImmutableSettings.settingsBuilder();
for (Map.Entry<String, String> entry : request.settings.getAsMap().entrySet()) { for (Map.Entry<String, String> entry : request.settings().getAsMap().entrySet()) {
if (!entry.getKey().startsWith("index.")) { if (!entry.getKey().startsWith("index.")) {
updatedSettingsBuilder.put("index." + entry.getKey(), entry.getValue()); updatedSettingsBuilder.put("index." + entry.getKey(), entry.getValue());
} else { } else {
@ -122,11 +120,11 @@ public class MetaDataCreateIndexService extends AbstractComponent {
// we lock here, and not within the cluster service callback since we don't want to // we lock here, and not within the cluster service callback since we don't want to
// block the whole cluster state handling // block the whole cluster state handling
final Semaphore mdLock = metaDataService.indexMetaDataLock(request.index); final Semaphore mdLock = metaDataService.indexMetaDataLock(request.index());
// quick check to see if we can acquire a lock, otherwise spawn to a thread pool // quick check to see if we can acquire a lock, otherwise spawn to a thread pool
if (mdLock.tryAcquire()) { if (mdLock.tryAcquire()) {
createIndex(request, userListener, mdLock); createIndex(request, listener, mdLock);
return; return;
} }
@ -134,16 +132,17 @@ public class MetaDataCreateIndexService extends AbstractComponent {
@Override @Override
public void run() { public void run() {
try { try {
if (!mdLock.tryAcquire(request.masterTimeout.nanos(), TimeUnit.NANOSECONDS)) { if (!mdLock.tryAcquire(request.masterNodeTimeout().nanos(), TimeUnit.NANOSECONDS)) {
userListener.onFailure(new ProcessClusterEventTimeoutException(request.masterTimeout, "acquire index lock")); listener.onFailure(new ProcessClusterEventTimeoutException(request.masterNodeTimeout(), "acquire index lock"));
return; return;
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
userListener.onFailure(e); Thread.interrupted();
listener.onFailure(e);
return; return;
} }
createIndex(request, userListener, mdLock); createIndex(request, listener, mdLock);
} }
}); });
} }
@ -172,17 +171,39 @@ public class MetaDataCreateIndexService extends AbstractComponent {
} }
} }
private void createIndex(final Request request, final Listener userListener, Semaphore mdLock) { private void createIndex(final CreateIndexClusterStateUpdateRequest request, final ClusterStateUpdateListener listener, final Semaphore mdLock) {
final CreateIndexListener listener = new CreateIndexListener(mdLock, request, userListener); clusterService.submitStateUpdateTask("create-index [" + request.index() + "], cause [" + request.cause() + "]", Priority.URGENT, new AckedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("create-index [" + request.index + "], cause [" + request.cause + "]", Priority.URGENT, new TimeoutClusterStateUpdateTask() {
@Override
public boolean mustAck(DiscoveryNode discoveryNode) {
return true;
}
@Override
public void onAllNodesAcked(@Nullable Throwable t) {
mdLock.release();
listener.onResponse(new ClusterStateUpdateResponse(true));
}
@Override
public void onAckTimeout() {
mdLock.release();
listener.onResponse(new ClusterStateUpdateResponse(false));
}
@Override
public TimeValue ackTimeout() {
return request.ackTimeout();
}
@Override @Override
public TimeValue timeout() { public TimeValue timeout() {
return request.masterTimeout; return request.masterNodeTimeout();
} }
@Override @Override
public void onFailure(String source, Throwable t) { public void onFailure(String source, Throwable t) {
mdLock.release();
listener.onFailure(t); listener.onFailure(t);
} }
@ -202,11 +223,11 @@ public class MetaDataCreateIndexService extends AbstractComponent {
// add the request mapping // add the request mapping
Map<String, Map<String, Object>> mappings = Maps.newHashMap(); Map<String, Map<String, Object>> mappings = Maps.newHashMap();
for (Map.Entry<String, String> entry : request.mappings.entrySet()) { for (Map.Entry<String, String> entry : request.mappings().entrySet()) {
mappings.put(entry.getKey(), parseMapping(entry.getValue())); mappings.put(entry.getKey(), parseMapping(entry.getValue()));
} }
for (Map.Entry<String, Custom> entry : request.customs.entrySet()) { for (Map.Entry<String, Custom> entry : request.customs().entrySet()) {
customs.put(entry.getKey(), entry.getValue()); customs.put(entry.getKey(), entry.getValue());
} }
@ -237,7 +258,7 @@ public class MetaDataCreateIndexService extends AbstractComponent {
File mappingsDir = new File(environment.configFile(), "mappings"); File mappingsDir = new File(environment.configFile(), "mappings");
if (mappingsDir.exists() && mappingsDir.isDirectory()) { if (mappingsDir.exists() && mappingsDir.isDirectory()) {
// first index level // first index level
File indexMappingsDir = new File(mappingsDir, request.index); File indexMappingsDir = new File(mappingsDir, request.index());
if (indexMappingsDir.exists() && indexMappingsDir.isDirectory()) { if (indexMappingsDir.exists() && indexMappingsDir.isDirectory()) {
addMappings(mappings, indexMappingsDir); addMappings(mappings, indexMappingsDir);
} }
@ -255,17 +276,17 @@ public class MetaDataCreateIndexService extends AbstractComponent {
indexSettingsBuilder.put(templates.get(i).settings()); indexSettingsBuilder.put(templates.get(i).settings());
} }
// now, put the request settings, so they override templates // now, put the request settings, so they override templates
indexSettingsBuilder.put(request.settings); indexSettingsBuilder.put(request.settings());
if (indexSettingsBuilder.get(SETTING_NUMBER_OF_SHARDS) == null) { if (indexSettingsBuilder.get(SETTING_NUMBER_OF_SHARDS) == null) {
if (request.index.equals(riverIndexName)) { if (request.index().equals(riverIndexName)) {
indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, settings.getAsInt(SETTING_NUMBER_OF_SHARDS, 1)); indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, settings.getAsInt(SETTING_NUMBER_OF_SHARDS, 1));
} else { } else {
indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, settings.getAsInt(SETTING_NUMBER_OF_SHARDS, 5)); indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, settings.getAsInt(SETTING_NUMBER_OF_SHARDS, 5));
} }
} }
if (indexSettingsBuilder.get(SETTING_NUMBER_OF_REPLICAS) == null) { if (indexSettingsBuilder.get(SETTING_NUMBER_OF_REPLICAS) == null) {
if (request.index.equals(riverIndexName)) { if (request.index().equals(riverIndexName)) {
indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS, settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, 1)); indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS, settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, 1));
} else { } else {
indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS, settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, 1)); indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS, settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, 1));
@ -286,10 +307,10 @@ public class MetaDataCreateIndexService extends AbstractComponent {
// Set up everything, now locally create the index to see that things are ok, and apply // Set up everything, now locally create the index to see that things are ok, and apply
// create the index here (on the master) to validate it can be created, as well as adding the mapping // create the index here (on the master) to validate it can be created, as well as adding the mapping
indicesService.createIndex(request.index, actualIndexSettings, clusterService.localNode().id()); indicesService.createIndex(request.index(), actualIndexSettings, clusterService.localNode().id());
indexCreated = true; indexCreated = true;
// now add the mappings // now add the mappings
IndexService indexService = indicesService.indexServiceSafe(request.index); IndexService indexService = indicesService.indexServiceSafe(request.index());
MapperService mapperService = indexService.mapperService(); MapperService mapperService = indexService.mapperService();
// first, add the default mapping // first, add the default mapping
if (mappings.containsKey(MapperService.DEFAULT_MAPPING)) { if (mappings.containsKey(MapperService.DEFAULT_MAPPING)) {
@ -319,14 +340,14 @@ public class MetaDataCreateIndexService extends AbstractComponent {
mappingsMetaData.put(mapper.type(), mappingMd); mappingsMetaData.put(mapper.type(), mappingMd);
} }
final IndexMetaData.Builder indexMetaDataBuilder = IndexMetaData.builder(request.index).settings(actualIndexSettings); final IndexMetaData.Builder indexMetaDataBuilder = IndexMetaData.builder(request.index()).settings(actualIndexSettings);
for (MappingMetaData mappingMd : mappingsMetaData.values()) { for (MappingMetaData mappingMd : mappingsMetaData.values()) {
indexMetaDataBuilder.putMapping(mappingMd); indexMetaDataBuilder.putMapping(mappingMd);
} }
for (Map.Entry<String, Custom> customEntry : customs.entrySet()) { for (Map.Entry<String, Custom> customEntry : customs.entrySet()) {
indexMetaDataBuilder.putCustom(customEntry.getKey(), customEntry.getValue()); indexMetaDataBuilder.putCustom(customEntry.getKey(), customEntry.getValue());
} }
indexMetaDataBuilder.state(request.state); indexMetaDataBuilder.state(request.state());
final IndexMetaData indexMetaData; final IndexMetaData indexMetaData;
try { try {
indexMetaData = indexMetaDataBuilder.build(); indexMetaData = indexMetaDataBuilder.build();
@ -339,57 +360,31 @@ public class MetaDataCreateIndexService extends AbstractComponent {
.put(indexMetaData, false) .put(indexMetaData, false)
.build(); .build();
logger.info("[{}] creating index, cause [{}], shards [{}]/[{}], mappings {}", request.index, request.cause, indexMetaData.numberOfShards(), indexMetaData.numberOfReplicas(), mappings.keySet()); logger.info("[{}] creating index, cause [{}], shards [{}]/[{}], mappings {}", request.index(), request.cause(), indexMetaData.numberOfShards(), indexMetaData.numberOfReplicas(), mappings.keySet());
ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()); ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
if (!request.blocks.isEmpty()) { if (!request.blocks().isEmpty()) {
for (ClusterBlock block : request.blocks) { for (ClusterBlock block : request.blocks()) {
blocks.addIndexBlock(request.index, block); blocks.addIndexBlock(request.index(), block);
} }
} }
if (request.state == State.CLOSE) { if (request.state() == State.CLOSE) {
blocks.addIndexBlock(request.index, MetaDataIndexStateService.INDEX_CLOSED_BLOCK); blocks.addIndexBlock(request.index(), MetaDataIndexStateService.INDEX_CLOSED_BLOCK);
} }
ClusterState updatedState = ClusterState.builder(currentState).blocks(blocks).metaData(newMetaData).build(); ClusterState updatedState = ClusterState.builder(currentState).blocks(blocks).metaData(newMetaData).build();
if (request.state == State.OPEN) { if (request.state() == State.OPEN) {
RoutingTable.Builder routingTableBuilder = RoutingTable.builder(updatedState.routingTable()) RoutingTable.Builder routingTableBuilder = RoutingTable.builder(updatedState.routingTable())
.addAsNew(updatedState.metaData().index(request.index)); .addAsNew(updatedState.metaData().index(request.index()));
RoutingAllocation.Result routingResult = allocationService.reroute(ClusterState.builder(updatedState).routingTable(routingTableBuilder).build()); RoutingAllocation.Result routingResult = allocationService.reroute(ClusterState.builder(updatedState).routingTable(routingTableBuilder).build());
updatedState = ClusterState.builder(updatedState).routingResult(routingResult).build(); updatedState = ClusterState.builder(updatedState).routingResult(routingResult).build();
} }
// we wait for events from all nodes that the index has been added to the metadata
final AtomicInteger counter = new AtomicInteger(currentState.nodes().size());
final NodeIndexCreatedAction.Listener nodeIndexCreatedListener = new NodeIndexCreatedAction.Listener() {
@Override
public void onNodeIndexCreated(String index, String nodeId) {
if (index.equals(request.index)) {
if (counter.decrementAndGet() == 0) {
listener.onResponse(new Response(true, indexMetaData));
nodeIndexCreatedAction.remove(this);
}
}
}
};
nodeIndexCreatedAction.add(nodeIndexCreatedListener);
listener.future = threadPool.schedule(request.timeout, ThreadPool.Names.SAME, new Runnable() {
@Override
public void run() {
listener.onResponse(new Response(false, indexMetaData));
nodeIndexCreatedAction.remove(nodeIndexCreatedListener);
}
});
return updatedState; return updatedState;
} finally { } finally {
if (indexCreated) { if (indexCreated) {
// Index was already partially created - need to clean up // Index was already partially created - need to clean up
indicesService.removeIndex(request.index, failureReason != null ? failureReason : "failed to create index"); indicesService.removeIndex(request.index(), failureReason != null ? failureReason : "failed to create index");
} }
} }
} }
@ -400,44 +395,6 @@ public class MetaDataCreateIndexService extends AbstractComponent {
}); });
} }
class CreateIndexListener implements Listener {
private final AtomicBoolean notified = new AtomicBoolean();
private final Semaphore mdLock;
private final Request request;
private final Listener listener;
volatile ScheduledFuture future;
private CreateIndexListener(Semaphore mdLock, Request request, Listener listener) {
this.mdLock = mdLock;
this.request = request;
this.listener = listener;
}
@Override
public void onResponse(final Response response) {
if (notified.compareAndSet(false, true)) {
mdLock.release();
if (future != null) {
future.cancel(false);
}
listener.onResponse(response);
}
}
@Override
public void onFailure(Throwable t) {
if (notified.compareAndSet(false, true)) {
mdLock.release();
if (future != null) {
future.cancel(false);
}
listener.onFailure(t);
}
}
}
private Map<String, Object> parseMapping(String mappingSource) throws Exception { private Map<String, Object> parseMapping(String mappingSource) throws Exception {
return XContentFactory.xContent(mappingSource).createParser(mappingSource).mapAndClose(); return XContentFactory.xContent(mappingSource).createParser(mappingSource).mapAndClose();
} }
@ -463,11 +420,11 @@ public class MetaDataCreateIndexService extends AbstractComponent {
} }
} }
private List<IndexTemplateMetaData> findTemplates(Request request, ClusterState state) { private List<IndexTemplateMetaData> findTemplates(CreateIndexClusterStateUpdateRequest request, ClusterState state) {
List<IndexTemplateMetaData> templates = Lists.newArrayList(); List<IndexTemplateMetaData> templates = Lists.newArrayList();
for (ObjectCursor<IndexTemplateMetaData> cursor : state.metaData().templates().values()) { for (ObjectCursor<IndexTemplateMetaData> cursor : state.metaData().templates().values()) {
IndexTemplateMetaData template = cursor.value; IndexTemplateMetaData template = cursor.value;
if (Regex.simpleMatch(template.template(), request.index)) { if (Regex.simpleMatch(template.template(), request.index())) {
templates.add(template); templates.add(template);
} }
} }
@ -483,11 +440,11 @@ public class MetaDataCreateIndexService extends AbstractComponent {
byte[] templatesData = Streams.copyToByteArray(templatesFile); byte[] templatesData = Streams.copyToByteArray(templatesFile);
parser = XContentHelper.createParser(templatesData, 0, templatesData.length); parser = XContentHelper.createParser(templatesData, 0, templatesData.length);
IndexTemplateMetaData template = IndexTemplateMetaData.Builder.fromXContent(parser); IndexTemplateMetaData template = IndexTemplateMetaData.Builder.fromXContent(parser);
if (Regex.simpleMatch(template.template(), request.index)) { if (Regex.simpleMatch(template.template(), request.index())) {
templates.add(template); templates.add(template);
} }
} catch (Exception e) { } catch (Exception e) {
logger.warn("[{}] failed to read template [{}] from config", e, request.index, templatesFile.getAbsolutePath()); logger.warn("[{}] failed to read template [{}] from config", e, request.index(), templatesFile.getAbsolutePath());
} finally { } finally {
IOUtils.closeWhileHandlingException(parser); IOUtils.closeWhileHandlingException(parser);
} }
@ -504,106 +461,7 @@ public class MetaDataCreateIndexService extends AbstractComponent {
return templates; return templates;
} }
private void validate(Request request, ClusterState state) throws ElasticSearchException { private void validate(CreateIndexClusterStateUpdateRequest request, ClusterState state) throws ElasticSearchException {
validateIndexName(request.index, state); validateIndexName(request.index(), state);
}
public static interface Listener {
void onResponse(Response response);
void onFailure(Throwable t);
}
public static class Request {
final String cause;
final String index;
State state = State.OPEN;
Settings settings = ImmutableSettings.Builder.EMPTY_SETTINGS;
Map<String, String> mappings = Maps.newHashMap();
Map<String, IndexMetaData.Custom> customs = newHashMap();
TimeValue timeout = TimeValue.timeValueSeconds(5);
TimeValue masterTimeout = MasterNodeOperationRequest.DEFAULT_MASTER_NODE_TIMEOUT;
Set<ClusterBlock> blocks = Sets.newHashSet();
public Request(String cause, String index) {
this.cause = cause;
this.index = index;
}
public Request settings(Settings settings) {
this.settings = settings;
return this;
}
public Request mappings(Map<String, String> mappings) {
this.mappings.putAll(mappings);
return this;
}
public Request mappingsMetaData(Map<String, MappingMetaData> mappings) throws IOException {
for (Map.Entry<String, MappingMetaData> entry : mappings.entrySet()) {
this.mappings.put(entry.getKey(), entry.getValue().source().string());
}
return this;
}
public Request mappingsCompressed(Map<String, CompressedString> mappings) throws IOException {
for (Map.Entry<String, CompressedString> entry : mappings.entrySet()) {
this.mappings.put(entry.getKey(), entry.getValue().string());
}
return this;
}
public Request customs(Map<String, Custom> customs) {
this.customs.putAll(customs);
return this;
}
public Request blocks(Set<ClusterBlock> blocks) {
this.blocks.addAll(blocks);
return this;
}
public Request state(State state) {
this.state = state;
return this;
}
public Request timeout(TimeValue timeout) {
this.timeout = timeout;
return this;
}
public Request masterTimeout(TimeValue masterTimeout) {
this.masterTimeout = masterTimeout;
return this;
}
}
public static class Response {
private final boolean acknowledged;
private final IndexMetaData indexMetaData;
public Response(boolean acknowledged, IndexMetaData indexMetaData) {
this.acknowledged = acknowledged;
this.indexMetaData = indexMetaData;
}
public boolean acknowledged() {
return acknowledged;
}
public IndexMetaData indexMetaData() {
return indexMetaData;
}
} }
} }

View File

@ -28,7 +28,6 @@ import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.action.index.NodeIndexCreatedAction;
import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction; import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction;
import org.elasticsearch.cluster.action.index.NodeMappingRefreshAction; import org.elasticsearch.cluster.action.index.NodeMappingRefreshAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.action.shard.ShardStateAction;
@ -86,7 +85,6 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
private final ThreadPool threadPool; private final ThreadPool threadPool;
private final RecoveryTarget recoveryTarget; private final RecoveryTarget recoveryTarget;
private final ShardStateAction shardStateAction; private final ShardStateAction shardStateAction;
private final NodeIndexCreatedAction nodeIndexCreatedAction;
private final NodeIndexDeletedAction nodeIndexDeletedAction; private final NodeIndexDeletedAction nodeIndexDeletedAction;
private final NodeMappingRefreshAction nodeMappingRefreshAction; private final NodeMappingRefreshAction nodeMappingRefreshAction;
@ -117,7 +115,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
public IndicesClusterStateService(Settings settings, IndicesService indicesService, ClusterService clusterService, public IndicesClusterStateService(Settings settings, IndicesService indicesService, ClusterService clusterService,
ThreadPool threadPool, RecoveryTarget recoveryTarget, ThreadPool threadPool, RecoveryTarget recoveryTarget,
ShardStateAction shardStateAction, ShardStateAction shardStateAction,
NodeIndexCreatedAction nodeIndexCreatedAction, NodeIndexDeletedAction nodeIndexDeletedAction, NodeIndexDeletedAction nodeIndexDeletedAction,
NodeMappingRefreshAction nodeMappingRefreshAction) { NodeMappingRefreshAction nodeMappingRefreshAction) {
super(settings); super(settings);
this.indicesService = indicesService; this.indicesService = indicesService;
@ -125,7 +123,6 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
this.threadPool = threadPool; this.threadPool = threadPool;
this.recoveryTarget = recoveryTarget; this.recoveryTarget = recoveryTarget;
this.shardStateAction = shardStateAction; this.shardStateAction = shardStateAction;
this.nodeIndexCreatedAction = nodeIndexCreatedAction;
this.nodeIndexDeletedAction = nodeIndexDeletedAction; this.nodeIndexDeletedAction = nodeIndexDeletedAction;
this.nodeMappingRefreshAction = nodeMappingRefreshAction; this.nodeMappingRefreshAction = nodeMappingRefreshAction;
@ -193,13 +190,6 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
private void sendIndexLifecycleEvents(final ClusterChangedEvent event) { private void sendIndexLifecycleEvents(final ClusterChangedEvent event) {
String localNodeId = event.state().nodes().localNodeId(); String localNodeId = event.state().nodes().localNodeId();
assert localNodeId != null; assert localNodeId != null;
for (String index : event.indicesCreated()) {
try {
nodeIndexCreatedAction.nodeIndexCreated(event.state(), index, localNodeId);
} catch (Throwable e) {
logger.debug("failed to send to master index {} created event", e, index);
}
}
for (String index : event.indicesDeleted()) { for (String index : event.indicesDeleted()) {
try { try {
nodeIndexDeletedAction.nodeIndexDeleted(event.state(), index, localNodeId); nodeIndexDeletedAction.nodeIndexDeleted(event.state(), index, localNodeId);

View File

@ -19,20 +19,15 @@
package org.elasticsearch.rest.action.admin.indices.create; package org.elasticsearch.rest.action.admin.indices.create;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.*; import org.elasticsearch.rest.*;
import org.elasticsearch.rest.action.support.RestXContentBuilder;
import java.io.IOException; import java.io.IOException;
import static org.elasticsearch.rest.RestStatus.OK;
/** /**
* *
*/ */
@ -66,29 +61,6 @@ public class RestCreateIndexAction extends BaseRestHandler {
createIndexRequest.timeout(request.paramAsTime("timeout", createIndexRequest.timeout())); createIndexRequest.timeout(request.paramAsTime("timeout", createIndexRequest.timeout()));
createIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", createIndexRequest.masterNodeTimeout())); createIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", createIndexRequest.masterNodeTimeout()));
client.admin().indices().create(createIndexRequest, new ActionListener<CreateIndexResponse>() { client.admin().indices().create(createIndexRequest, new AcknowledgedRestResponseActionListener<CreateIndexResponse>(request, channel, logger));
@Override
public void onResponse(CreateIndexResponse response) {
try {
XContentBuilder builder = RestXContentBuilder.restContentBuilder(request);
builder.startObject()
.field("ok", true)
.field("acknowledged", response.isAcknowledged())
.endObject();
channel.sendResponse(new XContentRestResponse(request, OK, builder));
} catch (Throwable e) {
onFailure(e);
}
}
@Override
public void onFailure(Throwable e) {
try {
channel.sendResponse(new XContentThrowableRestResponse(request, e));
} catch (IOException e1) {
logger.error("Failed to send failure response", e1);
}
}
});
} }
} }

View File

@ -28,6 +28,7 @@ import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResp
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse;
import org.elasticsearch.action.admin.indices.close.CloseIndexResponse; import org.elasticsearch.action.admin.indices.close.CloseIndexResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingResponse; import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingResponse;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
@ -469,6 +470,28 @@ public class AckTests extends ElasticsearchIntegrationTest {
assertThat(putMappingResponse.isAcknowledged(), equalTo(false)); assertThat(putMappingResponse.isAcknowledged(), equalTo(false));
} }
@Test
public void testCreateIndexAcknowledgement() {
createIndex("test");
for (Client client : clients()) {
assertThat(getLocalClusterState(client).metaData().indices().containsKey("test"), equalTo(true));
}
//let's wait for green, otherwise there can be issues with after test checks (mock directory wrapper etc.)
//but we do want to check that the new index is on all nodes cluster state even before green
ensureGreen();
}
@Test
public void testCreateIndexNoAcknowledgement() {
CreateIndexResponse createIndexResponse = client().admin().indices().prepareCreate("test").setTimeout("0s").get();
assertThat(createIndexResponse.isAcknowledged(), equalTo(false));
//let's wait for green, otherwise there can be issues with after test checks (mock directory wrapper etc.)
ensureGreen();
}
private static ClusterState getLocalClusterState(Client client) { private static ClusterState getLocalClusterState(Client client) {
return client.admin().cluster().prepareState().setLocal(true).get().getState(); return client.admin().cluster().prepareState().setLocal(true).get().getState();
} }

View File

@ -26,12 +26,8 @@ import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ShardOperationFailedException; import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequestBuilder;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.action.count.CountResponse; import org.elasticsearch.action.count.CountResponse;
import org.elasticsearch.action.percolate.PercolateResponse; import org.elasticsearch.action.percolate.PercolateResponse;
import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequest;
@ -81,20 +77,11 @@ public class ElasticsearchAssertions {
assertAcked(builder.get()); assertAcked(builder.get());
} }
public static void assertAcked(CreateIndexRequestBuilder builder) {
assertAcked(builder.get());
}
public static void assertAcked(DeleteIndexResponse response) { public static void assertAcked(DeleteIndexResponse response) {
assertThat("Delete Index failed - not acked", response.isAcknowledged(), equalTo(true)); assertThat("Delete Index failed - not acked", response.isAcknowledged(), equalTo(true));
assertVersionSerializable(response); assertVersionSerializable(response);
} }
public static void assertAcked(CreateIndexResponse response) {
assertThat("Create Index failed - not acked", response.isAcknowledged(), equalTo(true));
assertVersionSerializable(response);
}
public static String formatShardStatus(BroadcastOperationResponse response) { public static String formatShardStatus(BroadcastOperationResponse response) {
String msg = " Total shards: " + response.getTotalShards() + " Successful shards: " + response.getSuccessfulShards() + " & " String msg = " Total shards: " + response.getTotalShards() + " Successful shards: " + response.getSuccessfulShards() + " & "
+ response.getFailedShards() + " shard failures:"; + response.getFailedShards() + " shard failures:";