Internal: auto create index to keep around headers and context of the request that caused it

Closes #7331
This commit is contained in:
javanna 2014-08-19 10:07:44 +02:00 committed by Luca Cavanna
parent a279f2e8c6
commit 7aa2d11cdd
7 changed files with 26 additions and 4 deletions

View File

@ -25,6 +25,7 @@ import org.elasticsearch.ElasticsearchGenerationException;
import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
@ -75,6 +76,14 @@ public class CreateIndexRequest extends AcknowledgedRequest<CreateIndexRequest>
CreateIndexRequest() { CreateIndexRequest() {
} }
/**
* Constructs a new request to create an index that was triggered by a different request,
* provided as an argument so that its headers and context can be copied to the new request.
*/
public CreateIndexRequest(ActionRequest request) {
super(request);
}
/** /**
* Constructs a new request to create an index with the specified name. * Constructs a new request to create an index with the specified name.
*/ */

View File

@ -120,7 +120,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
ClusterState state = clusterService.state(); ClusterState state = clusterService.state();
for (final String index : indices) { for (final String index : indices) {
if (autoCreateIndex.shouldAutoCreate(index, state)) { if (autoCreateIndex.shouldAutoCreate(index, state)) {
createIndexAction.execute(new CreateIndexRequest(index).cause("auto(bulk api)").masterNodeTimeout(bulkRequest.timeout()), new ActionListener<CreateIndexResponse>() { createIndexAction.execute(new CreateIndexRequest(bulkRequest).index(index).cause("auto(bulk api)").masterNodeTimeout(bulkRequest.timeout()), new ActionListener<CreateIndexResponse>() {
@Override @Override
public void onResponse(CreateIndexResponse result) { public void onResponse(CreateIndexResponse result) {
if (counter.decrementAndGet() == 0) { if (counter.decrementAndGet() == 0) {

View File

@ -74,7 +74,7 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct
protected void doExecute(final DeleteRequest request, final ActionListener<DeleteResponse> listener) { protected void doExecute(final DeleteRequest request, final ActionListener<DeleteResponse> listener) {
if (autoCreateIndex.shouldAutoCreate(request.index(), clusterService.state())) { if (autoCreateIndex.shouldAutoCreate(request.index(), clusterService.state())) {
request.beforeLocalFork(); request.beforeLocalFork();
createIndexAction.execute(new CreateIndexRequest(request.index()).cause("auto(delete api)").masterNodeTimeout(request.timeout()), new ActionListener<CreateIndexResponse>() { createIndexAction.execute(new CreateIndexRequest(request).index(request.index()).cause("auto(delete api)").masterNodeTimeout(request.timeout()), new ActionListener<CreateIndexResponse>() {
@Override @Override
public void onResponse(CreateIndexResponse result) { public void onResponse(CreateIndexResponse result) {
innerExecute(request, listener); innerExecute(request, listener);

View File

@ -83,7 +83,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
// if we don't have a master, we don't have metadata, that's fine, let it find a master using create index API // if we don't have a master, we don't have metadata, that's fine, let it find a master using create index API
if (autoCreateIndex.shouldAutoCreate(request.index(), clusterService.state())) { if (autoCreateIndex.shouldAutoCreate(request.index(), clusterService.state())) {
request.beforeLocalFork(); // we fork on another thread... request.beforeLocalFork(); // we fork on another thread...
createIndexAction.execute(new CreateIndexRequest(request.index()).cause("auto(index api)").masterNodeTimeout(request.timeout()), new ActionListener<CreateIndexResponse>() { createIndexAction.execute(new CreateIndexRequest(request).index(request.index()).cause("auto(index api)").masterNodeTimeout(request.timeout()), new ActionListener<CreateIndexResponse>() {
@Override @Override
public void onResponse(CreateIndexResponse result) { public void onResponse(CreateIndexResponse result) {
innerExecute(request, listener); innerExecute(request, listener);

View File

@ -18,6 +18,7 @@
*/ */
package org.elasticsearch.action.support.master; package org.elasticsearch.action.support.master;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.cluster.ack.AckedRequest; import org.elasticsearch.cluster.ack.AckedRequest;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
@ -41,6 +42,10 @@ public abstract class AcknowledgedRequest<T extends MasterNodeOperationRequest>
protected AcknowledgedRequest() { protected AcknowledgedRequest() {
} }
protected AcknowledgedRequest(ActionRequest request) {
super(request);
}
/** /**
* Allows to set the timeout * Allows to set the timeout
* @param timeout timeout as a string (e.g. 1s) * @param timeout timeout as a string (e.g. 1s)

View File

@ -35,6 +35,14 @@ public abstract class MasterNodeOperationRequest<T extends MasterNodeOperationRe
protected TimeValue masterNodeTimeout = DEFAULT_MASTER_NODE_TIMEOUT; protected TimeValue masterNodeTimeout = DEFAULT_MASTER_NODE_TIMEOUT;
protected MasterNodeOperationRequest() {
}
protected MasterNodeOperationRequest(ActionRequest request) {
super(request);
}
/** /**
* A timeout value in case the master has not been discovered yet or disconnected. * A timeout value in case the master has not been discovered yet or disconnected.
*/ */

View File

@ -120,7 +120,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
// if we don't have a master, we don't have metadata, that's fine, let it find a master using create index API // if we don't have a master, we don't have metadata, that's fine, let it find a master using create index API
if (autoCreateIndex.shouldAutoCreate(request.index(), clusterService.state())) { if (autoCreateIndex.shouldAutoCreate(request.index(), clusterService.state())) {
request.beforeLocalFork(); // we fork on another thread... request.beforeLocalFork(); // we fork on another thread...
createIndexAction.execute(new CreateIndexRequest(request.index()).cause("auto(update api)").masterNodeTimeout(request.timeout()), new ActionListener<CreateIndexResponse>() { createIndexAction.execute(new CreateIndexRequest(request).index(request.index()).cause("auto(update api)").masterNodeTimeout(request.timeout()), new ActionListener<CreateIndexResponse>() {
@Override @Override
public void onResponse(CreateIndexResponse result) { public void onResponse(CreateIndexResponse result) {
innerExecute(request, listener); innerExecute(request, listener);