parent
ef25ac2414
commit
af1e8c0eb1
|
@ -24,12 +24,16 @@ import org.elasticsearch.ElasticSearchException;
|
|||
import org.elasticsearch.ElasticSearchIllegalArgumentException;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
|
||||
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
|
||||
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
|
||||
import org.elasticsearch.action.delete.DeleteRequest;
|
||||
import org.elasticsearch.action.delete.DeleteResponse;
|
||||
import org.elasticsearch.action.delete.TransportDeleteAction;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.index.IndexResponse;
|
||||
import org.elasticsearch.action.index.TransportIndexAction;
|
||||
import org.elasticsearch.action.support.AutoCreateIndex;
|
||||
import org.elasticsearch.action.support.single.instance.TransportInstanceSingleOperationAction;
|
||||
import org.elasticsearch.client.Requests;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
|
@ -61,6 +65,7 @@ import org.elasticsearch.index.service.IndexService;
|
|||
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.shard.service.IndexShard;
|
||||
import org.elasticsearch.indices.IndexAlreadyExistsException;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.script.ExecutableScript;
|
||||
import org.elasticsearch.script.ScriptService;
|
||||
|
@ -86,14 +91,20 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
|
|||
|
||||
private final ScriptService scriptService;
|
||||
|
||||
private final AutoCreateIndex autoCreateIndex;
|
||||
|
||||
private final TransportCreateIndexAction createIndexAction;
|
||||
|
||||
@Inject
|
||||
public TransportUpdateAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService,
|
||||
IndicesService indicesService, TransportIndexAction indexAction, TransportDeleteAction deleteAction, ScriptService scriptService) {
|
||||
IndicesService indicesService, TransportIndexAction indexAction, TransportDeleteAction deleteAction, ScriptService scriptService, TransportCreateIndexAction createIndexAction) {
|
||||
super(settings, threadPool, clusterService, transportService);
|
||||
this.indicesService = indicesService;
|
||||
this.indexAction = indexAction;
|
||||
this.deleteAction = deleteAction;
|
||||
this.scriptService = scriptService;
|
||||
this.createIndexAction = createIndexAction;
|
||||
this.autoCreateIndex = new AutoCreateIndex(settings);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -144,6 +155,40 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
|
|||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doExecute(final UpdateRequest request, final ActionListener<UpdateResponse> listener) {
|
||||
// if we don't have a master, we don't have metadata, that's fine, let it find a master using create index API
|
||||
if (autoCreateIndex.shouldAutoCreate(request.index(), clusterService.state())) {
|
||||
request.beforeLocalFork(); // we fork on another thread...
|
||||
createIndexAction.execute(new CreateIndexRequest(request.index()).cause("auto(update api)").masterNodeTimeout(request.timeout()), new ActionListener<CreateIndexResponse>() {
|
||||
@Override
|
||||
public void onResponse(CreateIndexResponse result) {
|
||||
innerExecute(request, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable e) {
|
||||
if (ExceptionsHelper.unwrapCause(e) instanceof IndexAlreadyExistsException) {
|
||||
// we have the index, do it
|
||||
try {
|
||||
innerExecute(request, listener);
|
||||
} catch (Exception e1) {
|
||||
listener.onFailure(e1);
|
||||
}
|
||||
} else {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
}
|
||||
});
|
||||
} else {
|
||||
innerExecute(request, listener);
|
||||
}
|
||||
}
|
||||
|
||||
private void innerExecute(final UpdateRequest request, final ActionListener<UpdateResponse> listener) {
|
||||
super.doExecute(request, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ShardIterator shards(ClusterState clusterState, UpdateRequest request) throws ElasticSearchException {
|
||||
if (request.shardId() != -1) {
|
||||
|
|
|
@ -218,6 +218,24 @@ public class UpdateTests extends AbstractNodesTests {
|
|||
assertThat(updateResponse.getGetResult().sourceAsMap().get("extra").toString(), equalTo("foo"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIndexAutoCreation() throws Exception {
|
||||
try {
|
||||
client.admin().indices().prepareDelete("test").execute().actionGet();
|
||||
} catch (Exception e) {
|
||||
// ignore
|
||||
}
|
||||
|
||||
UpdateResponse updateResponse = client.prepareUpdate("test", "type1", "1")
|
||||
.setUpsert(XContentFactory.jsonBuilder().startObject().field("bar", "baz").endObject())
|
||||
.setScript("ctx._source.extra = \"foo\"")
|
||||
.setFields("_source")
|
||||
.execute().actionGet();
|
||||
|
||||
assertThat(updateResponse.getGetResult(), notNullValue());
|
||||
assertThat(updateResponse.getGetResult().sourceAsMap().get("bar").toString(), equalTo("baz"));
|
||||
assertThat(updateResponse.getGetResult().sourceAsMap().get("extra"), nullValue());
|
||||
}
|
||||
@Test
|
||||
public void testUpdate() throws Exception {
|
||||
createIndex();
|
||||
|
|
Loading…
Reference in New Issue