automatic index creation when using the bulk api

This commit is contained in:
kimchy 2010-09-15 18:18:48 +02:00
parent e93eb16deb
commit e52daa9670
4 changed files with 85 additions and 3 deletions

View File

@ -23,6 +23,9 @@ import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.TransportActions;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.BaseAction;
@ -31,9 +34,11 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.UUID;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.collect.Maps;
import org.elasticsearch.common.collect.Sets;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportRequestHandler;
@ -42,6 +47,8 @@ import org.elasticsearch.transport.TransportService;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
@ -49,6 +56,8 @@ import java.util.concurrent.atomic.AtomicInteger;
*/
public class TransportBulkAction extends BaseAction<BulkRequest, BulkResponse> {
private final boolean autoCreateIndex;
private final boolean allowIdGeneration;
private final ThreadPool threadPool;
@ -59,20 +68,74 @@ public class TransportBulkAction extends BaseAction<BulkRequest, BulkResponse> {
private final TransportShardBulkAction shardBulkAction;
private final TransportCreateIndexAction createIndexAction;
@Inject public TransportBulkAction(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterService clusterService, IndicesService indicesService,
TransportShardBulkAction shardBulkAction) {
TransportShardBulkAction shardBulkAction, TransportCreateIndexAction createIndexAction) {
super(settings);
this.threadPool = threadPool;
this.clusterService = clusterService;
this.indicesService = indicesService;
this.shardBulkAction = shardBulkAction;
this.createIndexAction = createIndexAction;
this.allowIdGeneration = componentSettings.getAsBoolean("allow_id_generation", true);
this.autoCreateIndex = settings.getAsBoolean("action.auto_create_index", true);
this.allowIdGeneration = componentSettings.getAsBoolean("action.allow_id_generation", true);
transportService.registerHandler(TransportActions.BULK, new TransportHandler());
}
@Override protected void doExecute(final BulkRequest bulkRequest, final ActionListener<BulkResponse> listener) {
Set<String> indices = Sets.newHashSet();
for (ActionRequest request : bulkRequest.requests) {
if (request instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) request;
if (!indices.contains(indexRequest.index())) {
indices.add(indexRequest.index());
}
} else if (request instanceof DeleteRequest) {
DeleteRequest deleteRequest = (DeleteRequest) request;
if (!indices.contains(deleteRequest.index())) {
indices.add(deleteRequest.index());
}
}
}
if (autoCreateIndex) {
final AtomicInteger counter = new AtomicInteger(indices.size());
final AtomicBoolean failed = new AtomicBoolean();
for (String index : indices) {
if (!clusterService.state().metaData().hasConcreteIndex(index)) {
createIndexAction.execute(new CreateIndexRequest(index).cause("auto(bulk api)"), new ActionListener<CreateIndexResponse>() {
@Override public void onResponse(CreateIndexResponse result) {
if (counter.decrementAndGet() == 0) {
executeBulk(bulkRequest, listener);
}
}
@Override public void onFailure(Throwable e) {
if (ExceptionsHelper.unwrapCause(e) instanceof IndexAlreadyExistsException) {
// we have the index, do it
if (counter.decrementAndGet() == 0) {
executeBulk(bulkRequest, listener);
}
} else if (failed.compareAndSet(false, true)) {
listener.onFailure(e);
}
}
});
} else {
if (counter.decrementAndGet() == 0) {
executeBulk(bulkRequest, listener);
}
}
}
} else {
executeBulk(bulkRequest, listener);
}
}
private void executeBulk(final BulkRequest bulkRequest, final ActionListener<BulkResponse> listener) {
ClusterState clusterState = clusterService.state();
for (ActionRequest request : bulkRequest.requests) {
if (request instanceof IndexRequest) {

View File

@ -74,7 +74,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
this.createIndexAction = createIndexAction;
this.mappingUpdatedAction = mappingUpdatedAction;
this.autoCreateIndex = settings.getAsBoolean("action.auto_create_index", true);
this.allowIdGeneration = componentSettings.getAsBoolean("allow_id_generation", true);
this.allowIdGeneration = settings.getAsBoolean("action.allow_id_generation", true);
}
@Override protected void doExecute(final IndexRequest indexRequest, final ActionListener<IndexResponse> listener) {

View File

@ -30,6 +30,9 @@ import org.elasticsearch.client.action.index.IndexRequestBuilder;
import org.elasticsearch.client.action.support.BaseRequestBuilder;
/**
* A bulk request holds an ordered {@link IndexRequest}s and {@link DeleteRequest}s and allows to executes
* it in a single batch.
*
* @author kimchy (shay.banon)
*/
public class BulkRequestBuilder extends BaseRequestBuilder<BulkRequest, BulkResponse> {
@ -38,21 +41,35 @@ public class BulkRequestBuilder extends BaseRequestBuilder<BulkRequest, BulkResp
super(client, new BulkRequest());
}
/**
* Adds an {@link IndexRequest} to the list of actions to execute. Follows the same behavior of {@link IndexRequest}
* (for example, if no id is provided, one will be generated, or usage of the create flag).
*/
public BulkRequestBuilder add(IndexRequest request) {
super.request.add(request);
return this;
}
/**
* Adds an {@link IndexRequest} to the list of actions to execute. Follows the same behavior of {@link IndexRequest}
* (for example, if no id is provided, one will be generated, or usage of the create flag).
*/
public BulkRequestBuilder add(IndexRequestBuilder request) {
super.request.add(request.request());
return this;
}
/**
* Adds an {@link DeleteRequest} to the list of actions to execute.
*/
public BulkRequestBuilder add(DeleteRequest request) {
super.request.add(request);
return this;
}
/**
* Adds an {@link DeleteRequest} to the list of actions to execute.
*/
public BulkRequestBuilder add(DeleteRequestBuilder request) {
super.request.add(request.request());
return this;

View File

@ -148,6 +148,7 @@ public class RestBulkAction extends BaseRestHandler {
builder.startArray("items");
for (BulkItemResponse itemResponse : response) {
builder.startObject();
builder.startObject(itemResponse.opType());
builder.field("index", itemResponse.index());
builder.field("type", itemResponse.type());
@ -156,6 +157,7 @@ public class RestBulkAction extends BaseRestHandler {
builder.field("error", itemResponse.failure().message());
}
builder.endObject();
builder.endObject();
}
builder.endArray();