Create Mapping API: Automatically create indices. Closes #12.
This commit is contained in:
parent
b5f3fc9ae1
commit
9a9ce99364
|
@ -20,18 +20,25 @@
|
||||||
package org.elasticsearch.action.admin.indices.mapping.create;
|
package org.elasticsearch.action.admin.indices.mapping.create;
|
||||||
|
|
||||||
import com.google.inject.Inject;
|
import com.google.inject.Inject;
|
||||||
|
import org.elasticsearch.ExceptionsHelper;
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionListener;
|
||||||
import org.elasticsearch.action.Actions;
|
import org.elasticsearch.action.Actions;
|
||||||
import org.elasticsearch.action.TransportActions;
|
import org.elasticsearch.action.TransportActions;
|
||||||
|
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
|
||||||
|
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
|
||||||
|
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
|
||||||
import org.elasticsearch.action.support.BaseAction;
|
import org.elasticsearch.action.support.BaseAction;
|
||||||
import org.elasticsearch.cluster.ClusterService;
|
import org.elasticsearch.cluster.ClusterService;
|
||||||
import org.elasticsearch.cluster.metadata.MetaDataService;
|
import org.elasticsearch.cluster.metadata.MetaDataService;
|
||||||
|
import org.elasticsearch.indices.IndexAlreadyExistsException;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.elasticsearch.transport.*;
|
import org.elasticsearch.transport.*;
|
||||||
import org.elasticsearch.util.io.VoidStreamable;
|
import org.elasticsearch.util.io.VoidStreamable;
|
||||||
import org.elasticsearch.util.settings.Settings;
|
import org.elasticsearch.util.settings.Settings;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author kimchy (Shay Banon)
|
* @author kimchy (Shay Banon)
|
||||||
|
@ -44,21 +51,54 @@ public class TransportCreateMappingAction extends BaseAction<CreateMappingReques
|
||||||
|
|
||||||
private final MetaDataService metaDataService;
|
private final MetaDataService metaDataService;
|
||||||
|
|
||||||
|
private final TransportCreateIndexAction createIndexAction;
|
||||||
|
|
||||||
private final ThreadPool threadPool;
|
private final ThreadPool threadPool;
|
||||||
|
|
||||||
|
private final boolean autoCreateIndex;
|
||||||
|
|
||||||
@Inject public TransportCreateMappingAction(Settings settings, TransportService transportService, ClusterService clusterService,
|
@Inject public TransportCreateMappingAction(Settings settings, TransportService transportService, ClusterService clusterService,
|
||||||
ThreadPool threadPool, MetaDataService metaDataService) {
|
ThreadPool threadPool, MetaDataService metaDataService, TransportCreateIndexAction createIndexAction) {
|
||||||
super(settings);
|
super(settings);
|
||||||
this.transportService = transportService;
|
this.transportService = transportService;
|
||||||
this.clusterService = clusterService;
|
this.clusterService = clusterService;
|
||||||
this.threadPool = threadPool;
|
this.threadPool = threadPool;
|
||||||
this.metaDataService = metaDataService;
|
this.metaDataService = metaDataService;
|
||||||
|
this.createIndexAction = createIndexAction;
|
||||||
|
this.autoCreateIndex = settings.getAsBoolean("action.autoCreateIndex", true);
|
||||||
|
|
||||||
transportService.registerHandler(TransportActions.Admin.Indices.Mapping.CREATE, new TransportHandler());
|
transportService.registerHandler(TransportActions.Admin.Indices.Mapping.CREATE, new TransportHandler());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override protected void doExecute(final CreateMappingRequest request, final ActionListener<CreateMappingResponse> listener) {
|
@Override protected void doExecute(final CreateMappingRequest request, final ActionListener<CreateMappingResponse> listener) {
|
||||||
final String[] indices = Actions.processIndices(clusterService.state(), request.indices());
|
final String[] indices = Actions.processIndices(clusterService.state(), request.indices());
|
||||||
|
if (autoCreateIndex) {
|
||||||
|
final CountDownLatch latch = new CountDownLatch(indices.length);
|
||||||
|
for (String index : indices) {
|
||||||
|
if (!clusterService.state().metaData().hasIndex(index)) {
|
||||||
|
createIndexAction.execute(new CreateIndexRequest(index), new ActionListener<CreateIndexResponse>() {
|
||||||
|
@Override public void onResponse(CreateIndexResponse result) {
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public void onFailure(Throwable e) {
|
||||||
|
if (ExceptionsHelper.unwrapCause(e) instanceof IndexAlreadyExistsException) {
|
||||||
|
latch.countDown();
|
||||||
|
} else {
|
||||||
|
listener.onFailure(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
latch.await(10, TimeUnit.SECONDS);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
// ignore
|
||||||
|
}
|
||||||
|
}
|
||||||
if (clusterService.state().nodes().localNodeMaster()) {
|
if (clusterService.state().nodes().localNodeMaster()) {
|
||||||
threadPool.execute(new Runnable() {
|
threadPool.execute(new Runnable() {
|
||||||
@Override public void run() {
|
@Override public void run() {
|
||||||
|
|
|
@ -53,7 +53,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
|
||||||
TransportCreateIndexAction createIndexAction) {
|
TransportCreateIndexAction createIndexAction) {
|
||||||
super(settings, transportService, clusterService, indicesService, threadPool, shardStateAction);
|
super(settings, transportService, clusterService, indicesService, threadPool, shardStateAction);
|
||||||
this.createIndexAction = createIndexAction;
|
this.createIndexAction = createIndexAction;
|
||||||
this.autoCreateIndex = componentSettings.getAsBoolean("autoCreateIndex", true);
|
this.autoCreateIndex = settings.getAsBoolean("action.autoCreateIndex", true);
|
||||||
this.allowIdGeneration = componentSettings.getAsBoolean("allowIdGeneration", true);
|
this.allowIdGeneration = componentSettings.getAsBoolean("allowIdGeneration", true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue