Make update a replication action

Currently, update action delegates to index and delete actions
for replication using a dedicated transport action. This change
makes update a replication operation, removing the dedicated
transport action. This simplifies bulk execution and removes
duplicate logic for update retries and translation. This
consolidates the interface for single document write requests.

Now on the primary, the update request is translated to
an index or delete request before execution and the translated
request is sent to copies for replication.
This commit is contained in:
Areek Zillur 2016-10-06 04:26:32 -04:00
parent 57d8025010
commit eee0d18f94
27 changed files with 460 additions and 1220 deletions

View File

@ -20,10 +20,11 @@ package org.elasticsearch.action;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.update.UpdateReplicaRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
import org.elasticsearch.index.VersionType;
import java.io.IOException;
@ -33,84 +34,72 @@ import java.util.Locale;
* Generic interface to group ActionRequest, which perform writes to a single document
* Action requests implementing this can be part of {@link org.elasticsearch.action.bulk.BulkRequest}
*/
public interface DocumentRequest<T> extends IndicesRequest {
/**
* Get the index that this request operates on
* @return the index
*/
String index();
public abstract class DocumentRequest<T extends ReplicatedWriteRequest<T>> extends ReplicatedWriteRequest<T> {
/**
* Get the type that this request operates on
* @return the type
*/
String type();
public abstract String type();
/**
* Get the id of the document for this request
* @return the id
*/
String id();
/**
* Get the options for this request
* @return the indices options
*/
IndicesOptions indicesOptions();
public abstract String id();
/**
* Set the routing for this request
* @return the Request
*/
T routing(String routing);
public abstract T routing(String routing);
/**
* Get the routing for this request
* @return the Routing
*/
String routing();
public abstract String routing();
/**
* Get the parent for this request
* @return the Parent
*/
String parent();
public abstract String parent();
/**
* Get the document version for this request
* @return the document version
*/
long version();
public abstract long version();
/**
* Sets the version, which will perform the operation only if a matching
* version exists and no changes happened on the doc since then.
*/
T version(long version);
public abstract T version(long version);
/**
* Get the document version type for this request
* @return the document version type
*/
VersionType versionType();
public abstract VersionType versionType();
/**
* Sets the versioning type. Defaults to {@link VersionType#INTERNAL}.
*/
T versionType(VersionType versionType);
public abstract T versionType(VersionType versionType);
/**
* Get the requested document operation type of the request
* @return the operation type {@link OpType}
*/
OpType opType();
public abstract OpType opType();
/**
* Requested operation type to perform on the document
*/
enum OpType {
public enum OpType {
/**
* Index the source. If there an existing document with the id, it will
* be replaced.
@ -164,40 +153,42 @@ public interface DocumentRequest<T> extends IndicesRequest {
}
/** read a document write (index/delete/update) request */
static DocumentRequest readDocumentRequest(StreamInput in) throws IOException {
public static DocumentRequest readDocumentRequest(StreamInput in) throws IOException {
byte type = in.readByte();
final DocumentRequest documentRequest;
if (type == 0) {
IndexRequest indexRequest = new IndexRequest();
indexRequest.readFrom(in);
documentRequest = indexRequest;
return indexRequest;
} else if (type == 1) {
DeleteRequest deleteRequest = new DeleteRequest();
deleteRequest.readFrom(in);
documentRequest = deleteRequest;
return deleteRequest;
} else if (type == 2) {
UpdateRequest updateRequest = new UpdateRequest();
updateRequest.readFrom(in);
documentRequest = updateRequest;
return updateRequest;
} else if (type == 3) {
UpdateReplicaRequest updateReplicaRequest = new UpdateReplicaRequest();
updateReplicaRequest.readFrom(in);
return updateReplicaRequest;
} else {
throw new IllegalStateException("invalid request type [" + type+ " ]");
}
return documentRequest;
}
/** write a document write (index/delete/update) request*/
static void writeDocumentRequest(StreamOutput out, DocumentRequest request) throws IOException {
public static void writeDocumentRequest(StreamOutput out, DocumentRequest request) throws IOException {
if (request instanceof IndexRequest) {
out.writeByte((byte) 0);
((IndexRequest) request).writeTo(out);
} else if (request instanceof DeleteRequest) {
out.writeByte((byte) 1);
((DeleteRequest) request).writeTo(out);
} else if (request instanceof UpdateRequest) {
out.writeByte((byte) 2);
((UpdateRequest) request).writeTo(out);
} else if (request instanceof UpdateReplicaRequest) {
out.writeByte((byte) 3);
} else {
throw new IllegalStateException("invalid request [" + request.getClass().getSimpleName() + " ]");
}
request.writeTo(out);
}
}

View File

@ -20,9 +20,6 @@
package org.elasticsearch.action.bulk;
import org.elasticsearch.action.DocumentRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;

View File

@ -528,11 +528,11 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
}
for (DocumentRequest<?> request : requests) {
// We first check if refresh has been set
if (((WriteRequest<?>) request).getRefreshPolicy() != RefreshPolicy.NONE) {
if (request.getRefreshPolicy() != RefreshPolicy.NONE) {
validationException = addValidationError(
"RefreshPolicy is not supported on an item request. Set it on the BulkRequest instead.", validationException);
}
ActionRequestValidationException ex = ((WriteRequest<?>) request).validate();
ActionRequestValidationException ex = request.validate();
if (ex != null) {
if (validationException == null) {
validationException = new ActionRequestValidationException();

View File

@ -27,14 +27,12 @@ import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.TransportDeleteAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.update.TransportUpdateAction;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.support.replication.TransportWriteAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@ -211,7 +209,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
final ConcreteIndices concreteIndices = new ConcreteIndices(clusterState, indexNameExpressionResolver);
MetaData metaData = clusterState.metaData();
for (int i = 0; i < bulkRequest.requests.size(); i++) {
DocumentRequest<?> documentRequest = bulkRequest.requests.get(i);
DocumentRequest documentRequest = bulkRequest.requests.get(i);
//the request can only be null because we set it to null in the previous step, so it gets ignored
if (documentRequest == null) {
continue;
@ -234,10 +232,8 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
indexRequest.process(mappingMd, allowIdGeneration, concreteIndex.getName());
break;
case UPDATE:
TransportUpdateAction.resolveAndValidateRouting(metaData, concreteIndex.getName(), (UpdateRequest)documentRequest);
break;
case DELETE:
TransportDeleteAction.resolveAndValidateRouting(metaData, concreteIndex.getName(), (DeleteRequest)documentRequest);
TransportWriteAction.resolveAndValidateRouting(metaData, concreteIndex.getName(), documentRequest);
break;
default: throw new AssertionError("request type not supported: [" + documentRequest.opType() + "]");
}

View File

@ -23,8 +23,12 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.DocumentRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.delete.TransportDeleteAction;
@ -32,11 +36,16 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.index.TransportIndexAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
import org.elasticsearch.action.support.replication.TransportWriteAction;
import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo;
import org.elasticsearch.action.update.TransportUpdateAction;
import org.elasticsearch.action.update.UpdateHelper;
import org.elasticsearch.action.update.UpdateReplicaRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@ -57,8 +66,11 @@ import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.Translog.Location;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
@ -69,24 +81,29 @@ import static org.elasticsearch.action.support.replication.ReplicationOperation.
import static org.elasticsearch.action.support.replication.ReplicationOperation.isConflictException;
/** Performs shard-level bulk (index, delete or update) operations */
public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequest, BulkShardResponse> {
public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequest, BulkShardRequest, BulkShardResponse> {
public static final String ACTION_NAME = BulkAction.NAME + "[s]";
private final UpdateHelper updateHelper;
private final boolean allowIdGeneration;
private final MappingUpdatedAction mappingUpdatedAction;
private final UpdateHelper updateHelper;
private final AutoCreateIndex autoCreateIndex;
private final TransportCreateIndexAction createIndexAction;
@Inject
public TransportShardBulkAction(Settings settings, TransportService transportService, ClusterService clusterService,
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
MappingUpdatedAction mappingUpdatedAction, UpdateHelper updateHelper, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
MappingUpdatedAction mappingUpdatedAction, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, ScriptService scriptService,
AutoCreateIndex autoCreateIndex, TransportCreateIndexAction createIndexAction) {
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
indexNameExpressionResolver, BulkShardRequest::new, ThreadPool.Names.BULK);
this.updateHelper = updateHelper;
indexNameExpressionResolver, BulkShardRequest::new, BulkShardRequest::new, ThreadPool.Names.BULK);
this.allowIdGeneration = settings.getAsBoolean("action.allow_id_generation", true);
this.mappingUpdatedAction = mappingUpdatedAction;
this.updateHelper = new UpdateHelper(scriptService, logger);
this.autoCreateIndex = autoCreateIndex;
this.createIndexAction = createIndexAction;
}
@Override
@ -105,7 +122,39 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
}
@Override
protected WriteResult<BulkShardResponse> onPrimaryShard(BulkShardRequest request, IndexShard indexShard) throws Exception {
protected void doExecute(Task task, BulkShardRequest request, ActionListener<BulkShardResponse> listener) {
ClusterState state = clusterService.state();
if (autoCreateIndex.shouldAutoCreate(request.index(), state)) {
CreateIndexRequest createIndexRequest = new CreateIndexRequest();
createIndexRequest.index(request.index());
createIndexRequest.cause("auto(bulk api)");
createIndexRequest.masterNodeTimeout(request.timeout());
createIndexAction.execute(task, createIndexRequest, new ActionListener<CreateIndexResponse>() {
@Override
public void onResponse(CreateIndexResponse result) {
innerExecute(task, request, listener);
}
@Override
public void onFailure(Exception e) {
if (ExceptionsHelper.unwrapCause(e) instanceof IndexAlreadyExistsException) {
// we have the index, do it
innerExecute(task, request, listener);
} else {
listener.onFailure(e);
}
}
});
} else {
innerExecute(task, request, listener);
}
}
private void innerExecute(Task task, final BulkShardRequest request, final ActionListener<BulkShardResponse> listener) {
super.doExecute(task, request, listener);
}
@Override
protected WriteResult<BulkShardRequest, BulkShardResponse> onPrimaryShard(BulkShardRequest request, IndexShard indexShard) throws Exception {
ShardId shardId = request.shardId();
final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
final IndexMetaData metaData = indexService.getIndexSettings().getIndexMetaData();
@ -123,7 +172,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
responses[i] = items[i].getPrimaryResponse();
}
BulkShardResponse response = new BulkShardResponse(request.shardId(), responses);
return new WriteResult<>(response, location);
return new WriteResult<>(request, response, location);
}
/** Executes bulk item requests and handles request execution exceptions */
@ -131,22 +180,39 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
BulkShardRequest request,
long[] preVersions, VersionType[] preVersionTypes,
Translog.Location location, int requestIndex) {
preVersions[requestIndex] = request.items()[requestIndex].request().version();
preVersionTypes[requestIndex] = request.items()[requestIndex].request().versionType();
DocumentRequest.OpType opType = request.items()[requestIndex].request().opType();
DocumentRequest<?> itemRequest = request.items()[requestIndex].request();
preVersions[requestIndex] = itemRequest.version();
preVersionTypes[requestIndex] = itemRequest.versionType();
DocumentRequest.OpType opType = itemRequest.opType();
try {
WriteResult<? extends DocWriteResponse> writeResult = innerExecuteBulkItemRequest(metaData, indexShard,
request, requestIndex);
final WriteResult<? extends ReplicatedWriteRequest, ? extends DocWriteResponse> writeResult;
switch (itemRequest.opType()) {
case CREATE:
case INDEX:
writeResult = TransportIndexAction.executeIndexRequestOnPrimary(((IndexRequest) itemRequest), indexShard,
mappingUpdatedAction);
break;
case UPDATE:
writeResult = TransportUpdateAction.executeUpdateRequestOnPrimary(((UpdateRequest) itemRequest), indexShard,
metaData, updateHelper, mappingUpdatedAction, allowIdGeneration);
break;
case DELETE:
writeResult = TransportDeleteAction.executeDeleteRequestOnPrimary(((DeleteRequest) itemRequest), indexShard);
break;
default:
throw new IllegalStateException("unexpected opType [" + itemRequest.opType() + "] found");
}
if (writeResult.getLocation() != null) {
location = locationToSync(location, writeResult.getLocation());
} else {
assert writeResult.getResponse().getResult() == DocWriteResponse.Result.NOOP
: "only noop operation can have null next operation";
}
// update the bulk item request because update request execution can mutate the bulk item request
BulkItemRequest item = request.items()[requestIndex];
// update the bulk item request with replica request (update request are changed to index or delete requests for replication)
request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(),
(DocumentRequest<?>) writeResult.getReplicaRequest());
// add the response
setResponse(item, new BulkItemResponse(item.id(), opType, writeResult.getResponse()));
setResponse(request.items()[requestIndex], new BulkItemResponse(request.items()[requestIndex].id(), opType, writeResult.getResponse()));
} catch (Exception e) {
// rethrow the failure if we are going to retry on primary and let parent failure to handle it
if (retryPrimaryException(e)) {
@ -182,33 +248,6 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
return location;
}
private WriteResult<? extends DocWriteResponse> innerExecuteBulkItemRequest(IndexMetaData metaData, IndexShard indexShard,
BulkShardRequest request, int requestIndex) throws Exception {
DocumentRequest<?> itemRequest = request.items()[requestIndex].request();
switch (itemRequest.opType()) {
case CREATE:
case INDEX:
return TransportIndexAction.executeIndexRequestOnPrimary(((IndexRequest) itemRequest), indexShard, mappingUpdatedAction);
case UPDATE:
int maxAttempts = ((UpdateRequest) itemRequest).retryOnConflict();
for (int attemptCount = 0; attemptCount <= maxAttempts; attemptCount++) {
try {
return shardUpdateOperation(metaData, indexShard, request, requestIndex, ((UpdateRequest) itemRequest));
} catch (Exception e) {
final Throwable cause = ExceptionsHelper.unwrapCause(e);
if (attemptCount == maxAttempts // bubble up exception when we run out of attempts
|| (cause instanceof VersionConflictEngineException) == false) { // or when exception is not a version conflict
throw e;
}
}
}
throw new IllegalStateException("version conflict exception should bubble up on last attempt");
case DELETE:
return TransportDeleteAction.executeDeleteRequestOnPrimary(((DeleteRequest) itemRequest), indexShard);
default: throw new IllegalStateException("unexpected opType [" + itemRequest.opType() + "] found");
}
}
private void setResponse(BulkItemRequest request, BulkItemResponse response) {
request.setPrimaryResponse(response);
if (response.isFailed()) {
@ -219,51 +258,6 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
}
}
/**
* Executes update request, doing a get and translating update to a index or delete operation
* NOTE: all operations except NOOP, reassigns the bulk item request
*/
private WriteResult<? extends DocWriteResponse> shardUpdateOperation(IndexMetaData metaData, IndexShard indexShard,
BulkShardRequest request,
int requestIndex, UpdateRequest updateRequest)
throws Exception {
// Todo: capture read version conflicts, missing documents and malformed script errors in the write result due to get request
UpdateHelper.Result translate = updateHelper.prepare(updateRequest, indexShard);
switch (translate.getResponseResult()) {
case CREATED:
case UPDATED:
IndexRequest indexRequest = translate.action();
MappingMetaData mappingMd = metaData.mappingOrDefault(indexRequest.type());
indexRequest.process(mappingMd, allowIdGeneration, request.index());
WriteResult<IndexResponse> writeResult = TransportIndexAction.executeIndexRequestOnPrimary(indexRequest, indexShard, mappingUpdatedAction);
BytesReference indexSourceAsBytes = indexRequest.source();
IndexResponse indexResponse = writeResult.getResponse();
UpdateResponse writeUpdateResponse = new UpdateResponse(indexResponse.getShardInfo(), indexResponse.getShardId(), indexResponse.getType(), indexResponse.getId(), indexResponse.getVersion(), indexResponse.getResult());
if (updateRequest.fields() != null && updateRequest.fields().length > 0) {
Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(indexSourceAsBytes, true);
writeUpdateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, request.index(), indexResponse.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceAsBytes));
}
// Replace the update request to the translated index request to execute on the replica.
request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), indexRequest);
return new WriteResult<>(writeUpdateResponse, writeResult.getLocation());
case DELETED:
DeleteRequest deleteRequest = translate.action();
WriteResult<DeleteResponse> deleteResult = TransportDeleteAction.executeDeleteRequestOnPrimary(deleteRequest, indexShard);
DeleteResponse response = deleteResult.getResponse();
UpdateResponse deleteUpdateResponse = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getVersion(), response.getResult());
deleteUpdateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, request.index(), response.getVersion(), translate.updatedSourceAsMap(), translate.updateSourceContentType(), null));
// Replace the update request to the translated delete request to execute on the replica.
request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), deleteRequest);
return new WriteResult<>(deleteUpdateResponse, deleteResult.getLocation());
case NOOP:
BulkItemRequest item = request.items()[requestIndex];
indexShard.noopUpdate(updateRequest.type());
item.setIgnoreOnReplica(); // no need to go to the replica
return new WriteResult<>(translate.action(), null);
default: throw new IllegalStateException("Illegal update operation " + translate.getResponseResult());
}
}
@Override
protected Location onReplicaShard(BulkShardRequest request, IndexShard indexShard) {
Translog.Location location = null;
@ -272,7 +266,8 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
if (item == null || item.isIgnoreOnReplica()) {
continue;
}
DocumentRequest<?> documentRequest = item.request();
DocumentRequest documentRequest = (item.request() instanceof UpdateReplicaRequest)
? ((UpdateReplicaRequest) item.request()).getRequest() : item.request();
final Engine.Operation operation;
try {
switch (documentRequest.opType()) {

View File

@ -43,7 +43,7 @@ import static org.elasticsearch.action.ValidateActions.addValidationError;
* @see org.elasticsearch.client.Client#delete(DeleteRequest)
* @see org.elasticsearch.client.Requests#deleteRequest(String)
*/
public class DeleteRequest extends ReplicatedWriteRequest<DeleteRequest> implements DocumentRequest<DeleteRequest> {
public class DeleteRequest extends DocumentRequest<DeleteRequest> {
private String type;
private String id;

View File

@ -21,6 +21,7 @@ package org.elasticsearch.action.delete;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocumentRequest;
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
@ -49,7 +50,7 @@ import org.elasticsearch.transport.TransportService;
/**
* Performs the delete operation.
*/
public class TransportDeleteAction extends TransportWriteAction<DeleteRequest, DeleteResponse> {
public class TransportDeleteAction extends TransportWriteAction<DeleteRequest, DeleteRequest, DeleteResponse> {
private final AutoCreateIndex autoCreateIndex;
private final TransportCreateIndexAction createIndexAction;
@ -61,7 +62,7 @@ public class TransportDeleteAction extends TransportWriteAction<DeleteRequest, D
IndexNameExpressionResolver indexNameExpressionResolver,
AutoCreateIndex autoCreateIndex) {
super(settings, DeleteAction.NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
indexNameExpressionResolver, DeleteRequest::new, ThreadPool.Names.INDEX);
indexNameExpressionResolver, DeleteRequest::new, DeleteRequest::new, ThreadPool.Names.INDEX);
this.createIndexAction = createIndexAction;
this.autoCreateIndex = autoCreateIndex;
}
@ -70,7 +71,11 @@ public class TransportDeleteAction extends TransportWriteAction<DeleteRequest, D
protected void doExecute(Task task, final DeleteRequest request, final ActionListener<DeleteResponse> listener) {
ClusterState state = clusterService.state();
if (autoCreateIndex.shouldAutoCreate(request.index(), state)) {
createIndexAction.execute(task, new CreateIndexRequest().index(request.index()).cause("auto(delete api)").masterNodeTimeout(request.timeout()), new ActionListener<CreateIndexResponse>() {
CreateIndexRequest createIndexRequest = new CreateIndexRequest();
createIndexRequest.index(request.index());
createIndexRequest.cause("auto(delete api)");
createIndexRequest.masterNodeTimeout(request.timeout());
createIndexAction.execute(task, createIndexRequest, new ActionListener<CreateIndexResponse>() {
@Override
public void onResponse(CreateIndexResponse result) {
innerExecute(task, request, listener);
@ -100,15 +105,6 @@ public class TransportDeleteAction extends TransportWriteAction<DeleteRequest, D
request.setShardId(shardId);
}
public static void resolveAndValidateRouting(final MetaData metaData, final String concreteIndex,
DeleteRequest request) {
request.routing(metaData.resolveIndexRouting(request.parent(), request.routing(), request.index()));
// check if routing is required, if so, throw error if routing wasn't specified
if (request.routing() == null && metaData.routingRequired(concreteIndex, request.type())) {
throw new RoutingMissingException(concreteIndex, request.type(), request.id());
}
}
private void innerExecute(Task task, final DeleteRequest request, final ActionListener<DeleteResponse> listener) {
super.doExecute(task, request, listener);
}
@ -119,7 +115,7 @@ public class TransportDeleteAction extends TransportWriteAction<DeleteRequest, D
}
@Override
protected WriteResult<DeleteResponse> onPrimaryShard(DeleteRequest request, IndexShard indexShard) {
protected WriteResult<DeleteRequest, DeleteResponse> onPrimaryShard(DeleteRequest request, IndexShard indexShard) {
return executeDeleteRequestOnPrimary(request, indexShard);
}
@ -128,7 +124,7 @@ public class TransportDeleteAction extends TransportWriteAction<DeleteRequest, D
return executeDeleteRequestOnReplica(request, indexShard).getTranslogLocation();
}
public static WriteResult<DeleteResponse> executeDeleteRequestOnPrimary(DeleteRequest request, IndexShard indexShard) {
public static WriteResult<DeleteRequest, DeleteResponse> executeDeleteRequestOnPrimary(DeleteRequest request, IndexShard indexShard) {
Engine.Delete delete = indexShard.prepareDeleteOnPrimary(request.type(), request.id(), request.version(), request.versionType());
indexShard.delete(delete);
// update the request with the version so it will go to the replicas
@ -137,7 +133,7 @@ public class TransportDeleteAction extends TransportWriteAction<DeleteRequest, D
assert request.versionType().validateVersionForWrites(request.version());
DeleteResponse response = new DeleteResponse(indexShard.shardId(), request.type(), request.id(), delete.version(), delete.found());
return new WriteResult<>(response, delete.getTranslogLocation());
return new WriteResult<>(request, response, delete.getTranslogLocation());
}
public static Engine.Delete executeDeleteRequestOnReplica(DeleteRequest request, IndexShard indexShard) {

View File

@ -67,7 +67,7 @@ import static org.elasticsearch.action.ValidateActions.addValidationError;
* @see org.elasticsearch.client.Requests#indexRequest(String)
* @see org.elasticsearch.client.Client#index(IndexRequest)
*/
public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implements DocumentRequest<IndexRequest> {
public class IndexRequest extends DocumentRequest<IndexRequest> {
private String type;
private String id;

View File

@ -60,7 +60,7 @@ import org.elasticsearch.transport.TransportService;
* <li><b>allowIdGeneration</b>: If the id is set not, should it be generated. Defaults to <tt>true</tt>.
* </ul>
*/
public class TransportIndexAction extends TransportWriteAction<IndexRequest, IndexResponse> {
public class TransportIndexAction extends TransportWriteAction<IndexRequest, IndexRequest, IndexResponse> {
private final AutoCreateIndex autoCreateIndex;
private final boolean allowIdGeneration;
@ -76,7 +76,7 @@ public class TransportIndexAction extends TransportWriteAction<IndexRequest, Ind
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
AutoCreateIndex autoCreateIndex) {
super(settings, IndexAction.NAME, transportService, clusterService, indicesService, threadPool, shardStateAction,
actionFilters, indexNameExpressionResolver, IndexRequest::new, ThreadPool.Names.INDEX);
actionFilters, indexNameExpressionResolver, IndexRequest::new, IndexRequest::new, ThreadPool.Names.INDEX);
this.mappingUpdatedAction = mappingUpdatedAction;
this.createIndexAction = createIndexAction;
this.autoCreateIndex = autoCreateIndex;
@ -140,7 +140,7 @@ public class TransportIndexAction extends TransportWriteAction<IndexRequest, Ind
}
@Override
protected WriteResult<IndexResponse> onPrimaryShard(IndexRequest request, IndexShard indexShard) throws Exception {
protected WriteResult<IndexRequest, IndexResponse> onPrimaryShard(IndexRequest request, IndexShard indexShard) throws Exception {
return executeIndexRequestOnPrimary(request, indexShard, mappingUpdatedAction);
}
@ -174,7 +174,7 @@ public class TransportIndexAction extends TransportWriteAction<IndexRequest, Ind
return indexShard.prepareIndexOnPrimary(sourceToParse, request.version(), request.versionType(), request.getAutoGeneratedTimestamp(), request.isRetry());
}
public static WriteResult<IndexResponse> executeIndexRequestOnPrimary(IndexRequest request, IndexShard indexShard,
public static WriteResult<IndexRequest, IndexResponse> executeIndexRequestOnPrimary(IndexRequest request, IndexShard indexShard,
MappingUpdatedAction mappingUpdatedAction) throws Exception {
Engine.Index operation = prepareIndexOperationOnPrimary(request, indexShard);
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
@ -198,7 +198,7 @@ public class TransportIndexAction extends TransportWriteAction<IndexRequest, Ind
assert request.versionType().validateVersionForWrites(request.version());
IndexResponse response = new IndexResponse(shardId, request.type(), request.id(), request.version(), operation.isCreated());
return new WriteResult<>(response, operation.getTranslogLocation());
return new WriteResult<>(request, response, operation.getTranslogLocation());
}
}

View File

@ -32,6 +32,7 @@ import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.engine.VersionConflictEngineException;
@ -112,6 +113,7 @@ public class ReplicationOperation<
pendingActions.incrementAndGet();
primaryResult = primary.perform(request);
final ReplicaRequest replicaRequest = primaryResult.replicaRequest();
if (replicaRequest != null) {
assert replicaRequest.primaryTerm() > 0 : "replicaRequest doesn't have a primary term";
if (logger.isTraceEnabled()) {
logger.trace("[{}] op [{}] completed on primary for request [{}]", primaryId, opType, request);
@ -127,6 +129,7 @@ public class ReplicationOperation<
markUnavailableShardsAsStale(replicaRequest, inSyncAllocationIds, shards);
performOnReplicas(replicaRequest, shards);
}
successfulShards.incrementAndGet();
decPendingAndFinishIfNeeded();
@ -419,7 +422,10 @@ public class ReplicationOperation<
public interface PrimaryResult<R extends ReplicationRequest<R>> {
R replicaRequest();
/**
* @return null if no operation needs to be sent to a replica
*/
@Nullable R replicaRequest();
void setShardInfo(ReplicationResponse.ShardInfo shardInfo);
}

View File

@ -23,6 +23,8 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.DocumentRequest;
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActiveShardCount;
@ -163,6 +165,16 @@ public abstract class TransportReplicationAction<
}
}
/** helper to verify and resolve request routing */
public static void resolveAndValidateRouting(final MetaData metaData, final String concreteIndex,
DocumentRequest request) {
request.routing(metaData.resolveIndexRouting(request.parent(), request.routing(), request.index()));
// check if routing is required, if so, throw error if routing wasn't specified
if (request.routing() == null && metaData.routingRequired(concreteIndex, request.type())) {
throw new RoutingMissingException(concreteIndex, request.type(), request.id());
}
}
/**
* Primary operation on node with primary copy.
*
@ -900,7 +912,9 @@ public abstract class TransportReplicationAction<
@Override
public PrimaryResult perform(Request request) throws Exception {
PrimaryResult result = shardOperationOnPrimary(request);
if (result.replicaRequest() != null) {
result.replicaRequest().primaryTerm(indexShard.getPrimaryTerm());
}
return result;
}

View File

@ -49,38 +49,40 @@ import java.util.function.Supplier;
*/
public abstract class TransportWriteAction<
Request extends ReplicatedWriteRequest<Request>,
ReplicaRequest extends ReplicatedWriteRequest<ReplicaRequest>,
Response extends ReplicationResponse & WriteResponse
> extends TransportReplicationAction<Request, Request, Response> {
> extends TransportReplicationAction<Request, ReplicaRequest, Response> {
protected TransportWriteAction(Settings settings, String actionName, TransportService transportService,
ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request,
Supplier<ReplicaRequest> replicaRequest,
String executor) {
super(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
indexNameExpressionResolver, request, request, executor);
indexNameExpressionResolver, request, replicaRequest, executor);
}
/**
* Called on the primary with a reference to the {@linkplain IndexShard} to modify.
*/
protected abstract WriteResult<Response> onPrimaryShard(Request request, IndexShard indexShard) throws Exception;
protected abstract WriteResult<ReplicaRequest, Response> onPrimaryShard(Request request, IndexShard indexShard) throws Exception;
/**
* Called once per replica with a reference to the {@linkplain IndexShard} to modify.
*
* @return the translog location of the {@linkplain IndexShard} after the write was completed or null if no write occurred
*/
protected abstract Translog.Location onReplicaShard(Request request, IndexShard indexShard);
protected abstract Translog.Location onReplicaShard(ReplicaRequest request, IndexShard indexShard);
@Override
protected final WritePrimaryResult shardOperationOnPrimary(Request request) throws Exception {
IndexShard indexShard = indexShard(request);
WriteResult<Response> result = onPrimaryShard(request, indexShard);
return new WritePrimaryResult(request, result.getResponse(), result.getLocation(), indexShard);
WriteResult<ReplicaRequest, Response> result = onPrimaryShard(request, indexShard);
return new WritePrimaryResult(request, result, indexShard);
}
@Override
protected final WriteReplicaResult shardOperationOnReplica(Request request) {
protected final WriteReplicaResult shardOperationOnReplica(ReplicaRequest request) {
IndexShard indexShard = indexShard(request);
Translog.Location location = onReplicaShard(request, indexShard);
return new WriteReplicaResult(indexShard, request, location);
@ -89,7 +91,7 @@ public abstract class TransportWriteAction<
/**
* Fetch the IndexShard for the request. Protected so it can be mocked in tests.
*/
protected IndexShard indexShard(Request request) {
protected IndexShard indexShard(ReplicatedWriteRequest request) {
final ShardId shardId = request.shardId();
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
return indexService.getShard(shardId.id());
@ -98,11 +100,13 @@ public abstract class TransportWriteAction<
/**
* Simple result from a write action. Write actions have static method to return these so they can integrate with bulk.
*/
public static class WriteResult<Response extends ReplicationResponse> {
public static class WriteResult<ReplicaRequest extends ReplicatedWriteRequest<ReplicaRequest>, Response extends ReplicationResponse> {
private final ReplicaRequest replicaRequest;
private final Response response;
private final Translog.Location location;
public WriteResult(Response response, @Nullable Location location) {
public WriteResult(ReplicaRequest replicaRequest, Response response, @Nullable Location location) {
this.replicaRequest = replicaRequest;
this.response = response;
this.location = location;
}
@ -114,6 +118,10 @@ public abstract class TransportWriteAction<
public Translog.Location getLocation() {
return location;
}
public ReplicaRequest getReplicaRequest() {
return replicaRequest;
}
}
/**
@ -123,15 +131,15 @@ public abstract class TransportWriteAction<
boolean finishedAsyncActions;
ActionListener<Response> listener = null;
public WritePrimaryResult(Request request, Response finalResponse,
@Nullable Translog.Location location,
public WritePrimaryResult(Request request,
WriteResult<ReplicaRequest, Response> result,
IndexShard indexShard) {
super(request, finalResponse);
super(result.getReplicaRequest(), result.getResponse());
/*
* We call this before replication because this might wait for a refresh and that can take a while. This way we wait for the
* refresh in parallel on the primary and on the replica.
*/
new AsyncAfterWriteAction(indexShard, request, location, this, logger).run();
new AsyncAfterWriteAction(indexShard, request, result.getLocation(), this, logger).run();
}
@Override

View File

@ -1,138 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.support.single.instance;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.ValidateActions;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.shard.ShardId;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
/**
*
*/
public abstract class InstanceShardOperationRequest<Request extends InstanceShardOperationRequest<Request>> extends ActionRequest<Request>
implements IndicesRequest {
public static final TimeValue DEFAULT_TIMEOUT = new TimeValue(1, TimeUnit.MINUTES);
protected TimeValue timeout = DEFAULT_TIMEOUT;
protected String index;
// null means its not set, allows to explicitly direct a request to a specific shard
protected ShardId shardId = null;
private String concreteIndex;
protected InstanceShardOperationRequest() {
}
public InstanceShardOperationRequest(String index) {
this.index = index;
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
if (index == null) {
validationException = ValidateActions.addValidationError("index is missing", validationException);
}
return validationException;
}
public String index() {
return index;
}
@Override
public String[] indices() {
return new String[]{index};
}
@Override
public IndicesOptions indicesOptions() {
return IndicesOptions.strictSingleIndexNoExpandForbidClosed();
}
@SuppressWarnings("unchecked")
public final Request index(String index) {
this.index = index;
return (Request) this;
}
public TimeValue timeout() {
return timeout;
}
/**
* A timeout to wait if the index operation can't be performed immediately. Defaults to <tt>1m</tt>.
*/
@SuppressWarnings("unchecked")
public final Request timeout(TimeValue timeout) {
this.timeout = timeout;
return (Request) this;
}
/**
* A timeout to wait if the index operation can't be performed immediately. Defaults to <tt>1m</tt>.
*/
public final Request timeout(String timeout) {
return timeout(TimeValue.parseTimeValue(timeout, null, getClass().getSimpleName() + ".timeout"));
}
public String concreteIndex() {
return concreteIndex;
}
void concreteIndex(String concreteIndex) {
this.concreteIndex = concreteIndex;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
index = in.readString();
if (in.readBoolean()) {
shardId = ShardId.readShardId(in);
} else {
shardId = null;
}
timeout = new TimeValue(in);
concreteIndex = in.readOptionalString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(index);
out.writeOptionalStreamable(shardId);
timeout.writeTo(out);
out.writeOptionalString(concreteIndex);
}
}

View File

@ -1,60 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.support.single.instance;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.unit.TimeValue;
/**
*/
public abstract class InstanceShardOperationRequestBuilder<Request extends InstanceShardOperationRequest<Request>, Response extends ActionResponse, RequestBuilder extends InstanceShardOperationRequestBuilder<Request, Response, RequestBuilder>>
extends ActionRequestBuilder<Request, Response, RequestBuilder> {
protected InstanceShardOperationRequestBuilder(ElasticsearchClient client, Action<Request, Response, RequestBuilder> action, Request request) {
super(client, action, request);
}
@SuppressWarnings("unchecked")
public final RequestBuilder setIndex(String index) {
request.index(index);
return (RequestBuilder) this;
}
/**
* A timeout to wait if the index operation can't be performed immediately. Defaults to <tt>1m</tt>.
*/
@SuppressWarnings("unchecked")
public final RequestBuilder setTimeout(TimeValue timeout) {
request.timeout(timeout);
return (RequestBuilder) this;
}
/**
* A timeout to wait if the index operation can't be performed immediately. Defaults to <tt>1m</tt>.
*/
@SuppressWarnings("unchecked")
public final RequestBuilder setTimeout(String timeout) {
request.timeout(timeout);
return (RequestBuilder) this;
}
}

View File

@ -1,270 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.support.single.instance;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import java.util.function.Supplier;
/**
*
*/
public abstract class TransportInstanceSingleOperationAction<Request extends InstanceShardOperationRequest<Request>, Response extends ActionResponse>
extends HandledTransportAction<Request, Response> {
protected final ClusterService clusterService;
protected final TransportService transportService;
final String executor;
final String shardActionName;
protected TransportInstanceSingleOperationAction(Settings settings, String actionName, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request) {
super(settings, actionName, threadPool, transportService, actionFilters, indexNameExpressionResolver, request);
this.clusterService = clusterService;
this.transportService = transportService;
this.executor = executor();
this.shardActionName = actionName + "[s]";
transportService.registerRequestHandler(shardActionName, request, executor, new ShardTransportHandler());
}
@Override
protected void doExecute(Request request, ActionListener<Response> listener) {
new AsyncSingleAction(request, listener).start();
}
protected abstract String executor();
protected abstract void shardOperation(Request request, ActionListener<Response> listener);
protected abstract Response newResponse();
protected ClusterBlockException checkGlobalBlock(ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.WRITE);
}
protected ClusterBlockException checkRequestBlock(ClusterState state, Request request) {
return state.blocks().indexBlockedException(ClusterBlockLevel.WRITE, request.concreteIndex());
}
/**
* Resolves the request. Throws an exception if the request cannot be resolved.
*/
protected abstract void resolveRequest(ClusterState state, Request request);
protected boolean retryOnFailure(Exception e) {
return false;
}
protected TransportRequestOptions transportOptions() {
return TransportRequestOptions.EMPTY;
}
/**
* Should return an iterator with a single shard!
*/
protected abstract ShardIterator shards(ClusterState clusterState, Request request);
class AsyncSingleAction {
private final ActionListener<Response> listener;
private final Request request;
private volatile ClusterStateObserver observer;
private ShardIterator shardIt;
private DiscoveryNodes nodes;
AsyncSingleAction(Request request, ActionListener<Response> listener) {
this.request = request;
this.listener = listener;
}
public void start() {
this.observer = new ClusterStateObserver(clusterService, request.timeout(), logger, threadPool.getThreadContext());
doStart();
}
protected void doStart() {
nodes = observer.observedState().nodes();
try {
ClusterBlockException blockException = checkGlobalBlock(observer.observedState());
if (blockException != null) {
if (blockException.retryable()) {
retry(blockException);
return;
} else {
throw blockException;
}
}
request.concreteIndex(indexNameExpressionResolver.concreteSingleIndex(observer.observedState(), request).getName());
resolveRequest(observer.observedState(), request);
blockException = checkRequestBlock(observer.observedState(), request);
if (blockException != null) {
if (blockException.retryable()) {
retry(blockException);
return;
} else {
throw blockException;
}
}
shardIt = shards(observer.observedState(), request);
} catch (Exception e) {
listener.onFailure(e);
return;
}
// no shardIt, might be in the case between index gateway recovery and shardIt initialization
if (shardIt.size() == 0) {
retry(null);
return;
}
// this transport only make sense with an iterator that returns a single shard routing (like primary)
assert shardIt.size() == 1;
ShardRouting shard = shardIt.nextOrNull();
assert shard != null;
if (!shard.active()) {
retry(null);
return;
}
request.shardId = shardIt.shardId();
DiscoveryNode node = nodes.get(shard.currentNodeId());
transportService.sendRequest(node, shardActionName, request, transportOptions(), new TransportResponseHandler<Response>() {
@Override
public Response newInstance() {
return newResponse();
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
@Override
public void handleResponse(Response response) {
listener.onResponse(response);
}
@Override
public void handleException(TransportException exp) {
final Throwable cause = exp.unwrapCause();
// if we got disconnected from the node, or the node / shard is not in the right state (being closed)
if (cause instanceof ConnectTransportException || cause instanceof NodeClosedException ||
retryOnFailure(exp)) {
retry((Exception) cause);
} else {
listener.onFailure(exp);
}
}
});
}
void retry(@Nullable final Exception failure) {
if (observer.isTimedOut()) {
// we running as a last attempt after a timeout has happened. don't retry
Exception listenFailure = failure;
if (listenFailure == null) {
if (shardIt == null) {
listenFailure = new UnavailableShardsException(request.concreteIndex(), -1, "Timeout waiting for [{}], request: {}", request.timeout(), actionName);
} else {
listenFailure = new UnavailableShardsException(shardIt.shardId(), "[{}] shardIt, [{}] active : Timeout waiting for [{}], request: {}", shardIt.size(), shardIt.sizeActive(), request.timeout(), actionName);
}
}
listener.onFailure(listenFailure);
return;
}
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
doStart();
}
@Override
public void onClusterServiceClose() {
listener.onFailure(new NodeClosedException(nodes.getLocalNode()));
}
@Override
public void onTimeout(TimeValue timeout) {
// just to be on the safe side, see if we can start it now?
doStart();
}
}, request.timeout());
}
}
private class ShardTransportHandler implements TransportRequestHandler<Request> {
@Override
public void messageReceived(final Request request, final TransportChannel channel) throws Exception {
shardOperation(request, new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
try {
channel.sendResponse(response);
} catch (Exception e) {
onFailure(e);
}
}
@Override
public void onFailure(Exception e) {
try {
channel.sendResponse(e);
} catch (Exception inner) {
inner.addSuppressed(e);
logger.warn("failed to send response for get", inner);
}
}
});
}
}
}

View File

@ -21,7 +21,7 @@ package org.elasticsearch.action.update;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
@ -34,19 +34,17 @@ import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.index.TransportIndexAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.action.support.single.instance.TransportInstanceSingleOperationAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.action.support.replication.TransportWriteAction;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.PlainShardIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.NotSerializableExceptionWrapper;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
@ -54,59 +52,52 @@ import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.Collections;
import java.util.Map;
import static org.elasticsearch.ExceptionsHelper.unwrapCause;
/**
*/
public class TransportUpdateAction extends TransportInstanceSingleOperationAction<UpdateRequest, UpdateResponse> {
public class TransportUpdateAction extends TransportWriteAction<UpdateRequest, UpdateReplicaRequest, UpdateResponse> {
private final TransportDeleteAction deleteAction;
private final TransportIndexAction indexAction;
private final AutoCreateIndex autoCreateIndex;
private final TransportCreateIndexAction createIndexAction;
private final UpdateHelper updateHelper;
private final IndicesService indicesService;
private final MappingUpdatedAction mappingUpdatedAction;
private final boolean allowIdGeneration;
@Inject
public TransportUpdateAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService,
TransportIndexAction indexAction, TransportDeleteAction deleteAction, TransportCreateIndexAction createIndexAction,
UpdateHelper updateHelper, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
IndicesService indicesService, AutoCreateIndex autoCreateIndex) {
super(settings, UpdateAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, UpdateRequest::new);
this.indexAction = indexAction;
this.deleteAction = deleteAction;
TransportCreateIndexAction createIndexAction, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, IndicesService indicesService,
AutoCreateIndex autoCreateIndex, ShardStateAction shardStateAction,
MappingUpdatedAction mappingUpdatedAction, ScriptService scriptService) {
super(settings, UpdateAction.NAME, transportService, clusterService, indicesService, threadPool, shardStateAction,
actionFilters, indexNameExpressionResolver, UpdateRequest::new, UpdateReplicaRequest::new, ThreadPool.Names.INDEX);
this.createIndexAction = createIndexAction;
this.updateHelper = updateHelper;
this.updateHelper = new UpdateHelper(scriptService, logger);
this.indicesService = indicesService;
this.autoCreateIndex = autoCreateIndex;
this.mappingUpdatedAction = mappingUpdatedAction;
this.allowIdGeneration = settings.getAsBoolean("action.allow_id_generation", true);
}
@Override
protected String executor() {
return ThreadPool.Names.INDEX;
}
@Override
protected UpdateResponse newResponse() {
return new UpdateResponse();
}
@Override
protected boolean retryOnFailure(Exception e) {
return TransportActions.isShardNotAvailableException(e);
}
@Override
protected void resolveRequest(ClusterState state, UpdateRequest request) {
resolveAndValidateRouting(state.metaData(), request.concreteIndex(), request);
protected void resolveRequest(MetaData metaData, IndexMetaData indexMetaData, UpdateRequest request) {
super.resolveRequest(metaData, indexMetaData, request);
resolveAndValidateRouting(metaData, indexMetaData.getIndex().getName(), request);
ShardId shardId = clusterService.operationRouting().shardId(clusterService.state(),
indexMetaData.getIndex().getName(), request.id(), request.routing());
request.setShardId(shardId);
}
public static void resolveAndValidateRouting(MetaData metaData, String concreteIndex, UpdateRequest request) {
@ -118,13 +109,17 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
}
@Override
protected void doExecute(final UpdateRequest request, final ActionListener<UpdateResponse> listener) {
protected void doExecute(Task task, UpdateRequest request, ActionListener<UpdateResponse> listener) {
// if we don't have a master, we don't have metadata, that's fine, let it find a master using create index API
if (autoCreateIndex.shouldAutoCreate(request.index(), clusterService.state())) {
createIndexAction.execute(new CreateIndexRequest().index(request.index()).cause("auto(update api)").masterNodeTimeout(request.timeout()), new ActionListener<CreateIndexResponse>() {
CreateIndexRequest createIndexRequest = new CreateIndexRequest();
createIndexRequest.index(request.index());
createIndexRequest.cause("auto(update api)");
createIndexRequest.masterNodeTimeout(request.timeout());
createIndexAction.execute(createIndexRequest, new ActionListener<CreateIndexResponse>() {
@Override
public void onResponse(CreateIndexResponse result) {
innerExecute(request, listener);
innerExecute(task, request, listener);
}
@Override
@ -132,7 +127,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
if (unwrapCause(e) instanceof IndexAlreadyExistsException) {
// we have the index, do it
try {
innerExecute(request, listener);
innerExecute(task, request, listener);
} catch (Exception inner) {
inner.addSuppressed(e);
listener.onFailure(inner);
@ -143,153 +138,123 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
}
});
} else {
innerExecute(request, listener);
innerExecute(task, request, listener);
}
}
private void innerExecute(final UpdateRequest request, final ActionListener<UpdateResponse> listener) {
super.doExecute(request, listener);
}
@Override
protected ShardIterator shards(ClusterState clusterState, UpdateRequest request) {
if (request.getShardId() != null) {
return clusterState.routingTable().index(request.concreteIndex()).shard(request.getShardId().getId()).primaryShardIt();
protected UpdateResponse newResponseInstance() {
return new UpdateResponse();
}
ShardIterator shardIterator = clusterService.operationRouting()
.indexShards(clusterState, request.concreteIndex(), request.id(), request.routing());
ShardRouting shard;
while ((shard = shardIterator.nextOrNull()) != null) {
if (shard.primary()) {
return new PlainShardIterator(shardIterator.shardId(), Collections.singletonList(shard));
}
}
return new PlainShardIterator(shardIterator.shardId(), Collections.<ShardRouting>emptyList());
private void innerExecute(Task task, final UpdateRequest request, final ActionListener<UpdateResponse> listener) {
super.doExecute(task, request, listener);
}
@Override
protected void shardOperation(final UpdateRequest request, final ActionListener<UpdateResponse> listener) {
shardOperation(request, listener, 0);
}
protected void shardOperation(final UpdateRequest request, final ActionListener<UpdateResponse> listener, final int retryCount) {
final ShardId shardId = request.getShardId();
protected WriteResult<UpdateReplicaRequest, UpdateResponse> onPrimaryShard(UpdateRequest request, IndexShard indexShard) throws Exception {
ShardId shardId = request.shardId();
final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
final IndexShard indexShard = indexService.getShard(shardId.getId());
final IndexMetaData indexMetaData = indexService.getMetaData();
return executeUpdateRequestOnPrimary(request, indexShard, indexMetaData, updateHelper, mappingUpdatedAction, allowIdGeneration);
}
public static WriteResult<UpdateReplicaRequest, UpdateResponse> executeUpdateRequestOnPrimary(UpdateRequest request,
IndexShard indexShard,
IndexMetaData indexMetaData,
UpdateHelper updateHelper,
MappingUpdatedAction mappingUpdatedAction,
boolean allowIdGeneration)
throws Exception {
int maxAttempts = request.retryOnConflict();
for (int attemptCount = 0; attemptCount <= maxAttempts; attemptCount++) {
try {
return shardUpdateOperation(indexMetaData, indexShard, request, updateHelper, mappingUpdatedAction, allowIdGeneration);
} catch (Exception e) {
final Throwable cause = ExceptionsHelper.unwrapCause(e);
if (attemptCount == maxAttempts // bubble up exception when we run out of attempts
|| (cause instanceof VersionConflictEngineException) == false) { // or when exception is not a version conflict
throw e;
}
}
}
throw new IllegalStateException("version conflict exception should bubble up on last attempt");
}
private static WriteResult<UpdateReplicaRequest, UpdateResponse> shardUpdateOperation(IndexMetaData indexMetaData,
IndexShard indexShard,
UpdateRequest request,
UpdateHelper updateHelper,
MappingUpdatedAction mappingUpdatedAction,
boolean allowIdGeneration)
throws Exception {
final UpdateHelper.Result result = updateHelper.prepare(request, indexShard);
switch (result.getResponseResult()) {
case CREATED:
IndexRequest upsertRequest = result.action();
// we fetch it from the index request so we don't generate the bytes twice, its already done in the index request
final BytesReference upsertSourceBytes = upsertRequest.source();
indexAction.execute(upsertRequest, new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse response) {
case UPDATED:
IndexRequest indexRequest = result.action();
MappingMetaData mappingMd = indexMetaData.mappingOrDefault(request.type());
indexRequest.process(mappingMd, allowIdGeneration, indexMetaData.getIndex().getName());
WriteResult<IndexRequest, IndexResponse> indexResponseWriteResult = TransportIndexAction.executeIndexRequestOnPrimary(indexRequest, indexShard, mappingUpdatedAction);
IndexResponse response = indexResponseWriteResult.getResponse();
UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getVersion(), response.getResult());
// we fetch it from the index request so we don't generate the bytes twice, its already done in the index request
final BytesReference indexSourceBytes = indexRequest.source();
if (result.getResponseResult() == DocWriteResponse.Result.CREATED) {
if ((request.fetchSource() != null && request.fetchSource().fetchSource()) ||
(request.fields() != null && request.fields().length > 0)) {
Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(upsertSourceBytes, true);
update.setGetResult(updateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), upsertSourceBytes));
Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(indexSourceBytes, true);
update.setGetResult(updateHelper.extractGetResult(request, indexMetaData.getIndex().getName(), response.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceBytes));
} else {
update.setGetResult(null);
}
} else if (result.getResponseResult() == DocWriteResponse.Result.UPDATED) {
update.setGetResult(updateHelper.extractGetResult(request, indexMetaData.getIndex().getName(), response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), indexSourceBytes));
}
update.setForcedRefresh(response.forcedRefresh());
listener.onResponse(update);
}
@Override
public void onFailure(Exception e) {
final Throwable cause = ExceptionsHelper.unwrapCause(e);
if (cause instanceof VersionConflictEngineException) {
if (retryCount < request.retryOnConflict()) {
logger.trace("Retry attempt [{}] of [{}] on version conflict on [{}][{}][{}]",
retryCount + 1, request.retryOnConflict(), request.index(), request.getShardId(), request.id());
threadPool.executor(executor()).execute(new ActionRunnable<UpdateResponse>(listener) {
@Override
protected void doRun() {
shardOperation(request, listener, retryCount + 1);
}
});
return;
}
}
listener.onFailure(cause instanceof Exception ? (Exception) cause : new NotSerializableExceptionWrapper(cause));
}
});
break;
case UPDATED:
IndexRequest indexRequest = result.action();
// we fetch it from the index request so we don't generate the bytes twice, its already done in the index request
final BytesReference indexSourceBytes = indexRequest.source();
indexAction.execute(indexRequest, new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse response) {
UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getVersion(), response.getResult());
update.setGetResult(updateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), indexSourceBytes));
update.setForcedRefresh(response.forcedRefresh());
listener.onResponse(update);
}
@Override
public void onFailure(Exception e) {
final Throwable cause = unwrapCause(e);
if (cause instanceof VersionConflictEngineException) {
if (retryCount < request.retryOnConflict()) {
threadPool.executor(executor()).execute(new ActionRunnable<UpdateResponse>(listener) {
@Override
protected void doRun() {
shardOperation(request, listener, retryCount + 1);
}
});
return;
}
}
listener.onFailure(cause instanceof Exception ? (Exception) cause : new NotSerializableExceptionWrapper(cause));
}
});
break;
UpdateReplicaRequest updateReplicaRequest = new UpdateReplicaRequest(indexRequest);
updateReplicaRequest.setParentTask(request.getParentTask());
updateReplicaRequest.setShardId(request.shardId());
updateReplicaRequest.setRefreshPolicy(request.getRefreshPolicy());
return new WriteResult<>(updateReplicaRequest, update, indexResponseWriteResult.getLocation());
case DELETED:
DeleteRequest deleteRequest = result.action();
deleteAction.execute(deleteRequest, new ActionListener<DeleteResponse>() {
@Override
public void onResponse(DeleteResponse response) {
UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getVersion(), response.getResult());
update.setGetResult(updateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), null));
update.setForcedRefresh(response.forcedRefresh());
listener.onResponse(update);
}
@Override
public void onFailure(Exception e) {
final Throwable cause = unwrapCause(e);
if (cause instanceof VersionConflictEngineException) {
if (retryCount < request.retryOnConflict()) {
threadPool.executor(executor()).execute(new ActionRunnable<UpdateResponse>(listener) {
@Override
protected void doRun() {
shardOperation(request, listener, retryCount + 1);
}
});
return;
}
}
listener.onFailure(cause instanceof Exception ? (Exception) cause : new NotSerializableExceptionWrapper(cause));
}
});
break;
WriteResult<DeleteRequest, DeleteResponse> deleteResponseWriteResult = TransportDeleteAction.executeDeleteRequestOnPrimary(deleteRequest, indexShard);
DeleteResponse deleteResponse = deleteResponseWriteResult.getResponse();
UpdateResponse deleteUpdate = new UpdateResponse(deleteResponse.getShardInfo(), deleteResponse.getShardId(), deleteResponse.getType(), deleteResponse.getId(), deleteResponse.getVersion(), deleteResponse.getResult());
deleteUpdate.setGetResult(updateHelper.extractGetResult(request, indexMetaData.getIndex().getName(), deleteResponse.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), null));
deleteUpdate.setForcedRefresh(deleteResponse.forcedRefresh());
UpdateReplicaRequest deleteReplicaRequest = new UpdateReplicaRequest(deleteRequest);
deleteReplicaRequest.setParentTask(request.getParentTask());
deleteReplicaRequest.setShardId(request.shardId());
deleteReplicaRequest.setRefreshPolicy(request.getRefreshPolicy());
return new WriteResult<>(deleteReplicaRequest, deleteUpdate, deleteResponseWriteResult.getLocation());
case NOOP:
UpdateResponse update = result.action();
IndexService indexServiceOrNull = indicesService.indexService(shardId.getIndex());
if (indexServiceOrNull != null) {
IndexShard shard = indexService.getShardOrNull(shardId.getId());
if (shard != null) {
shard.noopUpdate(request.type());
}
}
listener.onResponse(update);
break;
UpdateResponse noopUpdate = result.action();
indexShard.noopUpdate(request.type());
return new WriteResult<>(null, noopUpdate, null);
default:
throw new IllegalStateException("Illegal result " + result.getResponseResult());
}
}
@Override
protected Translog.Location onReplicaShard(UpdateReplicaRequest request, IndexShard indexShard) {
assert request.getRequest() != null;
final Translog.Location location;
switch (request.getRequest().opType()) {
case INDEX:
case CREATE:
location = TransportIndexAction.executeIndexRequestOnReplica(((IndexRequest) request.getRequest()), indexShard).getTranslogLocation();
break;
case DELETE:
location = TransportDeleteAction.executeDeleteRequestOnReplica(((DeleteRequest) request.getRequest()), indexShard).getTranslogLocation();
break;
default:
throw new IllegalStateException("unexpected opType [" + request.getRequest().opType().getLowercase() + "]");
}
return location;
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.update;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.delete.DeleteRequest;
@ -27,11 +28,8 @@ import org.elasticsearch.client.Requests;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
@ -63,14 +61,14 @@ import java.util.Map;
/**
* Helper for translating an update request to an index, delete request or update response.
*/
public class UpdateHelper extends AbstractComponent {
public class UpdateHelper {
private final ScriptService scriptService;
private final Logger logger;
@Inject
public UpdateHelper(Settings settings, ScriptService scriptService) {
super(settings);
public UpdateHelper(ScriptService scriptService, Logger logger) {
this.scriptService = scriptService;
this.logger = logger;
}
/**
@ -259,7 +257,7 @@ public class UpdateHelper extends AbstractComponent {
return ctx;
}
private TimeValue getTTLFromScriptContext(Map<String, Object> ctx) {
private static TimeValue getTTLFromScriptContext(Map<String, Object> ctx) {
Object fetchedTTL = ctx.get("_ttl");
if (fetchedTTL != null) {
if (fetchedTTL instanceof Number) {

View File

@ -0,0 +1,113 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.update;
import org.elasticsearch.action.DocumentRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.VersionType;
import java.io.IOException;
/** Replica request for update operation holds translated (index/delete) requests */
public class UpdateReplicaRequest extends DocumentRequest<UpdateReplicaRequest> {
private DocumentRequest<?> request;
public UpdateReplicaRequest() {
}
public UpdateReplicaRequest(DocumentRequest<?> request) {
assert !(request instanceof UpdateReplicaRequest) : "underlying request must not be a update replica request";
this.request = request;
this.index = request.index();
setRefreshPolicy(request.getRefreshPolicy());
setShardId(request.shardId());
setParentTask(request.getParentTask());
}
public DocumentRequest<?> getRequest() {
return request;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
request = DocumentRequest.readDocumentRequest(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
DocumentRequest.writeDocumentRequest(out, request);
}
@Override
public String type() {
return request.type();
}
@Override
public String id() {
return request.id();
}
@Override
public UpdateReplicaRequest routing(String routing) {
throw new UnsupportedOperationException("setting routing is not supported");
}
@Override
public String routing() {
return request.routing();
}
@Override
public String parent() {
return request.parent();
}
@Override
public long version() {
return request.version();
}
@Override
public UpdateReplicaRequest version(long version) {
throw new UnsupportedOperationException("setting version is not supported");
}
@Override
public VersionType versionType() {
return request.versionType();
}
@Override
public UpdateReplicaRequest versionType(VersionType versionType) {
throw new UnsupportedOperationException("setting version type is not supported");
}
@Override
public OpType opType() {
return request.opType();
}
}

View File

@ -23,9 +23,6 @@ import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.DocumentRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.action.support.single.instance.InstanceShardOperationRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.bytes.BytesArray;
@ -56,10 +53,7 @@ import static org.elasticsearch.action.ValidateActions.addValidationError;
/**
*/
public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
implements DocumentRequest<UpdateRequest>, WriteRequest<UpdateRequest> {
private static final DeprecationLogger DEPRECATION_LOGGER =
new DeprecationLogger(Loggers.getLogger(UpdateRequest.class));
public class UpdateRequest extends DocumentRequest<UpdateRequest> {
private String type;
private String id;
@ -97,7 +91,7 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
}
public UpdateRequest(String index, String type, String id) {
super(index);
this.index = index;
this.type = type;
this.id = id;
}
@ -495,39 +489,6 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
return OpType.UPDATE;
}
@Override
public UpdateRequest setRefreshPolicy(RefreshPolicy refreshPolicy) {
this.refreshPolicy = refreshPolicy;
return this;
}
@Override
public RefreshPolicy getRefreshPolicy() {
return refreshPolicy;
}
public ActiveShardCount waitForActiveShards() {
return this.waitForActiveShards;
}
/**
* Sets the number of shard copies that must be active before proceeding with the write.
* See {@link ReplicationRequest#waitForActiveShards(ActiveShardCount)} for details.
*/
public UpdateRequest waitForActiveShards(ActiveShardCount waitForActiveShards) {
this.waitForActiveShards = waitForActiveShards;
return this;
}
/**
* A shortcut for {@link #waitForActiveShards(ActiveShardCount)} where the numerical
* shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)}
* to get the ActiveShardCount.
*/
public UpdateRequest waitForActiveShards(final int waitForActiveShards) {
return waitForActiveShards(ActiveShardCount.from(waitForActiveShards));
}
/**
* Sets the doc to use for updates when a script is not specified.
*/

View File

@ -23,7 +23,7 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.WriteRequestBuilder;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.action.support.single.instance.InstanceShardOperationRequestBuilder;
import org.elasticsearch.action.support.replication.ReplicationRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.logging.DeprecationLogger;
@ -37,7 +37,7 @@ import org.elasticsearch.script.Script;
import java.util.Map;
public class UpdateRequestBuilder extends InstanceShardOperationRequestBuilder<UpdateRequest, UpdateResponse, UpdateRequestBuilder>
public class UpdateRequestBuilder extends ReplicationRequestBuilder<UpdateRequest, UpdateResponse, UpdateRequestBuilder>
implements WriteRequestBuilder<UpdateRequestBuilder> {
private static final DeprecationLogger DEPRECATION_LOGGER =
new DeprecationLogger(Loggers.getLogger(RestUpdateAction.class));

View File

@ -22,7 +22,6 @@ package org.elasticsearch.indices;
import org.elasticsearch.action.admin.indices.rollover.Condition;
import org.elasticsearch.action.admin.indices.rollover.MaxAgeCondition;
import org.elasticsearch.action.admin.indices.rollover.MaxDocsCondition;
import org.elasticsearch.action.update.UpdateHelper;
import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService;
import org.elasticsearch.common.geo.ShapesAvailability;
import org.elasticsearch.common.inject.AbstractModule;
@ -182,7 +181,6 @@ public class IndicesModule extends AbstractModule {
bind(SyncedFlushService.class).asEagerSingleton();
bind(TransportNodesListShardStoreMetaData.class).asEagerSingleton();
bind(IndicesTTLService.class).asEagerSingleton();
bind(UpdateHelper.class).asEagerSingleton();
bind(MetaDataIndexUpgradeService.class).asEagerSingleton();
bind(NodeServicesProvider.class).asEagerSingleton();
}

View File

@ -222,8 +222,7 @@ public class IndicesRequestIT extends ESIntegTestCase {
}
public void testUpdate() {
//update action goes to the primary, index op gets executed locally, then replicated
String[] updateShardActions = new String[]{UpdateAction.NAME + "[s]", IndexAction.NAME + "[r]"};
String[] updateShardActions = new String[]{UpdateAction.NAME + "[p]", UpdateAction.NAME + "[r]"};
interceptTransportActions(updateShardActions);
String indexOrAlias = randomIndexOrAlias();
@ -237,8 +236,7 @@ public class IndicesRequestIT extends ESIntegTestCase {
}
public void testUpdateUpsert() {
//update action goes to the primary, index op gets executed locally, then replicated
String[] updateShardActions = new String[]{UpdateAction.NAME + "[s]", IndexAction.NAME + "[r]"};
String[] updateShardActions = new String[]{UpdateAction.NAME + "[p]", UpdateAction.NAME + "[r]"};
interceptTransportActions(updateShardActions);
String indexOrAlias = randomIndexOrAlias();
@ -251,8 +249,7 @@ public class IndicesRequestIT extends ESIntegTestCase {
}
public void testUpdateDelete() {
//update action goes to the primary, delete op gets executed locally, then replicated
String[] updateShardActions = new String[]{UpdateAction.NAME + "[s]", DeleteAction.NAME + "[r]"};
String[] updateShardActions = new String[]{UpdateAction.NAME + "[p]", UpdateAction.NAME + "[r]"};
interceptTransportActions(updateShardActions);
String indexOrAlias = randomIndexOrAlias();

View File

@ -20,6 +20,7 @@
package org.elasticsearch.action.support.replication;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import org.elasticsearch.action.support.WriteResponse;
@ -128,21 +129,21 @@ public class TransportWriteActionTests extends ESTestCase {
resultChecker.accept(listener.response, forcedRefresh);
}
private class TestAction extends TransportWriteAction<TestRequest, TestResponse> {
private class TestAction extends TransportWriteAction<TestRequest, TestRequest, TestResponse> {
protected TestAction() {
super(Settings.EMPTY, "test", new TransportService(Settings.EMPTY, null, null, TransportService.NOOP_TRANSPORT_INTERCEPTOR),
null, null, null, null, new ActionFilters(new HashSet<>()),
new IndexNameExpressionResolver(Settings.EMPTY), TestRequest::new, ThreadPool.Names.SAME);
new IndexNameExpressionResolver(Settings.EMPTY), TestRequest::new, TestRequest::new, ThreadPool.Names.SAME);
}
@Override
protected IndexShard indexShard(TestRequest request) {
protected IndexShard indexShard(ReplicatedWriteRequest request) {
return indexShard;
}
@Override
protected WriteResult<TestResponse> onPrimaryShard(TestRequest request, IndexShard indexShard) throws Exception {
return new WriteResult<>(new TestResponse(), location);
protected WriteResult<TestRequest, TestResponse> onPrimaryShard(TestRequest request, IndexShard indexShard) throws Exception {
return new WriteResult<>(request, new TestResponse(), location);
}
@Override

View File

@ -1,327 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.support.single.instance;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.ActionFilter;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportService;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
import static org.elasticsearch.test.ClusterServiceUtils.setState;
import static org.hamcrest.core.IsEqual.equalTo;
public class TransportInstanceSingleOperationActionTests extends ESTestCase {
private static ThreadPool THREAD_POOL;
private ClusterService clusterService;
private CapturingTransport transport;
private TransportService transportService;
private TestTransportInstanceSingleOperationAction action;
public static class Request extends InstanceShardOperationRequest<Request> {
public Request() {
}
}
public static class Response extends ActionResponse {
public Response() {
}
}
class TestTransportInstanceSingleOperationAction extends TransportInstanceSingleOperationAction<Request, Response> {
private final Map<ShardId, Object> shards = new HashMap<>();
public TestTransportInstanceSingleOperationAction(Settings settings, String actionName, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request) {
super(settings, actionName, THREAD_POOL, TransportInstanceSingleOperationActionTests.this.clusterService, transportService, actionFilters, indexNameExpressionResolver, request);
}
public Map<ShardId, Object> getResults() {
return shards;
}
@Override
protected String executor() {
return ThreadPool.Names.SAME;
}
@Override
protected void shardOperation(Request request, ActionListener<Response> listener) {
throw new UnsupportedOperationException("Not implemented in test class");
}
@Override
protected Response newResponse() {
return new Response();
}
@Override
protected void resolveRequest(ClusterState state, Request request) {
}
@Override
protected ShardIterator shards(ClusterState clusterState, Request request) {
return clusterState.routingTable().index(request.concreteIndex()).shard(request.shardId.getId()).primaryShardIt();
}
}
class MyResolver extends IndexNameExpressionResolver {
public MyResolver() {
super(Settings.EMPTY);
}
@Override
public String[] concreteIndexNames(ClusterState state, IndicesRequest request) {
return request.indices();
}
}
@BeforeClass
public static void startThreadPool() {
THREAD_POOL = new TestThreadPool(TransportInstanceSingleOperationActionTests.class.getSimpleName());
}
@Before
public void setUp() throws Exception {
super.setUp();
transport = new CapturingTransport();
clusterService = createClusterService(THREAD_POOL);
transportService = new TransportService(clusterService.getSettings(), transport, THREAD_POOL, TransportService.NOOP_TRANSPORT_INTERCEPTOR);
transportService.start();
transportService.acceptIncomingRequests();
action = new TestTransportInstanceSingleOperationAction(
Settings.EMPTY,
"indices:admin/test",
transportService,
new ActionFilters(new HashSet<ActionFilter>()),
new MyResolver(),
Request::new
);
}
@After
public void tearDown() throws Exception {
super.tearDown();
clusterService.close();
transportService.close();
}
@AfterClass
public static void destroyThreadPool() {
ThreadPool.terminate(THREAD_POOL, 30, TimeUnit.SECONDS);
// since static must set to null to be eligible for collection
THREAD_POOL = null;
}
public void testGlobalBlock() {
Request request = new Request();
PlainActionFuture<Response> listener = new PlainActionFuture<>();
ClusterBlocks.Builder block = ClusterBlocks.builder()
.addGlobalBlock(new ClusterBlock(1, "", false, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
setState(clusterService, ClusterState.builder(clusterService.state()).blocks(block));
try {
action.new AsyncSingleAction(request, listener).start();
listener.get();
fail("expected ClusterBlockException");
} catch (Exception e) {
if (ExceptionsHelper.unwrap(e, ClusterBlockException.class) == null) {
logger.info("expected ClusterBlockException but got ", e);
fail("expected ClusterBlockException");
}
}
}
public void testBasicRequestWorks() throws InterruptedException, ExecutionException, TimeoutException {
Request request = new Request().index("test");
request.shardId = new ShardId("test", "_na_", 0);
PlainActionFuture<Response> listener = new PlainActionFuture<>();
setState(clusterService, ClusterStateCreationUtils.state("test", randomBoolean(), ShardRoutingState.STARTED));
action.new AsyncSingleAction(request, listener).start();
assertThat(transport.capturedRequests().length, equalTo(1));
transport.handleResponse(transport.capturedRequests()[0].requestId, new Response());
listener.get();
}
public void testFailureWithoutRetry() throws Exception {
Request request = new Request().index("test");
request.shardId = new ShardId("test", "_na_", 0);
PlainActionFuture<Response> listener = new PlainActionFuture<>();
setState(clusterService, ClusterStateCreationUtils.state("test", randomBoolean(), ShardRoutingState.STARTED));
action.new AsyncSingleAction(request, listener).start();
assertThat(transport.capturedRequests().length, equalTo(1));
long requestId = transport.capturedRequests()[0].requestId;
transport.clear();
// this should not trigger retry or anything and the listener should report exception immediately
transport.handleRemoteError(requestId, new TransportException("a generic transport exception", new Exception("generic test exception")));
try {
// result should return immediately
assertTrue(listener.isDone());
listener.get();
fail("this should fail with a transport exception");
} catch (ExecutionException t) {
if (ExceptionsHelper.unwrap(t, TransportException.class) == null) {
logger.info("expected TransportException but got ", t);
fail("expected and TransportException");
}
}
}
public void testSuccessAfterRetryWithClusterStateUpdate() throws Exception {
Request request = new Request().index("test");
request.shardId = new ShardId("test", "_na_", 0);
PlainActionFuture<Response> listener = new PlainActionFuture<>();
boolean local = randomBoolean();
setState(clusterService, ClusterStateCreationUtils.state("test", local, ShardRoutingState.INITIALIZING));
action.new AsyncSingleAction(request, listener).start();
// this should fail because primary not initialized
assertThat(transport.capturedRequests().length, equalTo(0));
setState(clusterService, ClusterStateCreationUtils.state("test", local, ShardRoutingState.STARTED));
// this time it should work
assertThat(transport.capturedRequests().length, equalTo(1));
transport.handleResponse(transport.capturedRequests()[0].requestId, new Response());
listener.get();
}
public void testSuccessAfterRetryWithExceptionFromTransport() throws Exception {
Request request = new Request().index("test");
request.shardId = new ShardId("test", "_na_", 0);
PlainActionFuture<Response> listener = new PlainActionFuture<>();
boolean local = randomBoolean();
setState(clusterService, ClusterStateCreationUtils.state("test", local, ShardRoutingState.STARTED));
action.new AsyncSingleAction(request, listener).start();
assertThat(transport.capturedRequests().length, equalTo(1));
long requestId = transport.capturedRequests()[0].requestId;
transport.clear();
DiscoveryNode node = clusterService.state().getNodes().getLocalNode();
transport.handleLocalError(requestId, new ConnectTransportException(node, "test exception"));
// trigger cluster state observer
setState(clusterService, ClusterStateCreationUtils.state("test", local, ShardRoutingState.STARTED));
assertThat(transport.capturedRequests().length, equalTo(1));
transport.handleResponse(transport.capturedRequests()[0].requestId, new Response());
listener.get();
}
public void testRetryOfAnAlreadyTimedOutRequest() throws Exception {
Request request = new Request().index("test").timeout(new TimeValue(0, TimeUnit.MILLISECONDS));
request.shardId = new ShardId("test", "_na_", 0);
PlainActionFuture<Response> listener = new PlainActionFuture<>();
setState(clusterService, ClusterStateCreationUtils.state("test", randomBoolean(), ShardRoutingState.STARTED));
action.new AsyncSingleAction(request, listener).start();
assertThat(transport.capturedRequests().length, equalTo(1));
long requestId = transport.capturedRequests()[0].requestId;
transport.clear();
DiscoveryNode node = clusterService.state().getNodes().getLocalNode();
transport.handleLocalError(requestId, new ConnectTransportException(node, "test exception"));
// wait until the timeout was triggered and we actually tried to send for the second time
assertBusy(new Runnable() {
@Override
public void run() {
assertThat(transport.capturedRequests().length, equalTo(1));
}
});
// let it fail the second time too
requestId = transport.capturedRequests()[0].requestId;
transport.handleLocalError(requestId, new ConnectTransportException(node, "test exception"));
try {
// result should return immediately
assertTrue(listener.isDone());
listener.get();
fail("this should fail with a transport exception");
} catch (ExecutionException t) {
if (ExceptionsHelper.unwrap(t, ConnectTransportException.class) == null) {
logger.info("expected ConnectTransportException but got ", t);
fail("expected and ConnectTransportException");
}
}
}
public void testUnresolvableRequestDoesNotHang() throws InterruptedException, ExecutionException, TimeoutException {
action = new TestTransportInstanceSingleOperationAction(
Settings.EMPTY,
"indices:admin/test_unresolvable",
transportService,
new ActionFilters(new HashSet<>()),
new MyResolver(),
Request::new
) {
@Override
protected void resolveRequest(ClusterState state, Request request) {
throw new IllegalStateException("request cannot be resolved");
}
};
Request request = new Request().index("test");
request.shardId = new ShardId("test", "_na_", 0);
PlainActionFuture<Response> listener = new PlainActionFuture<>();
setState(clusterService, ClusterStateCreationUtils.state("test", randomBoolean(), ShardRoutingState.STARTED));
action.new AsyncSingleAction(request, listener).start();
assertThat(transport.capturedRequests().length, equalTo(0));
try {
listener.get();
} catch (Exception e) {
if (ExceptionsHelper.unwrap(e, IllegalStateException.class) == null) {
logger.info("expected IllegalStateException but got ", e);
fail("expected and IllegalStateException");
}
}
}
}

View File

@ -172,9 +172,8 @@ public class UpdateRequestTests extends ESTestCase {
// Related to issue 3256
public void testUpdateRequestWithTTL() throws Exception {
TimeValue providedTTLValue = TimeValue.parseTimeValue(randomTimeValue(), null, "ttl");
Settings settings = settings(Version.CURRENT).build();
UpdateHelper updateHelper = new UpdateHelper(settings, null);
UpdateHelper updateHelper = new UpdateHelper(null, logger);
// We just upsert one document with ttl
IndexRequest indexRequest = new IndexRequest("test", "type1", "1")

View File

@ -366,8 +366,8 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
@Override
protected PrimaryResult performOnPrimary(IndexShard primary, IndexRequest request) throws Exception {
TransportWriteAction.WriteResult<IndexResponse> result = TransportIndexAction.executeIndexRequestOnPrimary(request, primary,
null);
TransportWriteAction.WriteResult<IndexRequest, IndexResponse> result =
TransportIndexAction.executeIndexRequestOnPrimary(request, primary, null);
request.primaryTerm(primary.getPrimaryTerm());
TransportWriteActionTestHelper.performPostWriteActions(primary, request, result.getLocation(), logger);
return new PrimaryResult(request, result.getResponse());

View File

@ -162,8 +162,8 @@ the request was ignored.
--------------------------------------------------
{
"_shards": {
"total": 0,
"successful": 0,
"total": 1,
"successful": 1,
"failed": 0
},
"_index": "test",