Revert "Make update a replication action"
This reverts commit eee0d18f94
.
This commit is contained in:
parent
396f80c963
commit
97a6756521
|
@ -20,11 +20,10 @@ package org.elasticsearch.action;
|
|||
|
||||
import org.elasticsearch.action.delete.DeleteRequest;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.update.UpdateReplicaRequest;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.action.update.UpdateRequest;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
|
||||
import org.elasticsearch.index.VersionType;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -34,72 +33,84 @@ import java.util.Locale;
|
|||
* Generic interface to group ActionRequest, which perform writes to a single document
|
||||
* Action requests implementing this can be part of {@link org.elasticsearch.action.bulk.BulkRequest}
|
||||
*/
|
||||
public abstract class DocumentRequest<T extends ReplicatedWriteRequest<T>> extends ReplicatedWriteRequest<T> {
|
||||
public interface DocumentRequest<T> extends IndicesRequest {
|
||||
|
||||
/**
|
||||
* Get the index that this request operates on
|
||||
* @return the index
|
||||
*/
|
||||
String index();
|
||||
|
||||
/**
|
||||
* Get the type that this request operates on
|
||||
* @return the type
|
||||
*/
|
||||
public abstract String type();
|
||||
String type();
|
||||
|
||||
/**
|
||||
* Get the id of the document for this request
|
||||
* @return the id
|
||||
*/
|
||||
public abstract String id();
|
||||
String id();
|
||||
|
||||
/**
|
||||
* Get the options for this request
|
||||
* @return the indices options
|
||||
*/
|
||||
IndicesOptions indicesOptions();
|
||||
|
||||
/**
|
||||
* Set the routing for this request
|
||||
* @return the Request
|
||||
*/
|
||||
public abstract T routing(String routing);
|
||||
T routing(String routing);
|
||||
|
||||
/**
|
||||
* Get the routing for this request
|
||||
* @return the Routing
|
||||
*/
|
||||
public abstract String routing();
|
||||
String routing();
|
||||
|
||||
|
||||
/**
|
||||
* Get the parent for this request
|
||||
* @return the Parent
|
||||
*/
|
||||
public abstract String parent();
|
||||
String parent();
|
||||
|
||||
/**
|
||||
* Get the document version for this request
|
||||
* @return the document version
|
||||
*/
|
||||
public abstract long version();
|
||||
long version();
|
||||
|
||||
/**
|
||||
* Sets the version, which will perform the operation only if a matching
|
||||
* version exists and no changes happened on the doc since then.
|
||||
*/
|
||||
public abstract T version(long version);
|
||||
T version(long version);
|
||||
|
||||
/**
|
||||
* Get the document version type for this request
|
||||
* @return the document version type
|
||||
*/
|
||||
public abstract VersionType versionType();
|
||||
VersionType versionType();
|
||||
|
||||
/**
|
||||
* Sets the versioning type. Defaults to {@link VersionType#INTERNAL}.
|
||||
*/
|
||||
public abstract T versionType(VersionType versionType);
|
||||
T versionType(VersionType versionType);
|
||||
|
||||
/**
|
||||
* Get the requested document operation type of the request
|
||||
* @return the operation type {@link OpType}
|
||||
*/
|
||||
public abstract OpType opType();
|
||||
OpType opType();
|
||||
|
||||
/**
|
||||
* Requested operation type to perform on the document
|
||||
*/
|
||||
public enum OpType {
|
||||
enum OpType {
|
||||
/**
|
||||
* Index the source. If there an existing document with the id, it will
|
||||
* be replaced.
|
||||
|
@ -153,42 +164,40 @@ public abstract class DocumentRequest<T extends ReplicatedWriteRequest<T>> exten
|
|||
}
|
||||
|
||||
/** read a document write (index/delete/update) request */
|
||||
public static DocumentRequest readDocumentRequest(StreamInput in) throws IOException {
|
||||
static DocumentRequest readDocumentRequest(StreamInput in) throws IOException {
|
||||
byte type = in.readByte();
|
||||
final DocumentRequest documentRequest;
|
||||
if (type == 0) {
|
||||
IndexRequest indexRequest = new IndexRequest();
|
||||
indexRequest.readFrom(in);
|
||||
return indexRequest;
|
||||
documentRequest = indexRequest;
|
||||
} else if (type == 1) {
|
||||
DeleteRequest deleteRequest = new DeleteRequest();
|
||||
deleteRequest.readFrom(in);
|
||||
return deleteRequest;
|
||||
documentRequest = deleteRequest;
|
||||
} else if (type == 2) {
|
||||
UpdateRequest updateRequest = new UpdateRequest();
|
||||
updateRequest.readFrom(in);
|
||||
return updateRequest;
|
||||
} else if (type == 3) {
|
||||
UpdateReplicaRequest updateReplicaRequest = new UpdateReplicaRequest();
|
||||
updateReplicaRequest.readFrom(in);
|
||||
return updateReplicaRequest;
|
||||
documentRequest = updateRequest;
|
||||
} else {
|
||||
throw new IllegalStateException("invalid request type [" + type+ " ]");
|
||||
}
|
||||
return documentRequest;
|
||||
}
|
||||
|
||||
/** write a document write (index/delete/update) request*/
|
||||
public static void writeDocumentRequest(StreamOutput out, DocumentRequest request) throws IOException {
|
||||
static void writeDocumentRequest(StreamOutput out, DocumentRequest request) throws IOException {
|
||||
if (request instanceof IndexRequest) {
|
||||
out.writeByte((byte) 0);
|
||||
((IndexRequest) request).writeTo(out);
|
||||
} else if (request instanceof DeleteRequest) {
|
||||
out.writeByte((byte) 1);
|
||||
((DeleteRequest) request).writeTo(out);
|
||||
} else if (request instanceof UpdateRequest) {
|
||||
out.writeByte((byte) 2);
|
||||
} else if (request instanceof UpdateReplicaRequest) {
|
||||
out.writeByte((byte) 3);
|
||||
((UpdateRequest) request).writeTo(out);
|
||||
} else {
|
||||
throw new IllegalStateException("invalid request [" + request.getClass().getSimpleName() + " ]");
|
||||
}
|
||||
request.writeTo(out);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,6 +20,9 @@
|
|||
package org.elasticsearch.action.bulk;
|
||||
|
||||
import org.elasticsearch.action.DocumentRequest;
|
||||
import org.elasticsearch.action.delete.DeleteRequest;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.update.UpdateRequest;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Streamable;
|
||||
|
|
|
@ -528,11 +528,11 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
|
|||
}
|
||||
for (DocumentRequest<?> request : requests) {
|
||||
// We first check if refresh has been set
|
||||
if (request.getRefreshPolicy() != RefreshPolicy.NONE) {
|
||||
if (((WriteRequest<?>) request).getRefreshPolicy() != RefreshPolicy.NONE) {
|
||||
validationException = addValidationError(
|
||||
"RefreshPolicy is not supported on an item request. Set it on the BulkRequest instead.", validationException);
|
||||
}
|
||||
ActionRequestValidationException ex = request.validate();
|
||||
ActionRequestValidationException ex = ((WriteRequest<?>) request).validate();
|
||||
if (ex != null) {
|
||||
if (validationException == null) {
|
||||
validationException = new ActionRequestValidationException();
|
||||
|
|
|
@ -27,12 +27,14 @@ import org.elasticsearch.action.RoutingMissingException;
|
|||
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
|
||||
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
|
||||
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
|
||||
import org.elasticsearch.action.delete.DeleteRequest;
|
||||
import org.elasticsearch.action.delete.TransportDeleteAction;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.AutoCreateIndex;
|
||||
import org.elasticsearch.action.support.HandledTransportAction;
|
||||
import org.elasticsearch.action.support.replication.TransportWriteAction;
|
||||
import org.elasticsearch.action.update.TransportUpdateAction;
|
||||
import org.elasticsearch.action.update.UpdateRequest;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
|
@ -209,7 +211,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
|||
final ConcreteIndices concreteIndices = new ConcreteIndices(clusterState, indexNameExpressionResolver);
|
||||
MetaData metaData = clusterState.metaData();
|
||||
for (int i = 0; i < bulkRequest.requests.size(); i++) {
|
||||
DocumentRequest documentRequest = bulkRequest.requests.get(i);
|
||||
DocumentRequest<?> documentRequest = bulkRequest.requests.get(i);
|
||||
//the request can only be null because we set it to null in the previous step, so it gets ignored
|
||||
if (documentRequest == null) {
|
||||
continue;
|
||||
|
@ -232,8 +234,10 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
|||
indexRequest.process(mappingMd, allowIdGeneration, concreteIndex.getName());
|
||||
break;
|
||||
case UPDATE:
|
||||
TransportUpdateAction.resolveAndValidateRouting(metaData, concreteIndex.getName(), (UpdateRequest)documentRequest);
|
||||
break;
|
||||
case DELETE:
|
||||
TransportWriteAction.resolveAndValidateRouting(metaData, concreteIndex.getName(), documentRequest);
|
||||
TransportDeleteAction.resolveAndValidateRouting(metaData, concreteIndex.getName(), (DeleteRequest)documentRequest);
|
||||
break;
|
||||
default: throw new AssertionError("request type not supported: [" + documentRequest.opType() + "]");
|
||||
}
|
||||
|
|
|
@ -23,12 +23,8 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
|
|||
import org.apache.logging.log4j.util.Supplier;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.DocWriteResponse;
|
||||
import org.elasticsearch.action.DocumentRequest;
|
||||
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
|
||||
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
|
||||
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
|
||||
import org.elasticsearch.action.delete.DeleteRequest;
|
||||
import org.elasticsearch.action.delete.DeleteResponse;
|
||||
import org.elasticsearch.action.delete.TransportDeleteAction;
|
||||
|
@ -36,16 +32,11 @@ import org.elasticsearch.action.index.IndexRequest;
|
|||
import org.elasticsearch.action.index.IndexResponse;
|
||||
import org.elasticsearch.action.index.TransportIndexAction;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.AutoCreateIndex;
|
||||
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
|
||||
import org.elasticsearch.action.support.replication.TransportWriteAction;
|
||||
import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo;
|
||||
import org.elasticsearch.action.update.TransportUpdateAction;
|
||||
import org.elasticsearch.action.update.UpdateHelper;
|
||||
import org.elasticsearch.action.update.UpdateReplicaRequest;
|
||||
import org.elasticsearch.action.update.UpdateRequest;
|
||||
import org.elasticsearch.action.update.UpdateResponse;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
|
||||
import org.elasticsearch.cluster.action.shard.ShardStateAction;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
|
@ -66,11 +57,8 @@ import org.elasticsearch.index.shard.IndexShard;
|
|||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.translog.Translog;
|
||||
import org.elasticsearch.index.translog.Translog.Location;
|
||||
import org.elasticsearch.indices.IndexAlreadyExistsException;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.script.ScriptService;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportRequestOptions;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
@ -81,29 +69,24 @@ import static org.elasticsearch.action.support.replication.ReplicationOperation.
|
|||
import static org.elasticsearch.action.support.replication.ReplicationOperation.isConflictException;
|
||||
|
||||
/** Performs shard-level bulk (index, delete or update) operations */
|
||||
public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequest, BulkShardRequest, BulkShardResponse> {
|
||||
public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequest, BulkShardResponse> {
|
||||
|
||||
public static final String ACTION_NAME = BulkAction.NAME + "[s]";
|
||||
|
||||
private final UpdateHelper updateHelper;
|
||||
private final boolean allowIdGeneration;
|
||||
private final MappingUpdatedAction mappingUpdatedAction;
|
||||
private final UpdateHelper updateHelper;
|
||||
private final AutoCreateIndex autoCreateIndex;
|
||||
private final TransportCreateIndexAction createIndexAction;
|
||||
|
||||
@Inject
|
||||
public TransportShardBulkAction(Settings settings, TransportService transportService, ClusterService clusterService,
|
||||
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
|
||||
MappingUpdatedAction mappingUpdatedAction, ActionFilters actionFilters,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver, ScriptService scriptService,
|
||||
AutoCreateIndex autoCreateIndex, TransportCreateIndexAction createIndexAction) {
|
||||
MappingUpdatedAction mappingUpdatedAction, UpdateHelper updateHelper, ActionFilters actionFilters,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver) {
|
||||
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
|
||||
indexNameExpressionResolver, BulkShardRequest::new, BulkShardRequest::new, ThreadPool.Names.BULK);
|
||||
indexNameExpressionResolver, BulkShardRequest::new, ThreadPool.Names.BULK);
|
||||
this.updateHelper = updateHelper;
|
||||
this.allowIdGeneration = settings.getAsBoolean("action.allow_id_generation", true);
|
||||
this.mappingUpdatedAction = mappingUpdatedAction;
|
||||
this.updateHelper = new UpdateHelper(scriptService, logger);
|
||||
this.autoCreateIndex = autoCreateIndex;
|
||||
this.createIndexAction = createIndexAction;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -122,39 +105,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void doExecute(Task task, BulkShardRequest request, ActionListener<BulkShardResponse> listener) {
|
||||
ClusterState state = clusterService.state();
|
||||
if (autoCreateIndex.shouldAutoCreate(request.index(), state)) {
|
||||
CreateIndexRequest createIndexRequest = new CreateIndexRequest();
|
||||
createIndexRequest.index(request.index());
|
||||
createIndexRequest.cause("auto(bulk api)");
|
||||
createIndexRequest.masterNodeTimeout(request.timeout());
|
||||
createIndexAction.execute(task, createIndexRequest, new ActionListener<CreateIndexResponse>() {
|
||||
@Override
|
||||
public void onResponse(CreateIndexResponse result) {
|
||||
innerExecute(task, request, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
if (ExceptionsHelper.unwrapCause(e) instanceof IndexAlreadyExistsException) {
|
||||
// we have the index, do it
|
||||
innerExecute(task, request, listener);
|
||||
} else {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
}
|
||||
});
|
||||
} else {
|
||||
innerExecute(task, request, listener);
|
||||
}
|
||||
}
|
||||
|
||||
private void innerExecute(Task task, final BulkShardRequest request, final ActionListener<BulkShardResponse> listener) {
|
||||
super.doExecute(task, request, listener);
|
||||
}
|
||||
@Override
|
||||
protected WriteResult<BulkShardRequest, BulkShardResponse> onPrimaryShard(BulkShardRequest request, IndexShard indexShard) throws Exception {
|
||||
protected WriteResult<BulkShardResponse> onPrimaryShard(BulkShardRequest request, IndexShard indexShard) throws Exception {
|
||||
ShardId shardId = request.shardId();
|
||||
final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
|
||||
final IndexMetaData metaData = indexService.getIndexSettings().getIndexMetaData();
|
||||
|
@ -172,7 +123,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
|||
responses[i] = items[i].getPrimaryResponse();
|
||||
}
|
||||
BulkShardResponse response = new BulkShardResponse(request.shardId(), responses);
|
||||
return new WriteResult<>(request, response, location);
|
||||
return new WriteResult<>(response, location);
|
||||
}
|
||||
|
||||
/** Executes bulk item requests and handles request execution exceptions */
|
||||
|
@ -180,39 +131,22 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
|||
BulkShardRequest request,
|
||||
long[] preVersions, VersionType[] preVersionTypes,
|
||||
Translog.Location location, int requestIndex) {
|
||||
DocumentRequest<?> itemRequest = request.items()[requestIndex].request();
|
||||
preVersions[requestIndex] = itemRequest.version();
|
||||
preVersionTypes[requestIndex] = itemRequest.versionType();
|
||||
DocumentRequest.OpType opType = itemRequest.opType();
|
||||
preVersions[requestIndex] = request.items()[requestIndex].request().version();
|
||||
preVersionTypes[requestIndex] = request.items()[requestIndex].request().versionType();
|
||||
DocumentRequest.OpType opType = request.items()[requestIndex].request().opType();
|
||||
try {
|
||||
final WriteResult<? extends ReplicatedWriteRequest, ? extends DocWriteResponse> writeResult;
|
||||
switch (itemRequest.opType()) {
|
||||
case CREATE:
|
||||
case INDEX:
|
||||
writeResult = TransportIndexAction.executeIndexRequestOnPrimary(((IndexRequest) itemRequest), indexShard,
|
||||
mappingUpdatedAction);
|
||||
break;
|
||||
case UPDATE:
|
||||
writeResult = TransportUpdateAction.executeUpdateRequestOnPrimary(((UpdateRequest) itemRequest), indexShard,
|
||||
metaData, updateHelper, mappingUpdatedAction, allowIdGeneration);
|
||||
break;
|
||||
case DELETE:
|
||||
writeResult = TransportDeleteAction.executeDeleteRequestOnPrimary(((DeleteRequest) itemRequest), indexShard);
|
||||
break;
|
||||
default:
|
||||
throw new IllegalStateException("unexpected opType [" + itemRequest.opType() + "] found");
|
||||
}
|
||||
WriteResult<? extends DocWriteResponse> writeResult = innerExecuteBulkItemRequest(metaData, indexShard,
|
||||
request, requestIndex);
|
||||
if (writeResult.getLocation() != null) {
|
||||
location = locationToSync(location, writeResult.getLocation());
|
||||
} else {
|
||||
assert writeResult.getResponse().getResult() == DocWriteResponse.Result.NOOP
|
||||
: "only noop operation can have null next operation";
|
||||
}
|
||||
// update the bulk item request with replica request (update request are changed to index or delete requests for replication)
|
||||
request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(),
|
||||
(DocumentRequest<?>) writeResult.getReplicaRequest());
|
||||
// update the bulk item request because update request execution can mutate the bulk item request
|
||||
BulkItemRequest item = request.items()[requestIndex];
|
||||
// add the response
|
||||
setResponse(request.items()[requestIndex], new BulkItemResponse(request.items()[requestIndex].id(), opType, writeResult.getResponse()));
|
||||
setResponse(item, new BulkItemResponse(item.id(), opType, writeResult.getResponse()));
|
||||
} catch (Exception e) {
|
||||
// rethrow the failure if we are going to retry on primary and let parent failure to handle it
|
||||
if (retryPrimaryException(e)) {
|
||||
|
@ -248,6 +182,33 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
|||
return location;
|
||||
}
|
||||
|
||||
private WriteResult<? extends DocWriteResponse> innerExecuteBulkItemRequest(IndexMetaData metaData, IndexShard indexShard,
|
||||
BulkShardRequest request, int requestIndex) throws Exception {
|
||||
DocumentRequest<?> itemRequest = request.items()[requestIndex].request();
|
||||
switch (itemRequest.opType()) {
|
||||
case CREATE:
|
||||
case INDEX:
|
||||
return TransportIndexAction.executeIndexRequestOnPrimary(((IndexRequest) itemRequest), indexShard, mappingUpdatedAction);
|
||||
case UPDATE:
|
||||
int maxAttempts = ((UpdateRequest) itemRequest).retryOnConflict();
|
||||
for (int attemptCount = 0; attemptCount <= maxAttempts; attemptCount++) {
|
||||
try {
|
||||
return shardUpdateOperation(metaData, indexShard, request, requestIndex, ((UpdateRequest) itemRequest));
|
||||
} catch (Exception e) {
|
||||
final Throwable cause = ExceptionsHelper.unwrapCause(e);
|
||||
if (attemptCount == maxAttempts // bubble up exception when we run out of attempts
|
||||
|| (cause instanceof VersionConflictEngineException) == false) { // or when exception is not a version conflict
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
throw new IllegalStateException("version conflict exception should bubble up on last attempt");
|
||||
case DELETE:
|
||||
return TransportDeleteAction.executeDeleteRequestOnPrimary(((DeleteRequest) itemRequest), indexShard);
|
||||
default: throw new IllegalStateException("unexpected opType [" + itemRequest.opType() + "] found");
|
||||
}
|
||||
}
|
||||
|
||||
private void setResponse(BulkItemRequest request, BulkItemResponse response) {
|
||||
request.setPrimaryResponse(response);
|
||||
if (response.isFailed()) {
|
||||
|
@ -258,6 +219,51 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes update request, doing a get and translating update to a index or delete operation
|
||||
* NOTE: all operations except NOOP, reassigns the bulk item request
|
||||
*/
|
||||
private WriteResult<? extends DocWriteResponse> shardUpdateOperation(IndexMetaData metaData, IndexShard indexShard,
|
||||
BulkShardRequest request,
|
||||
int requestIndex, UpdateRequest updateRequest)
|
||||
throws Exception {
|
||||
// Todo: capture read version conflicts, missing documents and malformed script errors in the write result due to get request
|
||||
UpdateHelper.Result translate = updateHelper.prepare(updateRequest, indexShard);
|
||||
switch (translate.getResponseResult()) {
|
||||
case CREATED:
|
||||
case UPDATED:
|
||||
IndexRequest indexRequest = translate.action();
|
||||
MappingMetaData mappingMd = metaData.mappingOrDefault(indexRequest.type());
|
||||
indexRequest.process(mappingMd, allowIdGeneration, request.index());
|
||||
WriteResult<IndexResponse> writeResult = TransportIndexAction.executeIndexRequestOnPrimary(indexRequest, indexShard, mappingUpdatedAction);
|
||||
BytesReference indexSourceAsBytes = indexRequest.source();
|
||||
IndexResponse indexResponse = writeResult.getResponse();
|
||||
UpdateResponse writeUpdateResponse = new UpdateResponse(indexResponse.getShardInfo(), indexResponse.getShardId(), indexResponse.getType(), indexResponse.getId(), indexResponse.getVersion(), indexResponse.getResult());
|
||||
if (updateRequest.fields() != null && updateRequest.fields().length > 0) {
|
||||
Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(indexSourceAsBytes, true);
|
||||
writeUpdateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, request.index(), indexResponse.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceAsBytes));
|
||||
}
|
||||
// Replace the update request to the translated index request to execute on the replica.
|
||||
request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), indexRequest);
|
||||
return new WriteResult<>(writeUpdateResponse, writeResult.getLocation());
|
||||
case DELETED:
|
||||
DeleteRequest deleteRequest = translate.action();
|
||||
WriteResult<DeleteResponse> deleteResult = TransportDeleteAction.executeDeleteRequestOnPrimary(deleteRequest, indexShard);
|
||||
DeleteResponse response = deleteResult.getResponse();
|
||||
UpdateResponse deleteUpdateResponse = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getVersion(), response.getResult());
|
||||
deleteUpdateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, request.index(), response.getVersion(), translate.updatedSourceAsMap(), translate.updateSourceContentType(), null));
|
||||
// Replace the update request to the translated delete request to execute on the replica.
|
||||
request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), deleteRequest);
|
||||
return new WriteResult<>(deleteUpdateResponse, deleteResult.getLocation());
|
||||
case NOOP:
|
||||
BulkItemRequest item = request.items()[requestIndex];
|
||||
indexShard.noopUpdate(updateRequest.type());
|
||||
item.setIgnoreOnReplica(); // no need to go to the replica
|
||||
return new WriteResult<>(translate.action(), null);
|
||||
default: throw new IllegalStateException("Illegal update operation " + translate.getResponseResult());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Location onReplicaShard(BulkShardRequest request, IndexShard indexShard) {
|
||||
Translog.Location location = null;
|
||||
|
@ -266,8 +272,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
|||
if (item == null || item.isIgnoreOnReplica()) {
|
||||
continue;
|
||||
}
|
||||
DocumentRequest documentRequest = (item.request() instanceof UpdateReplicaRequest)
|
||||
? ((UpdateReplicaRequest) item.request()).getRequest() : item.request();
|
||||
DocumentRequest<?> documentRequest = item.request();
|
||||
final Engine.Operation operation;
|
||||
try {
|
||||
switch (documentRequest.opType()) {
|
||||
|
|
|
@ -43,7 +43,7 @@ import static org.elasticsearch.action.ValidateActions.addValidationError;
|
|||
* @see org.elasticsearch.client.Client#delete(DeleteRequest)
|
||||
* @see org.elasticsearch.client.Requests#deleteRequest(String)
|
||||
*/
|
||||
public class DeleteRequest extends DocumentRequest<DeleteRequest> {
|
||||
public class DeleteRequest extends ReplicatedWriteRequest<DeleteRequest> implements DocumentRequest<DeleteRequest> {
|
||||
|
||||
private String type;
|
||||
private String id;
|
||||
|
|
|
@ -21,7 +21,6 @@ package org.elasticsearch.action.delete;
|
|||
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.DocumentRequest;
|
||||
import org.elasticsearch.action.RoutingMissingException;
|
||||
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
|
||||
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
|
||||
|
@ -50,7 +49,7 @@ import org.elasticsearch.transport.TransportService;
|
|||
/**
|
||||
* Performs the delete operation.
|
||||
*/
|
||||
public class TransportDeleteAction extends TransportWriteAction<DeleteRequest, DeleteRequest, DeleteResponse> {
|
||||
public class TransportDeleteAction extends TransportWriteAction<DeleteRequest, DeleteResponse> {
|
||||
|
||||
private final AutoCreateIndex autoCreateIndex;
|
||||
private final TransportCreateIndexAction createIndexAction;
|
||||
|
@ -62,7 +61,7 @@ public class TransportDeleteAction extends TransportWriteAction<DeleteRequest, D
|
|||
IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
AutoCreateIndex autoCreateIndex) {
|
||||
super(settings, DeleteAction.NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
|
||||
indexNameExpressionResolver, DeleteRequest::new, DeleteRequest::new, ThreadPool.Names.INDEX);
|
||||
indexNameExpressionResolver, DeleteRequest::new, ThreadPool.Names.INDEX);
|
||||
this.createIndexAction = createIndexAction;
|
||||
this.autoCreateIndex = autoCreateIndex;
|
||||
}
|
||||
|
@ -71,11 +70,7 @@ public class TransportDeleteAction extends TransportWriteAction<DeleteRequest, D
|
|||
protected void doExecute(Task task, final DeleteRequest request, final ActionListener<DeleteResponse> listener) {
|
||||
ClusterState state = clusterService.state();
|
||||
if (autoCreateIndex.shouldAutoCreate(request.index(), state)) {
|
||||
CreateIndexRequest createIndexRequest = new CreateIndexRequest();
|
||||
createIndexRequest.index(request.index());
|
||||
createIndexRequest.cause("auto(delete api)");
|
||||
createIndexRequest.masterNodeTimeout(request.timeout());
|
||||
createIndexAction.execute(task, createIndexRequest, new ActionListener<CreateIndexResponse>() {
|
||||
createIndexAction.execute(task, new CreateIndexRequest().index(request.index()).cause("auto(delete api)").masterNodeTimeout(request.timeout()), new ActionListener<CreateIndexResponse>() {
|
||||
@Override
|
||||
public void onResponse(CreateIndexResponse result) {
|
||||
innerExecute(task, request, listener);
|
||||
|
@ -105,6 +100,15 @@ public class TransportDeleteAction extends TransportWriteAction<DeleteRequest, D
|
|||
request.setShardId(shardId);
|
||||
}
|
||||
|
||||
public static void resolveAndValidateRouting(final MetaData metaData, final String concreteIndex,
|
||||
DeleteRequest request) {
|
||||
request.routing(metaData.resolveIndexRouting(request.parent(), request.routing(), request.index()));
|
||||
// check if routing is required, if so, throw error if routing wasn't specified
|
||||
if (request.routing() == null && metaData.routingRequired(concreteIndex, request.type())) {
|
||||
throw new RoutingMissingException(concreteIndex, request.type(), request.id());
|
||||
}
|
||||
}
|
||||
|
||||
private void innerExecute(Task task, final DeleteRequest request, final ActionListener<DeleteResponse> listener) {
|
||||
super.doExecute(task, request, listener);
|
||||
}
|
||||
|
@ -115,7 +119,7 @@ public class TransportDeleteAction extends TransportWriteAction<DeleteRequest, D
|
|||
}
|
||||
|
||||
@Override
|
||||
protected WriteResult<DeleteRequest, DeleteResponse> onPrimaryShard(DeleteRequest request, IndexShard indexShard) {
|
||||
protected WriteResult<DeleteResponse> onPrimaryShard(DeleteRequest request, IndexShard indexShard) {
|
||||
return executeDeleteRequestOnPrimary(request, indexShard);
|
||||
}
|
||||
|
||||
|
@ -124,7 +128,7 @@ public class TransportDeleteAction extends TransportWriteAction<DeleteRequest, D
|
|||
return executeDeleteRequestOnReplica(request, indexShard).getTranslogLocation();
|
||||
}
|
||||
|
||||
public static WriteResult<DeleteRequest, DeleteResponse> executeDeleteRequestOnPrimary(DeleteRequest request, IndexShard indexShard) {
|
||||
public static WriteResult<DeleteResponse> executeDeleteRequestOnPrimary(DeleteRequest request, IndexShard indexShard) {
|
||||
Engine.Delete delete = indexShard.prepareDeleteOnPrimary(request.type(), request.id(), request.version(), request.versionType());
|
||||
indexShard.delete(delete);
|
||||
// update the request with the version so it will go to the replicas
|
||||
|
@ -133,7 +137,7 @@ public class TransportDeleteAction extends TransportWriteAction<DeleteRequest, D
|
|||
|
||||
assert request.versionType().validateVersionForWrites(request.version());
|
||||
DeleteResponse response = new DeleteResponse(indexShard.shardId(), request.type(), request.id(), delete.version(), delete.found());
|
||||
return new WriteResult<>(request, response, delete.getTranslogLocation());
|
||||
return new WriteResult<>(response, delete.getTranslogLocation());
|
||||
}
|
||||
|
||||
public static Engine.Delete executeDeleteRequestOnReplica(DeleteRequest request, IndexShard indexShard) {
|
||||
|
|
|
@ -67,7 +67,7 @@ import static org.elasticsearch.action.ValidateActions.addValidationError;
|
|||
* @see org.elasticsearch.client.Requests#indexRequest(String)
|
||||
* @see org.elasticsearch.client.Client#index(IndexRequest)
|
||||
*/
|
||||
public class IndexRequest extends DocumentRequest<IndexRequest> {
|
||||
public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implements DocumentRequest<IndexRequest> {
|
||||
|
||||
private String type;
|
||||
private String id;
|
||||
|
|
|
@ -60,7 +60,7 @@ import org.elasticsearch.transport.TransportService;
|
|||
* <li><b>allowIdGeneration</b>: If the id is set not, should it be generated. Defaults to <tt>true</tt>.
|
||||
* </ul>
|
||||
*/
|
||||
public class TransportIndexAction extends TransportWriteAction<IndexRequest, IndexRequest, IndexResponse> {
|
||||
public class TransportIndexAction extends TransportWriteAction<IndexRequest, IndexResponse> {
|
||||
|
||||
private final AutoCreateIndex autoCreateIndex;
|
||||
private final boolean allowIdGeneration;
|
||||
|
@ -76,7 +76,7 @@ public class TransportIndexAction extends TransportWriteAction<IndexRequest, Ind
|
|||
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
AutoCreateIndex autoCreateIndex) {
|
||||
super(settings, IndexAction.NAME, transportService, clusterService, indicesService, threadPool, shardStateAction,
|
||||
actionFilters, indexNameExpressionResolver, IndexRequest::new, IndexRequest::new, ThreadPool.Names.INDEX);
|
||||
actionFilters, indexNameExpressionResolver, IndexRequest::new, ThreadPool.Names.INDEX);
|
||||
this.mappingUpdatedAction = mappingUpdatedAction;
|
||||
this.createIndexAction = createIndexAction;
|
||||
this.autoCreateIndex = autoCreateIndex;
|
||||
|
@ -122,7 +122,7 @@ public class TransportIndexAction extends TransportWriteAction<IndexRequest, Ind
|
|||
@Override
|
||||
protected void resolveRequest(MetaData metaData, IndexMetaData indexMetaData, IndexRequest request) {
|
||||
super.resolveRequest(metaData, indexMetaData, request);
|
||||
MappingMetaData mappingMd = indexMetaData.mappingOrDefault(request.type());
|
||||
MappingMetaData mappingMd =indexMetaData.mappingOrDefault(request.type());
|
||||
request.resolveRouting(metaData);
|
||||
request.process(mappingMd, allowIdGeneration, indexMetaData.getIndex().getName());
|
||||
ShardId shardId = clusterService.operationRouting().shardId(clusterService.state(),
|
||||
|
@ -140,7 +140,7 @@ public class TransportIndexAction extends TransportWriteAction<IndexRequest, Ind
|
|||
}
|
||||
|
||||
@Override
|
||||
protected WriteResult<IndexRequest, IndexResponse> onPrimaryShard(IndexRequest request, IndexShard indexShard) throws Exception {
|
||||
protected WriteResult<IndexResponse> onPrimaryShard(IndexRequest request, IndexShard indexShard) throws Exception {
|
||||
return executeIndexRequestOnPrimary(request, indexShard, mappingUpdatedAction);
|
||||
}
|
||||
|
||||
|
@ -174,7 +174,7 @@ public class TransportIndexAction extends TransportWriteAction<IndexRequest, Ind
|
|||
return indexShard.prepareIndexOnPrimary(sourceToParse, request.version(), request.versionType(), request.getAutoGeneratedTimestamp(), request.isRetry());
|
||||
}
|
||||
|
||||
public static WriteResult<IndexRequest, IndexResponse> executeIndexRequestOnPrimary(IndexRequest request, IndexShard indexShard,
|
||||
public static WriteResult<IndexResponse> executeIndexRequestOnPrimary(IndexRequest request, IndexShard indexShard,
|
||||
MappingUpdatedAction mappingUpdatedAction) throws Exception {
|
||||
Engine.Index operation = prepareIndexOperationOnPrimary(request, indexShard);
|
||||
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
|
||||
|
@ -198,7 +198,7 @@ public class TransportIndexAction extends TransportWriteAction<IndexRequest, Ind
|
|||
assert request.versionType().validateVersionForWrites(request.version());
|
||||
|
||||
IndexResponse response = new IndexResponse(shardId, request.type(), request.id(), request.version(), operation.isCreated());
|
||||
return new WriteResult<>(request, response, operation.getTranslogLocation());
|
||||
return new WriteResult<>(response, operation.getTranslogLocation());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -32,7 +32,6 @@ import org.elasticsearch.cluster.routing.AllocationId;
|
|||
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.util.set.Sets;
|
||||
import org.elasticsearch.index.engine.VersionConflictEngineException;
|
||||
|
@ -113,24 +112,22 @@ public class ReplicationOperation<
|
|||
pendingActions.incrementAndGet();
|
||||
primaryResult = primary.perform(request);
|
||||
final ReplicaRequest replicaRequest = primaryResult.replicaRequest();
|
||||
if (replicaRequest != null) {
|
||||
assert replicaRequest.primaryTerm() > 0 : "replicaRequest doesn't have a primary term";
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("[{}] op [{}] completed on primary for request [{}]", primaryId, opType, request);
|
||||
}
|
||||
|
||||
// we have to get a new state after successfully indexing into the primary in order to honour recovery semantics.
|
||||
// we have to make sure that every operation indexed into the primary after recovery start will also be replicated
|
||||
// to the recovery target. If we use an old cluster state, we may miss a relocation that has started since then.
|
||||
ClusterState clusterState = clusterStateSupplier.get();
|
||||
final List<ShardRouting> shards = getShards(primaryId, clusterState);
|
||||
Set<String> inSyncAllocationIds = getInSyncAllocationIds(primaryId, clusterState);
|
||||
|
||||
markUnavailableShardsAsStale(replicaRequest, inSyncAllocationIds, shards);
|
||||
|
||||
performOnReplicas(replicaRequest, shards);
|
||||
assert replicaRequest.primaryTerm() > 0 : "replicaRequest doesn't have a primary term";
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("[{}] op [{}] completed on primary for request [{}]", primaryId, opType, request);
|
||||
}
|
||||
|
||||
// we have to get a new state after successfully indexing into the primary in order to honour recovery semantics.
|
||||
// we have to make sure that every operation indexed into the primary after recovery start will also be replicated
|
||||
// to the recovery target. If we use an old cluster state, we may miss a relocation that has started since then.
|
||||
ClusterState clusterState = clusterStateSupplier.get();
|
||||
final List<ShardRouting> shards = getShards(primaryId, clusterState);
|
||||
Set<String> inSyncAllocationIds = getInSyncAllocationIds(primaryId, clusterState);
|
||||
|
||||
markUnavailableShardsAsStale(replicaRequest, inSyncAllocationIds, shards);
|
||||
|
||||
performOnReplicas(replicaRequest, shards);
|
||||
|
||||
successfulShards.incrementAndGet();
|
||||
decPendingAndFinishIfNeeded();
|
||||
}
|
||||
|
@ -422,10 +419,7 @@ public class ReplicationOperation<
|
|||
|
||||
public interface PrimaryResult<R extends ReplicationRequest<R>> {
|
||||
|
||||
/**
|
||||
* @return null if no operation needs to be sent to a replica
|
||||
*/
|
||||
@Nullable R replicaRequest();
|
||||
R replicaRequest();
|
||||
|
||||
void setShardInfo(ReplicationResponse.ShardInfo shardInfo);
|
||||
}
|
||||
|
|
|
@ -23,8 +23,6 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
|
|||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionListenerResponseHandler;
|
||||
import org.elasticsearch.action.DocumentRequest;
|
||||
import org.elasticsearch.action.RoutingMissingException;
|
||||
import org.elasticsearch.action.UnavailableShardsException;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.ActiveShardCount;
|
||||
|
@ -165,16 +163,6 @@ public abstract class TransportReplicationAction<
|
|||
}
|
||||
}
|
||||
|
||||
/** helper to verify and resolve request routing */
|
||||
public static void resolveAndValidateRouting(final MetaData metaData, final String concreteIndex,
|
||||
DocumentRequest request) {
|
||||
request.routing(metaData.resolveIndexRouting(request.parent(), request.routing(), request.index()));
|
||||
// check if routing is required, if so, throw error if routing wasn't specified
|
||||
if (request.routing() == null && metaData.routingRequired(concreteIndex, request.type())) {
|
||||
throw new RoutingMissingException(concreteIndex, request.type(), request.id());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Primary operation on node with primary copy.
|
||||
*
|
||||
|
@ -912,9 +900,7 @@ public abstract class TransportReplicationAction<
|
|||
@Override
|
||||
public PrimaryResult perform(Request request) throws Exception {
|
||||
PrimaryResult result = shardOperationOnPrimary(request);
|
||||
if (result.replicaRequest() != null) {
|
||||
result.replicaRequest().primaryTerm(indexShard.getPrimaryTerm());
|
||||
}
|
||||
result.replicaRequest().primaryTerm(indexShard.getPrimaryTerm());
|
||||
return result;
|
||||
}
|
||||
|
||||
|
|
|
@ -49,40 +49,38 @@ import java.util.function.Supplier;
|
|||
*/
|
||||
public abstract class TransportWriteAction<
|
||||
Request extends ReplicatedWriteRequest<Request>,
|
||||
ReplicaRequest extends ReplicatedWriteRequest<ReplicaRequest>,
|
||||
Response extends ReplicationResponse & WriteResponse
|
||||
> extends TransportReplicationAction<Request, ReplicaRequest, Response> {
|
||||
> extends TransportReplicationAction<Request, Request, Response> {
|
||||
|
||||
protected TransportWriteAction(Settings settings, String actionName, TransportService transportService,
|
||||
ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
|
||||
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request,
|
||||
Supplier<ReplicaRequest> replicaRequest,
|
||||
String executor) {
|
||||
super(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
|
||||
indexNameExpressionResolver, request, replicaRequest, executor);
|
||||
indexNameExpressionResolver, request, request, executor);
|
||||
}
|
||||
|
||||
/**
|
||||
* Called on the primary with a reference to the {@linkplain IndexShard} to modify.
|
||||
*/
|
||||
protected abstract WriteResult<ReplicaRequest, Response> onPrimaryShard(Request request, IndexShard indexShard) throws Exception;
|
||||
protected abstract WriteResult<Response> onPrimaryShard(Request request, IndexShard indexShard) throws Exception;
|
||||
|
||||
/**
|
||||
* Called once per replica with a reference to the {@linkplain IndexShard} to modify.
|
||||
*
|
||||
* @return the translog location of the {@linkplain IndexShard} after the write was completed or null if no write occurred
|
||||
*/
|
||||
protected abstract Translog.Location onReplicaShard(ReplicaRequest request, IndexShard indexShard);
|
||||
protected abstract Translog.Location onReplicaShard(Request request, IndexShard indexShard);
|
||||
|
||||
@Override
|
||||
protected final WritePrimaryResult shardOperationOnPrimary(Request request) throws Exception {
|
||||
IndexShard indexShard = indexShard(request);
|
||||
WriteResult<ReplicaRequest, Response> result = onPrimaryShard(request, indexShard);
|
||||
return new WritePrimaryResult(request, result, indexShard);
|
||||
WriteResult<Response> result = onPrimaryShard(request, indexShard);
|
||||
return new WritePrimaryResult(request, result.getResponse(), result.getLocation(), indexShard);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected final WriteReplicaResult shardOperationOnReplica(ReplicaRequest request) {
|
||||
protected final WriteReplicaResult shardOperationOnReplica(Request request) {
|
||||
IndexShard indexShard = indexShard(request);
|
||||
Translog.Location location = onReplicaShard(request, indexShard);
|
||||
return new WriteReplicaResult(indexShard, request, location);
|
||||
|
@ -91,7 +89,7 @@ public abstract class TransportWriteAction<
|
|||
/**
|
||||
* Fetch the IndexShard for the request. Protected so it can be mocked in tests.
|
||||
*/
|
||||
protected IndexShard indexShard(ReplicatedWriteRequest request) {
|
||||
protected IndexShard indexShard(Request request) {
|
||||
final ShardId shardId = request.shardId();
|
||||
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
|
||||
return indexService.getShard(shardId.id());
|
||||
|
@ -100,13 +98,11 @@ public abstract class TransportWriteAction<
|
|||
/**
|
||||
* Simple result from a write action. Write actions have static method to return these so they can integrate with bulk.
|
||||
*/
|
||||
public static class WriteResult<ReplicaRequest extends ReplicatedWriteRequest<ReplicaRequest>, Response extends ReplicationResponse> {
|
||||
private final ReplicaRequest replicaRequest;
|
||||
public static class WriteResult<Response extends ReplicationResponse> {
|
||||
private final Response response;
|
||||
private final Translog.Location location;
|
||||
|
||||
public WriteResult(ReplicaRequest replicaRequest, Response response, @Nullable Location location) {
|
||||
this.replicaRequest = replicaRequest;
|
||||
public WriteResult(Response response, @Nullable Location location) {
|
||||
this.response = response;
|
||||
this.location = location;
|
||||
}
|
||||
|
@ -118,10 +114,6 @@ public abstract class TransportWriteAction<
|
|||
public Translog.Location getLocation() {
|
||||
return location;
|
||||
}
|
||||
|
||||
public ReplicaRequest getReplicaRequest() {
|
||||
return replicaRequest;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -131,15 +123,15 @@ public abstract class TransportWriteAction<
|
|||
boolean finishedAsyncActions;
|
||||
ActionListener<Response> listener = null;
|
||||
|
||||
public WritePrimaryResult(Request request,
|
||||
WriteResult<ReplicaRequest, Response> result,
|
||||
public WritePrimaryResult(Request request, Response finalResponse,
|
||||
@Nullable Translog.Location location,
|
||||
IndexShard indexShard) {
|
||||
super(result.getReplicaRequest(), result.getResponse());
|
||||
super(request, finalResponse);
|
||||
/*
|
||||
* We call this before replication because this might wait for a refresh and that can take a while. This way we wait for the
|
||||
* refresh in parallel on the primary and on the replica.
|
||||
*/
|
||||
new AsyncAfterWriteAction(indexShard, request, result.getLocation(), this, logger).run();
|
||||
new AsyncAfterWriteAction(indexShard, request, location, this, logger).run();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -0,0 +1,138 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.support.single.instance;
|
||||
|
||||
import org.elasticsearch.action.ActionRequest;
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.IndicesRequest;
|
||||
import org.elasticsearch.action.ValidateActions;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public abstract class InstanceShardOperationRequest<Request extends InstanceShardOperationRequest<Request>> extends ActionRequest<Request>
|
||||
implements IndicesRequest {
|
||||
|
||||
public static final TimeValue DEFAULT_TIMEOUT = new TimeValue(1, TimeUnit.MINUTES);
|
||||
|
||||
protected TimeValue timeout = DEFAULT_TIMEOUT;
|
||||
|
||||
protected String index;
|
||||
// null means its not set, allows to explicitly direct a request to a specific shard
|
||||
protected ShardId shardId = null;
|
||||
|
||||
private String concreteIndex;
|
||||
|
||||
protected InstanceShardOperationRequest() {
|
||||
}
|
||||
|
||||
public InstanceShardOperationRequest(String index) {
|
||||
this.index = index;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActionRequestValidationException validate() {
|
||||
ActionRequestValidationException validationException = null;
|
||||
if (index == null) {
|
||||
validationException = ValidateActions.addValidationError("index is missing", validationException);
|
||||
}
|
||||
return validationException;
|
||||
}
|
||||
|
||||
public String index() {
|
||||
return index;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String[] indices() {
|
||||
return new String[]{index};
|
||||
}
|
||||
|
||||
@Override
|
||||
public IndicesOptions indicesOptions() {
|
||||
return IndicesOptions.strictSingleIndexNoExpandForbidClosed();
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public final Request index(String index) {
|
||||
this.index = index;
|
||||
return (Request) this;
|
||||
}
|
||||
|
||||
public TimeValue timeout() {
|
||||
return timeout;
|
||||
}
|
||||
|
||||
/**
|
||||
* A timeout to wait if the index operation can't be performed immediately. Defaults to <tt>1m</tt>.
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public final Request timeout(TimeValue timeout) {
|
||||
this.timeout = timeout;
|
||||
return (Request) this;
|
||||
}
|
||||
|
||||
/**
|
||||
* A timeout to wait if the index operation can't be performed immediately. Defaults to <tt>1m</tt>.
|
||||
*/
|
||||
public final Request timeout(String timeout) {
|
||||
return timeout(TimeValue.parseTimeValue(timeout, null, getClass().getSimpleName() + ".timeout"));
|
||||
}
|
||||
|
||||
public String concreteIndex() {
|
||||
return concreteIndex;
|
||||
}
|
||||
|
||||
void concreteIndex(String concreteIndex) {
|
||||
this.concreteIndex = concreteIndex;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
index = in.readString();
|
||||
if (in.readBoolean()) {
|
||||
shardId = ShardId.readShardId(in);
|
||||
} else {
|
||||
shardId = null;
|
||||
}
|
||||
timeout = new TimeValue(in);
|
||||
concreteIndex = in.readOptionalString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeString(index);
|
||||
out.writeOptionalStreamable(shardId);
|
||||
timeout.writeTo(out);
|
||||
out.writeOptionalString(concreteIndex);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,60 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.support.single.instance;
|
||||
|
||||
import org.elasticsearch.action.Action;
|
||||
import org.elasticsearch.action.ActionRequestBuilder;
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
|
||||
/**
|
||||
*/
|
||||
public abstract class InstanceShardOperationRequestBuilder<Request extends InstanceShardOperationRequest<Request>, Response extends ActionResponse, RequestBuilder extends InstanceShardOperationRequestBuilder<Request, Response, RequestBuilder>>
|
||||
extends ActionRequestBuilder<Request, Response, RequestBuilder> {
|
||||
|
||||
protected InstanceShardOperationRequestBuilder(ElasticsearchClient client, Action<Request, Response, RequestBuilder> action, Request request) {
|
||||
super(client, action, request);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public final RequestBuilder setIndex(String index) {
|
||||
request.index(index);
|
||||
return (RequestBuilder) this;
|
||||
}
|
||||
|
||||
/**
|
||||
* A timeout to wait if the index operation can't be performed immediately. Defaults to <tt>1m</tt>.
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public final RequestBuilder setTimeout(TimeValue timeout) {
|
||||
request.timeout(timeout);
|
||||
return (RequestBuilder) this;
|
||||
}
|
||||
|
||||
/**
|
||||
* A timeout to wait if the index operation can't be performed immediately. Defaults to <tt>1m</tt>.
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public final RequestBuilder setTimeout(String timeout) {
|
||||
request.timeout(timeout);
|
||||
return (RequestBuilder) this;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,270 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.support.single.instance;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.action.UnavailableShardsException;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.HandledTransportAction;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateObserver;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.routing.ShardIterator;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.node.NodeClosedException;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportResponseHandler;
|
||||
import org.elasticsearch.transport.ConnectTransportException;
|
||||
import org.elasticsearch.transport.TransportChannel;
|
||||
import org.elasticsearch.transport.TransportException;
|
||||
import org.elasticsearch.transport.TransportRequestHandler;
|
||||
import org.elasticsearch.transport.TransportRequestOptions;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.util.function.Supplier;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public abstract class TransportInstanceSingleOperationAction<Request extends InstanceShardOperationRequest<Request>, Response extends ActionResponse>
|
||||
extends HandledTransportAction<Request, Response> {
|
||||
protected final ClusterService clusterService;
|
||||
protected final TransportService transportService;
|
||||
|
||||
final String executor;
|
||||
final String shardActionName;
|
||||
|
||||
protected TransportInstanceSingleOperationAction(Settings settings, String actionName, ThreadPool threadPool,
|
||||
ClusterService clusterService, TransportService transportService,
|
||||
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request) {
|
||||
super(settings, actionName, threadPool, transportService, actionFilters, indexNameExpressionResolver, request);
|
||||
this.clusterService = clusterService;
|
||||
this.transportService = transportService;
|
||||
this.executor = executor();
|
||||
this.shardActionName = actionName + "[s]";
|
||||
transportService.registerRequestHandler(shardActionName, request, executor, new ShardTransportHandler());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doExecute(Request request, ActionListener<Response> listener) {
|
||||
new AsyncSingleAction(request, listener).start();
|
||||
}
|
||||
|
||||
protected abstract String executor();
|
||||
|
||||
protected abstract void shardOperation(Request request, ActionListener<Response> listener);
|
||||
|
||||
protected abstract Response newResponse();
|
||||
|
||||
protected ClusterBlockException checkGlobalBlock(ClusterState state) {
|
||||
return state.blocks().globalBlockedException(ClusterBlockLevel.WRITE);
|
||||
}
|
||||
|
||||
protected ClusterBlockException checkRequestBlock(ClusterState state, Request request) {
|
||||
return state.blocks().indexBlockedException(ClusterBlockLevel.WRITE, request.concreteIndex());
|
||||
}
|
||||
|
||||
/**
|
||||
* Resolves the request. Throws an exception if the request cannot be resolved.
|
||||
*/
|
||||
protected abstract void resolveRequest(ClusterState state, Request request);
|
||||
|
||||
protected boolean retryOnFailure(Exception e) {
|
||||
return false;
|
||||
}
|
||||
|
||||
protected TransportRequestOptions transportOptions() {
|
||||
return TransportRequestOptions.EMPTY;
|
||||
}
|
||||
|
||||
/**
|
||||
* Should return an iterator with a single shard!
|
||||
*/
|
||||
protected abstract ShardIterator shards(ClusterState clusterState, Request request);
|
||||
|
||||
class AsyncSingleAction {
|
||||
|
||||
private final ActionListener<Response> listener;
|
||||
private final Request request;
|
||||
private volatile ClusterStateObserver observer;
|
||||
private ShardIterator shardIt;
|
||||
private DiscoveryNodes nodes;
|
||||
|
||||
AsyncSingleAction(Request request, ActionListener<Response> listener) {
|
||||
this.request = request;
|
||||
this.listener = listener;
|
||||
}
|
||||
|
||||
public void start() {
|
||||
this.observer = new ClusterStateObserver(clusterService, request.timeout(), logger, threadPool.getThreadContext());
|
||||
doStart();
|
||||
}
|
||||
|
||||
protected void doStart() {
|
||||
nodes = observer.observedState().nodes();
|
||||
try {
|
||||
ClusterBlockException blockException = checkGlobalBlock(observer.observedState());
|
||||
if (blockException != null) {
|
||||
if (blockException.retryable()) {
|
||||
retry(blockException);
|
||||
return;
|
||||
} else {
|
||||
throw blockException;
|
||||
}
|
||||
}
|
||||
request.concreteIndex(indexNameExpressionResolver.concreteSingleIndex(observer.observedState(), request).getName());
|
||||
resolveRequest(observer.observedState(), request);
|
||||
blockException = checkRequestBlock(observer.observedState(), request);
|
||||
if (blockException != null) {
|
||||
if (blockException.retryable()) {
|
||||
retry(blockException);
|
||||
return;
|
||||
} else {
|
||||
throw blockException;
|
||||
}
|
||||
}
|
||||
shardIt = shards(observer.observedState(), request);
|
||||
} catch (Exception e) {
|
||||
listener.onFailure(e);
|
||||
return;
|
||||
}
|
||||
|
||||
// no shardIt, might be in the case between index gateway recovery and shardIt initialization
|
||||
if (shardIt.size() == 0) {
|
||||
retry(null);
|
||||
return;
|
||||
}
|
||||
|
||||
// this transport only make sense with an iterator that returns a single shard routing (like primary)
|
||||
assert shardIt.size() == 1;
|
||||
|
||||
ShardRouting shard = shardIt.nextOrNull();
|
||||
assert shard != null;
|
||||
|
||||
if (!shard.active()) {
|
||||
retry(null);
|
||||
return;
|
||||
}
|
||||
|
||||
request.shardId = shardIt.shardId();
|
||||
DiscoveryNode node = nodes.get(shard.currentNodeId());
|
||||
transportService.sendRequest(node, shardActionName, request, transportOptions(), new TransportResponseHandler<Response>() {
|
||||
|
||||
@Override
|
||||
public Response newInstance() {
|
||||
return newResponse();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String executor() {
|
||||
return ThreadPool.Names.SAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleResponse(Response response) {
|
||||
listener.onResponse(response);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
final Throwable cause = exp.unwrapCause();
|
||||
// if we got disconnected from the node, or the node / shard is not in the right state (being closed)
|
||||
if (cause instanceof ConnectTransportException || cause instanceof NodeClosedException ||
|
||||
retryOnFailure(exp)) {
|
||||
retry((Exception) cause);
|
||||
} else {
|
||||
listener.onFailure(exp);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void retry(@Nullable final Exception failure) {
|
||||
if (observer.isTimedOut()) {
|
||||
// we running as a last attempt after a timeout has happened. don't retry
|
||||
Exception listenFailure = failure;
|
||||
if (listenFailure == null) {
|
||||
if (shardIt == null) {
|
||||
listenFailure = new UnavailableShardsException(request.concreteIndex(), -1, "Timeout waiting for [{}], request: {}", request.timeout(), actionName);
|
||||
} else {
|
||||
listenFailure = new UnavailableShardsException(shardIt.shardId(), "[{}] shardIt, [{}] active : Timeout waiting for [{}], request: {}", shardIt.size(), shardIt.sizeActive(), request.timeout(), actionName);
|
||||
}
|
||||
}
|
||||
listener.onFailure(listenFailure);
|
||||
return;
|
||||
}
|
||||
|
||||
observer.waitForNextChange(new ClusterStateObserver.Listener() {
|
||||
@Override
|
||||
public void onNewClusterState(ClusterState state) {
|
||||
doStart();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onClusterServiceClose() {
|
||||
listener.onFailure(new NodeClosedException(nodes.getLocalNode()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTimeout(TimeValue timeout) {
|
||||
// just to be on the safe side, see if we can start it now?
|
||||
doStart();
|
||||
}
|
||||
}, request.timeout());
|
||||
}
|
||||
}
|
||||
|
||||
private class ShardTransportHandler implements TransportRequestHandler<Request> {
|
||||
|
||||
@Override
|
||||
public void messageReceived(final Request request, final TransportChannel channel) throws Exception {
|
||||
shardOperation(request, new ActionListener<Response>() {
|
||||
@Override
|
||||
public void onResponse(Response response) {
|
||||
try {
|
||||
channel.sendResponse(response);
|
||||
} catch (Exception e) {
|
||||
onFailure(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
try {
|
||||
channel.sendResponse(e);
|
||||
} catch (Exception inner) {
|
||||
inner.addSuppressed(e);
|
||||
logger.warn("failed to send response for get", inner);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
}
|
||||
}
|
||||
}
|
|
@ -21,7 +21,7 @@ package org.elasticsearch.action.update;
|
|||
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.DocWriteResponse;
|
||||
import org.elasticsearch.action.ActionRunnable;
|
||||
import org.elasticsearch.action.RoutingMissingException;
|
||||
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
|
||||
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
|
||||
|
@ -34,17 +34,19 @@ import org.elasticsearch.action.index.IndexResponse;
|
|||
import org.elasticsearch.action.index.TransportIndexAction;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.AutoCreateIndex;
|
||||
import org.elasticsearch.action.support.replication.TransportWriteAction;
|
||||
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
|
||||
import org.elasticsearch.cluster.action.shard.ShardStateAction;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.action.support.TransportActions;
|
||||
import org.elasticsearch.action.support.single.instance.TransportInstanceSingleOperationAction;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.metadata.MappingMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.routing.PlainShardIterator;
|
||||
import org.elasticsearch.cluster.routing.ShardIterator;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.stream.NotSerializableExceptionWrapper;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
|
@ -52,52 +54,59 @@ import org.elasticsearch.index.IndexService;
|
|||
import org.elasticsearch.index.engine.VersionConflictEngineException;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.translog.Translog;
|
||||
import org.elasticsearch.indices.IndexAlreadyExistsException;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.script.ScriptService;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.elasticsearch.ExceptionsHelper.unwrapCause;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class TransportUpdateAction extends TransportWriteAction<UpdateRequest, UpdateReplicaRequest, UpdateResponse> {
|
||||
public class TransportUpdateAction extends TransportInstanceSingleOperationAction<UpdateRequest, UpdateResponse> {
|
||||
|
||||
private final TransportDeleteAction deleteAction;
|
||||
private final TransportIndexAction indexAction;
|
||||
private final AutoCreateIndex autoCreateIndex;
|
||||
private final TransportCreateIndexAction createIndexAction;
|
||||
private final UpdateHelper updateHelper;
|
||||
private final IndicesService indicesService;
|
||||
private final MappingUpdatedAction mappingUpdatedAction;
|
||||
private final boolean allowIdGeneration;
|
||||
|
||||
@Inject
|
||||
public TransportUpdateAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService,
|
||||
TransportCreateIndexAction createIndexAction, ActionFilters actionFilters,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver, IndicesService indicesService,
|
||||
AutoCreateIndex autoCreateIndex, ShardStateAction shardStateAction,
|
||||
MappingUpdatedAction mappingUpdatedAction, ScriptService scriptService) {
|
||||
super(settings, UpdateAction.NAME, transportService, clusterService, indicesService, threadPool, shardStateAction,
|
||||
actionFilters, indexNameExpressionResolver, UpdateRequest::new, UpdateReplicaRequest::new, ThreadPool.Names.INDEX);
|
||||
TransportIndexAction indexAction, TransportDeleteAction deleteAction, TransportCreateIndexAction createIndexAction,
|
||||
UpdateHelper updateHelper, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
IndicesService indicesService, AutoCreateIndex autoCreateIndex) {
|
||||
super(settings, UpdateAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, UpdateRequest::new);
|
||||
this.indexAction = indexAction;
|
||||
this.deleteAction = deleteAction;
|
||||
this.createIndexAction = createIndexAction;
|
||||
this.updateHelper = new UpdateHelper(scriptService, logger);
|
||||
this.updateHelper = updateHelper;
|
||||
this.indicesService = indicesService;
|
||||
this.autoCreateIndex = autoCreateIndex;
|
||||
this.mappingUpdatedAction = mappingUpdatedAction;
|
||||
this.allowIdGeneration = settings.getAsBoolean("action.allow_id_generation", true);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void resolveRequest(MetaData metaData, IndexMetaData indexMetaData, UpdateRequest request) {
|
||||
super.resolveRequest(metaData, indexMetaData, request);
|
||||
resolveAndValidateRouting(metaData, indexMetaData.getIndex().getName(), request);
|
||||
ShardId shardId = clusterService.operationRouting().shardId(clusterService.state(),
|
||||
indexMetaData.getIndex().getName(), request.id(), request.routing());
|
||||
request.setShardId(shardId);
|
||||
protected String executor() {
|
||||
return ThreadPool.Names.INDEX;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected UpdateResponse newResponse() {
|
||||
return new UpdateResponse();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean retryOnFailure(Exception e) {
|
||||
return TransportActions.isShardNotAvailableException(e);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void resolveRequest(ClusterState state, UpdateRequest request) {
|
||||
resolveAndValidateRouting(state.metaData(), request.concreteIndex(), request);
|
||||
}
|
||||
|
||||
public static void resolveAndValidateRouting(MetaData metaData, String concreteIndex, UpdateRequest request) {
|
||||
|
@ -109,17 +118,13 @@ public class TransportUpdateAction extends TransportWriteAction<UpdateRequest, U
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void doExecute(Task task, UpdateRequest request, ActionListener<UpdateResponse> listener) {
|
||||
protected void doExecute(final UpdateRequest request, final ActionListener<UpdateResponse> listener) {
|
||||
// if we don't have a master, we don't have metadata, that's fine, let it find a master using create index API
|
||||
if (autoCreateIndex.shouldAutoCreate(request.index(), clusterService.state())) {
|
||||
CreateIndexRequest createIndexRequest = new CreateIndexRequest();
|
||||
createIndexRequest.index(request.index());
|
||||
createIndexRequest.cause("auto(update api)");
|
||||
createIndexRequest.masterNodeTimeout(request.timeout());
|
||||
createIndexAction.execute(createIndexRequest, new ActionListener<CreateIndexResponse>() {
|
||||
createIndexAction.execute(new CreateIndexRequest().index(request.index()).cause("auto(update api)").masterNodeTimeout(request.timeout()), new ActionListener<CreateIndexResponse>() {
|
||||
@Override
|
||||
public void onResponse(CreateIndexResponse result) {
|
||||
innerExecute(task, request, listener);
|
||||
innerExecute(request, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -127,7 +132,7 @@ public class TransportUpdateAction extends TransportWriteAction<UpdateRequest, U
|
|||
if (unwrapCause(e) instanceof IndexAlreadyExistsException) {
|
||||
// we have the index, do it
|
||||
try {
|
||||
innerExecute(task, request, listener);
|
||||
innerExecute(request, listener);
|
||||
} catch (Exception inner) {
|
||||
inner.addSuppressed(e);
|
||||
listener.onFailure(inner);
|
||||
|
@ -138,123 +143,153 @@ public class TransportUpdateAction extends TransportWriteAction<UpdateRequest, U
|
|||
}
|
||||
});
|
||||
} else {
|
||||
innerExecute(task, request, listener);
|
||||
innerExecute(request, listener);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected UpdateResponse newResponseInstance() {
|
||||
return new UpdateResponse();
|
||||
}
|
||||
|
||||
private void innerExecute(Task task, final UpdateRequest request, final ActionListener<UpdateResponse> listener) {
|
||||
super.doExecute(task, request, listener);
|
||||
private void innerExecute(final UpdateRequest request, final ActionListener<UpdateResponse> listener) {
|
||||
super.doExecute(request, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected WriteResult<UpdateReplicaRequest, UpdateResponse> onPrimaryShard(UpdateRequest request, IndexShard indexShard) throws Exception {
|
||||
ShardId shardId = request.shardId();
|
||||
final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
|
||||
final IndexMetaData indexMetaData = indexService.getMetaData();
|
||||
return executeUpdateRequestOnPrimary(request, indexShard, indexMetaData, updateHelper, mappingUpdatedAction, allowIdGeneration);
|
||||
}
|
||||
|
||||
public static WriteResult<UpdateReplicaRequest, UpdateResponse> executeUpdateRequestOnPrimary(UpdateRequest request,
|
||||
IndexShard indexShard,
|
||||
IndexMetaData indexMetaData,
|
||||
UpdateHelper updateHelper,
|
||||
MappingUpdatedAction mappingUpdatedAction,
|
||||
boolean allowIdGeneration)
|
||||
throws Exception {
|
||||
int maxAttempts = request.retryOnConflict();
|
||||
for (int attemptCount = 0; attemptCount <= maxAttempts; attemptCount++) {
|
||||
try {
|
||||
return shardUpdateOperation(indexMetaData, indexShard, request, updateHelper, mappingUpdatedAction, allowIdGeneration);
|
||||
} catch (Exception e) {
|
||||
final Throwable cause = ExceptionsHelper.unwrapCause(e);
|
||||
if (attemptCount == maxAttempts // bubble up exception when we run out of attempts
|
||||
|| (cause instanceof VersionConflictEngineException) == false) { // or when exception is not a version conflict
|
||||
throw e;
|
||||
}
|
||||
protected ShardIterator shards(ClusterState clusterState, UpdateRequest request) {
|
||||
if (request.getShardId() != null) {
|
||||
return clusterState.routingTable().index(request.concreteIndex()).shard(request.getShardId().getId()).primaryShardIt();
|
||||
}
|
||||
ShardIterator shardIterator = clusterService.operationRouting()
|
||||
.indexShards(clusterState, request.concreteIndex(), request.id(), request.routing());
|
||||
ShardRouting shard;
|
||||
while ((shard = shardIterator.nextOrNull()) != null) {
|
||||
if (shard.primary()) {
|
||||
return new PlainShardIterator(shardIterator.shardId(), Collections.singletonList(shard));
|
||||
}
|
||||
}
|
||||
throw new IllegalStateException("version conflict exception should bubble up on last attempt");
|
||||
|
||||
return new PlainShardIterator(shardIterator.shardId(), Collections.<ShardRouting>emptyList());
|
||||
}
|
||||
|
||||
private static WriteResult<UpdateReplicaRequest, UpdateResponse> shardUpdateOperation(IndexMetaData indexMetaData,
|
||||
IndexShard indexShard,
|
||||
UpdateRequest request,
|
||||
UpdateHelper updateHelper,
|
||||
MappingUpdatedAction mappingUpdatedAction,
|
||||
boolean allowIdGeneration)
|
||||
throws Exception {
|
||||
@Override
|
||||
protected void shardOperation(final UpdateRequest request, final ActionListener<UpdateResponse> listener) {
|
||||
shardOperation(request, listener, 0);
|
||||
}
|
||||
|
||||
protected void shardOperation(final UpdateRequest request, final ActionListener<UpdateResponse> listener, final int retryCount) {
|
||||
final ShardId shardId = request.getShardId();
|
||||
final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
|
||||
final IndexShard indexShard = indexService.getShard(shardId.getId());
|
||||
final UpdateHelper.Result result = updateHelper.prepare(request, indexShard);
|
||||
switch (result.getResponseResult()) {
|
||||
case CREATED:
|
||||
IndexRequest upsertRequest = result.action();
|
||||
// we fetch it from the index request so we don't generate the bytes twice, its already done in the index request
|
||||
final BytesReference upsertSourceBytes = upsertRequest.source();
|
||||
indexAction.execute(upsertRequest, new ActionListener<IndexResponse>() {
|
||||
@Override
|
||||
public void onResponse(IndexResponse response) {
|
||||
UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getVersion(), response.getResult());
|
||||
if ((request.fetchSource() != null && request.fetchSource().fetchSource()) ||
|
||||
(request.fields() != null && request.fields().length > 0)) {
|
||||
Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(upsertSourceBytes, true);
|
||||
update.setGetResult(updateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), upsertSourceBytes));
|
||||
} else {
|
||||
update.setGetResult(null);
|
||||
}
|
||||
update.setForcedRefresh(response.forcedRefresh());
|
||||
listener.onResponse(update);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
final Throwable cause = ExceptionsHelper.unwrapCause(e);
|
||||
if (cause instanceof VersionConflictEngineException) {
|
||||
if (retryCount < request.retryOnConflict()) {
|
||||
logger.trace("Retry attempt [{}] of [{}] on version conflict on [{}][{}][{}]",
|
||||
retryCount + 1, request.retryOnConflict(), request.index(), request.getShardId(), request.id());
|
||||
threadPool.executor(executor()).execute(new ActionRunnable<UpdateResponse>(listener) {
|
||||
@Override
|
||||
protected void doRun() {
|
||||
shardOperation(request, listener, retryCount + 1);
|
||||
}
|
||||
});
|
||||
return;
|
||||
}
|
||||
}
|
||||
listener.onFailure(cause instanceof Exception ? (Exception) cause : new NotSerializableExceptionWrapper(cause));
|
||||
}
|
||||
});
|
||||
break;
|
||||
case UPDATED:
|
||||
IndexRequest indexRequest = result.action();
|
||||
MappingMetaData mappingMd = indexMetaData.mappingOrDefault(request.type());
|
||||
indexRequest.process(mappingMd, allowIdGeneration, indexMetaData.getIndex().getName());
|
||||
WriteResult<IndexRequest, IndexResponse> indexResponseWriteResult = TransportIndexAction.executeIndexRequestOnPrimary(indexRequest, indexShard, mappingUpdatedAction);
|
||||
IndexResponse response = indexResponseWriteResult.getResponse();
|
||||
UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getVersion(), response.getResult());
|
||||
// we fetch it from the index request so we don't generate the bytes twice, its already done in the index request
|
||||
final BytesReference indexSourceBytes = indexRequest.source();
|
||||
if (result.getResponseResult() == DocWriteResponse.Result.CREATED) {
|
||||
if ((request.fetchSource() != null && request.fetchSource().fetchSource()) ||
|
||||
(request.fields() != null && request.fields().length > 0)) {
|
||||
Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(indexSourceBytes, true);
|
||||
update.setGetResult(updateHelper.extractGetResult(request, indexMetaData.getIndex().getName(), response.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceBytes));
|
||||
} else {
|
||||
update.setGetResult(null);
|
||||
indexAction.execute(indexRequest, new ActionListener<IndexResponse>() {
|
||||
@Override
|
||||
public void onResponse(IndexResponse response) {
|
||||
UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getVersion(), response.getResult());
|
||||
update.setGetResult(updateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), indexSourceBytes));
|
||||
update.setForcedRefresh(response.forcedRefresh());
|
||||
listener.onResponse(update);
|
||||
}
|
||||
} else if (result.getResponseResult() == DocWriteResponse.Result.UPDATED) {
|
||||
update.setGetResult(updateHelper.extractGetResult(request, indexMetaData.getIndex().getName(), response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), indexSourceBytes));
|
||||
}
|
||||
update.setForcedRefresh(response.forcedRefresh());
|
||||
UpdateReplicaRequest updateReplicaRequest = new UpdateReplicaRequest(indexRequest);
|
||||
updateReplicaRequest.setParentTask(request.getParentTask());
|
||||
updateReplicaRequest.setShardId(request.shardId());
|
||||
updateReplicaRequest.setRefreshPolicy(request.getRefreshPolicy());
|
||||
return new WriteResult<>(updateReplicaRequest, update, indexResponseWriteResult.getLocation());
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
final Throwable cause = unwrapCause(e);
|
||||
if (cause instanceof VersionConflictEngineException) {
|
||||
if (retryCount < request.retryOnConflict()) {
|
||||
threadPool.executor(executor()).execute(new ActionRunnable<UpdateResponse>(listener) {
|
||||
@Override
|
||||
protected void doRun() {
|
||||
shardOperation(request, listener, retryCount + 1);
|
||||
}
|
||||
});
|
||||
return;
|
||||
}
|
||||
}
|
||||
listener.onFailure(cause instanceof Exception ? (Exception) cause : new NotSerializableExceptionWrapper(cause));
|
||||
}
|
||||
});
|
||||
break;
|
||||
case DELETED:
|
||||
DeleteRequest deleteRequest = result.action();
|
||||
WriteResult<DeleteRequest, DeleteResponse> deleteResponseWriteResult = TransportDeleteAction.executeDeleteRequestOnPrimary(deleteRequest, indexShard);
|
||||
DeleteResponse deleteResponse = deleteResponseWriteResult.getResponse();
|
||||
UpdateResponse deleteUpdate = new UpdateResponse(deleteResponse.getShardInfo(), deleteResponse.getShardId(), deleteResponse.getType(), deleteResponse.getId(), deleteResponse.getVersion(), deleteResponse.getResult());
|
||||
deleteUpdate.setGetResult(updateHelper.extractGetResult(request, indexMetaData.getIndex().getName(), deleteResponse.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), null));
|
||||
deleteUpdate.setForcedRefresh(deleteResponse.forcedRefresh());
|
||||
UpdateReplicaRequest deleteReplicaRequest = new UpdateReplicaRequest(deleteRequest);
|
||||
deleteReplicaRequest.setParentTask(request.getParentTask());
|
||||
deleteReplicaRequest.setShardId(request.shardId());
|
||||
deleteReplicaRequest.setRefreshPolicy(request.getRefreshPolicy());
|
||||
return new WriteResult<>(deleteReplicaRequest, deleteUpdate, deleteResponseWriteResult.getLocation());
|
||||
deleteAction.execute(deleteRequest, new ActionListener<DeleteResponse>() {
|
||||
@Override
|
||||
public void onResponse(DeleteResponse response) {
|
||||
UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getVersion(), response.getResult());
|
||||
update.setGetResult(updateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), null));
|
||||
update.setForcedRefresh(response.forcedRefresh());
|
||||
listener.onResponse(update);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
final Throwable cause = unwrapCause(e);
|
||||
if (cause instanceof VersionConflictEngineException) {
|
||||
if (retryCount < request.retryOnConflict()) {
|
||||
threadPool.executor(executor()).execute(new ActionRunnable<UpdateResponse>(listener) {
|
||||
@Override
|
||||
protected void doRun() {
|
||||
shardOperation(request, listener, retryCount + 1);
|
||||
}
|
||||
});
|
||||
return;
|
||||
}
|
||||
}
|
||||
listener.onFailure(cause instanceof Exception ? (Exception) cause : new NotSerializableExceptionWrapper(cause));
|
||||
}
|
||||
});
|
||||
break;
|
||||
case NOOP:
|
||||
UpdateResponse noopUpdate = result.action();
|
||||
indexShard.noopUpdate(request.type());
|
||||
return new WriteResult<>(null, noopUpdate, null);
|
||||
UpdateResponse update = result.action();
|
||||
IndexService indexServiceOrNull = indicesService.indexService(shardId.getIndex());
|
||||
if (indexServiceOrNull != null) {
|
||||
IndexShard shard = indexService.getShardOrNull(shardId.getId());
|
||||
if (shard != null) {
|
||||
shard.noopUpdate(request.type());
|
||||
}
|
||||
}
|
||||
listener.onResponse(update);
|
||||
break;
|
||||
default:
|
||||
throw new IllegalStateException("Illegal result " + result.getResponseResult());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Translog.Location onReplicaShard(UpdateReplicaRequest request, IndexShard indexShard) {
|
||||
assert request.getRequest() != null;
|
||||
final Translog.Location location;
|
||||
switch (request.getRequest().opType()) {
|
||||
case INDEX:
|
||||
case CREATE:
|
||||
location = TransportIndexAction.executeIndexRequestOnReplica(((IndexRequest) request.getRequest()), indexShard).getTranslogLocation();
|
||||
break;
|
||||
case DELETE:
|
||||
location = TransportDeleteAction.executeDeleteRequestOnReplica(((DeleteRequest) request.getRequest()), indexShard).getTranslogLocation();
|
||||
break;
|
||||
default:
|
||||
throw new IllegalStateException("unexpected opType [" + request.getRequest().opType().getLowercase() + "]");
|
||||
|
||||
}
|
||||
return location;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
|
||||
package org.elasticsearch.action.update;
|
||||
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.action.DocWriteResponse;
|
||||
import org.elasticsearch.action.delete.DeleteRequest;
|
||||
|
@ -28,8 +27,11 @@ import org.elasticsearch.client.Requests;
|
|||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Streamable;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||
|
@ -61,14 +63,14 @@ import java.util.Map;
|
|||
/**
|
||||
* Helper for translating an update request to an index, delete request or update response.
|
||||
*/
|
||||
public class UpdateHelper {
|
||||
public class UpdateHelper extends AbstractComponent {
|
||||
|
||||
private final ScriptService scriptService;
|
||||
private final Logger logger;
|
||||
|
||||
public UpdateHelper(ScriptService scriptService, Logger logger) {
|
||||
@Inject
|
||||
public UpdateHelper(Settings settings, ScriptService scriptService) {
|
||||
super(settings);
|
||||
this.scriptService = scriptService;
|
||||
this.logger = logger;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -257,7 +259,7 @@ public class UpdateHelper {
|
|||
return ctx;
|
||||
}
|
||||
|
||||
private static TimeValue getTTLFromScriptContext(Map<String, Object> ctx) {
|
||||
private TimeValue getTTLFromScriptContext(Map<String, Object> ctx) {
|
||||
Object fetchedTTL = ctx.get("_ttl");
|
||||
if (fetchedTTL != null) {
|
||||
if (fetchedTTL instanceof Number) {
|
||||
|
|
|
@ -1,113 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.update;
|
||||
|
||||
import org.elasticsearch.action.DocumentRequest;
|
||||
import org.elasticsearch.action.delete.DeleteRequest;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.index.VersionType;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/** Replica request for update operation holds translated (index/delete) requests */
|
||||
public class UpdateReplicaRequest extends DocumentRequest<UpdateReplicaRequest> {
|
||||
private DocumentRequest<?> request;
|
||||
|
||||
public UpdateReplicaRequest() {
|
||||
}
|
||||
|
||||
public UpdateReplicaRequest(DocumentRequest<?> request) {
|
||||
assert !(request instanceof UpdateReplicaRequest) : "underlying request must not be a update replica request";
|
||||
this.request = request;
|
||||
this.index = request.index();
|
||||
setRefreshPolicy(request.getRefreshPolicy());
|
||||
setShardId(request.shardId());
|
||||
setParentTask(request.getParentTask());
|
||||
}
|
||||
|
||||
public DocumentRequest<?> getRequest() {
|
||||
return request;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
request = DocumentRequest.readDocumentRequest(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
DocumentRequest.writeDocumentRequest(out, request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String type() {
|
||||
return request.type();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String id() {
|
||||
return request.id();
|
||||
}
|
||||
|
||||
@Override
|
||||
public UpdateReplicaRequest routing(String routing) {
|
||||
throw new UnsupportedOperationException("setting routing is not supported");
|
||||
}
|
||||
|
||||
@Override
|
||||
public String routing() {
|
||||
return request.routing();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String parent() {
|
||||
return request.parent();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long version() {
|
||||
return request.version();
|
||||
}
|
||||
|
||||
@Override
|
||||
public UpdateReplicaRequest version(long version) {
|
||||
throw new UnsupportedOperationException("setting version is not supported");
|
||||
}
|
||||
|
||||
@Override
|
||||
public VersionType versionType() {
|
||||
return request.versionType();
|
||||
}
|
||||
|
||||
@Override
|
||||
public UpdateReplicaRequest versionType(VersionType versionType) {
|
||||
throw new UnsupportedOperationException("setting version type is not supported");
|
||||
}
|
||||
|
||||
@Override
|
||||
public OpType opType() {
|
||||
return request.opType();
|
||||
}
|
||||
}
|
|
@ -23,6 +23,9 @@ import org.elasticsearch.action.ActionRequestValidationException;
|
|||
import org.elasticsearch.action.DocumentRequest;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.support.ActiveShardCount;
|
||||
import org.elasticsearch.action.support.WriteRequest;
|
||||
import org.elasticsearch.action.support.replication.ReplicationRequest;
|
||||
import org.elasticsearch.action.support.single.instance.InstanceShardOperationRequest;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.ParseFieldMatcher;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
|
@ -53,7 +56,10 @@ import static org.elasticsearch.action.ValidateActions.addValidationError;
|
|||
|
||||
/**
|
||||
*/
|
||||
public class UpdateRequest extends DocumentRequest<UpdateRequest> {
|
||||
public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
|
||||
implements DocumentRequest<UpdateRequest>, WriteRequest<UpdateRequest> {
|
||||
private static final DeprecationLogger DEPRECATION_LOGGER =
|
||||
new DeprecationLogger(Loggers.getLogger(UpdateRequest.class));
|
||||
|
||||
private String type;
|
||||
private String id;
|
||||
|
@ -91,7 +97,7 @@ public class UpdateRequest extends DocumentRequest<UpdateRequest> {
|
|||
}
|
||||
|
||||
public UpdateRequest(String index, String type, String id) {
|
||||
this.index = index;
|
||||
super(index);
|
||||
this.type = type;
|
||||
this.id = id;
|
||||
}
|
||||
|
@ -489,6 +495,39 @@ public class UpdateRequest extends DocumentRequest<UpdateRequest> {
|
|||
return OpType.UPDATE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public UpdateRequest setRefreshPolicy(RefreshPolicy refreshPolicy) {
|
||||
this.refreshPolicy = refreshPolicy;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RefreshPolicy getRefreshPolicy() {
|
||||
return refreshPolicy;
|
||||
}
|
||||
|
||||
public ActiveShardCount waitForActiveShards() {
|
||||
return this.waitForActiveShards;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the number of shard copies that must be active before proceeding with the write.
|
||||
* See {@link ReplicationRequest#waitForActiveShards(ActiveShardCount)} for details.
|
||||
*/
|
||||
public UpdateRequest waitForActiveShards(ActiveShardCount waitForActiveShards) {
|
||||
this.waitForActiveShards = waitForActiveShards;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* A shortcut for {@link #waitForActiveShards(ActiveShardCount)} where the numerical
|
||||
* shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)}
|
||||
* to get the ActiveShardCount.
|
||||
*/
|
||||
public UpdateRequest waitForActiveShards(final int waitForActiveShards) {
|
||||
return waitForActiveShards(ActiveShardCount.from(waitForActiveShards));
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the doc to use for updates when a script is not specified.
|
||||
*/
|
||||
|
|
|
@ -23,7 +23,7 @@ import org.elasticsearch.action.index.IndexRequest;
|
|||
import org.elasticsearch.action.support.ActiveShardCount;
|
||||
import org.elasticsearch.action.support.WriteRequestBuilder;
|
||||
import org.elasticsearch.action.support.replication.ReplicationRequest;
|
||||
import org.elasticsearch.action.support.replication.ReplicationRequestBuilder;
|
||||
import org.elasticsearch.action.support.single.instance.InstanceShardOperationRequestBuilder;
|
||||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.logging.DeprecationLogger;
|
||||
|
@ -37,7 +37,7 @@ import org.elasticsearch.script.Script;
|
|||
|
||||
import java.util.Map;
|
||||
|
||||
public class UpdateRequestBuilder extends ReplicationRequestBuilder<UpdateRequest, UpdateResponse, UpdateRequestBuilder>
|
||||
public class UpdateRequestBuilder extends InstanceShardOperationRequestBuilder<UpdateRequest, UpdateResponse, UpdateRequestBuilder>
|
||||
implements WriteRequestBuilder<UpdateRequestBuilder> {
|
||||
private static final DeprecationLogger DEPRECATION_LOGGER =
|
||||
new DeprecationLogger(Loggers.getLogger(RestUpdateAction.class));
|
||||
|
|
|
@ -22,6 +22,7 @@ package org.elasticsearch.indices;
|
|||
import org.elasticsearch.action.admin.indices.rollover.Condition;
|
||||
import org.elasticsearch.action.admin.indices.rollover.MaxAgeCondition;
|
||||
import org.elasticsearch.action.admin.indices.rollover.MaxDocsCondition;
|
||||
import org.elasticsearch.action.update.UpdateHelper;
|
||||
import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService;
|
||||
import org.elasticsearch.common.geo.ShapesAvailability;
|
||||
import org.elasticsearch.common.inject.AbstractModule;
|
||||
|
@ -181,6 +182,7 @@ public class IndicesModule extends AbstractModule {
|
|||
bind(SyncedFlushService.class).asEagerSingleton();
|
||||
bind(TransportNodesListShardStoreMetaData.class).asEagerSingleton();
|
||||
bind(IndicesTTLService.class).asEagerSingleton();
|
||||
bind(UpdateHelper.class).asEagerSingleton();
|
||||
bind(MetaDataIndexUpgradeService.class).asEagerSingleton();
|
||||
bind(NodeServicesProvider.class).asEagerSingleton();
|
||||
}
|
||||
|
|
|
@ -222,7 +222,8 @@ public class IndicesRequestIT extends ESIntegTestCase {
|
|||
}
|
||||
|
||||
public void testUpdate() {
|
||||
String[] updateShardActions = new String[]{UpdateAction.NAME + "[p]", UpdateAction.NAME + "[r]"};
|
||||
//update action goes to the primary, index op gets executed locally, then replicated
|
||||
String[] updateShardActions = new String[]{UpdateAction.NAME + "[s]", IndexAction.NAME + "[r]"};
|
||||
interceptTransportActions(updateShardActions);
|
||||
|
||||
String indexOrAlias = randomIndexOrAlias();
|
||||
|
@ -236,7 +237,8 @@ public class IndicesRequestIT extends ESIntegTestCase {
|
|||
}
|
||||
|
||||
public void testUpdateUpsert() {
|
||||
String[] updateShardActions = new String[]{UpdateAction.NAME + "[p]", UpdateAction.NAME + "[r]"};
|
||||
//update action goes to the primary, index op gets executed locally, then replicated
|
||||
String[] updateShardActions = new String[]{UpdateAction.NAME + "[s]", IndexAction.NAME + "[r]"};
|
||||
interceptTransportActions(updateShardActions);
|
||||
|
||||
String indexOrAlias = randomIndexOrAlias();
|
||||
|
@ -249,7 +251,8 @@ public class IndicesRequestIT extends ESIntegTestCase {
|
|||
}
|
||||
|
||||
public void testUpdateDelete() {
|
||||
String[] updateShardActions = new String[]{UpdateAction.NAME + "[p]", UpdateAction.NAME + "[r]"};
|
||||
//update action goes to the primary, delete op gets executed locally, then replicated
|
||||
String[] updateShardActions = new String[]{UpdateAction.NAME + "[s]", DeleteAction.NAME + "[r]"};
|
||||
interceptTransportActions(updateShardActions);
|
||||
|
||||
String indexOrAlias = randomIndexOrAlias();
|
||||
|
|
|
@ -20,7 +20,6 @@
|
|||
package org.elasticsearch.action.support.replication;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionRequest;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
|
||||
import org.elasticsearch.action.support.WriteResponse;
|
||||
|
@ -129,21 +128,21 @@ public class TransportWriteActionTests extends ESTestCase {
|
|||
resultChecker.accept(listener.response, forcedRefresh);
|
||||
}
|
||||
|
||||
private class TestAction extends TransportWriteAction<TestRequest, TestRequest, TestResponse> {
|
||||
private class TestAction extends TransportWriteAction<TestRequest, TestResponse> {
|
||||
protected TestAction() {
|
||||
super(Settings.EMPTY, "test", new TransportService(Settings.EMPTY, null, null, TransportService.NOOP_TRANSPORT_INTERCEPTOR),
|
||||
null, null, null, null, new ActionFilters(new HashSet<>()),
|
||||
new IndexNameExpressionResolver(Settings.EMPTY), TestRequest::new, TestRequest::new, ThreadPool.Names.SAME);
|
||||
new IndexNameExpressionResolver(Settings.EMPTY), TestRequest::new, ThreadPool.Names.SAME);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected IndexShard indexShard(ReplicatedWriteRequest request) {
|
||||
protected IndexShard indexShard(TestRequest request) {
|
||||
return indexShard;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected WriteResult<TestRequest, TestResponse> onPrimaryShard(TestRequest request, IndexShard indexShard) throws Exception {
|
||||
return new WriteResult<>(request, new TestResponse(), location);
|
||||
protected WriteResult<TestResponse> onPrimaryShard(TestRequest request, IndexShard indexShard) throws Exception {
|
||||
return new WriteResult<>(new TestResponse(), location);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -0,0 +1,327 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.support.single.instance;
|
||||
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.action.IndicesRequest;
|
||||
import org.elasticsearch.action.support.ActionFilter;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.block.ClusterBlock;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.block.ClusterBlocks;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.routing.ShardIterator;
|
||||
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.transport.CapturingTransport;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.ConnectTransportException;
|
||||
import org.elasticsearch.transport.TransportException;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
|
||||
import static org.elasticsearch.test.ClusterServiceUtils.setState;
|
||||
import static org.hamcrest.core.IsEqual.equalTo;
|
||||
|
||||
public class TransportInstanceSingleOperationActionTests extends ESTestCase {
|
||||
|
||||
private static ThreadPool THREAD_POOL;
|
||||
|
||||
private ClusterService clusterService;
|
||||
private CapturingTransport transport;
|
||||
private TransportService transportService;
|
||||
|
||||
private TestTransportInstanceSingleOperationAction action;
|
||||
|
||||
public static class Request extends InstanceShardOperationRequest<Request> {
|
||||
public Request() {
|
||||
}
|
||||
}
|
||||
|
||||
public static class Response extends ActionResponse {
|
||||
public Response() {
|
||||
}
|
||||
}
|
||||
|
||||
class TestTransportInstanceSingleOperationAction extends TransportInstanceSingleOperationAction<Request, Response> {
|
||||
private final Map<ShardId, Object> shards = new HashMap<>();
|
||||
|
||||
public TestTransportInstanceSingleOperationAction(Settings settings, String actionName, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request) {
|
||||
super(settings, actionName, THREAD_POOL, TransportInstanceSingleOperationActionTests.this.clusterService, transportService, actionFilters, indexNameExpressionResolver, request);
|
||||
}
|
||||
|
||||
public Map<ShardId, Object> getResults() {
|
||||
return shards;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String executor() {
|
||||
return ThreadPool.Names.SAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void shardOperation(Request request, ActionListener<Response> listener) {
|
||||
throw new UnsupportedOperationException("Not implemented in test class");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Response newResponse() {
|
||||
return new Response();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void resolveRequest(ClusterState state, Request request) {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ShardIterator shards(ClusterState clusterState, Request request) {
|
||||
return clusterState.routingTable().index(request.concreteIndex()).shard(request.shardId.getId()).primaryShardIt();
|
||||
}
|
||||
}
|
||||
|
||||
class MyResolver extends IndexNameExpressionResolver {
|
||||
public MyResolver() {
|
||||
super(Settings.EMPTY);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String[] concreteIndexNames(ClusterState state, IndicesRequest request) {
|
||||
return request.indices();
|
||||
}
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public static void startThreadPool() {
|
||||
THREAD_POOL = new TestThreadPool(TransportInstanceSingleOperationActionTests.class.getSimpleName());
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
transport = new CapturingTransport();
|
||||
clusterService = createClusterService(THREAD_POOL);
|
||||
transportService = new TransportService(clusterService.getSettings(), transport, THREAD_POOL, TransportService.NOOP_TRANSPORT_INTERCEPTOR);
|
||||
transportService.start();
|
||||
transportService.acceptIncomingRequests();
|
||||
action = new TestTransportInstanceSingleOperationAction(
|
||||
Settings.EMPTY,
|
||||
"indices:admin/test",
|
||||
transportService,
|
||||
new ActionFilters(new HashSet<ActionFilter>()),
|
||||
new MyResolver(),
|
||||
Request::new
|
||||
);
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
super.tearDown();
|
||||
clusterService.close();
|
||||
transportService.close();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void destroyThreadPool() {
|
||||
ThreadPool.terminate(THREAD_POOL, 30, TimeUnit.SECONDS);
|
||||
// since static must set to null to be eligible for collection
|
||||
THREAD_POOL = null;
|
||||
}
|
||||
|
||||
public void testGlobalBlock() {
|
||||
Request request = new Request();
|
||||
PlainActionFuture<Response> listener = new PlainActionFuture<>();
|
||||
ClusterBlocks.Builder block = ClusterBlocks.builder()
|
||||
.addGlobalBlock(new ClusterBlock(1, "", false, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
|
||||
setState(clusterService, ClusterState.builder(clusterService.state()).blocks(block));
|
||||
try {
|
||||
action.new AsyncSingleAction(request, listener).start();
|
||||
listener.get();
|
||||
fail("expected ClusterBlockException");
|
||||
} catch (Exception e) {
|
||||
if (ExceptionsHelper.unwrap(e, ClusterBlockException.class) == null) {
|
||||
logger.info("expected ClusterBlockException but got ", e);
|
||||
fail("expected ClusterBlockException");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void testBasicRequestWorks() throws InterruptedException, ExecutionException, TimeoutException {
|
||||
Request request = new Request().index("test");
|
||||
request.shardId = new ShardId("test", "_na_", 0);
|
||||
PlainActionFuture<Response> listener = new PlainActionFuture<>();
|
||||
setState(clusterService, ClusterStateCreationUtils.state("test", randomBoolean(), ShardRoutingState.STARTED));
|
||||
action.new AsyncSingleAction(request, listener).start();
|
||||
assertThat(transport.capturedRequests().length, equalTo(1));
|
||||
transport.handleResponse(transport.capturedRequests()[0].requestId, new Response());
|
||||
listener.get();
|
||||
}
|
||||
|
||||
public void testFailureWithoutRetry() throws Exception {
|
||||
Request request = new Request().index("test");
|
||||
request.shardId = new ShardId("test", "_na_", 0);
|
||||
PlainActionFuture<Response> listener = new PlainActionFuture<>();
|
||||
setState(clusterService, ClusterStateCreationUtils.state("test", randomBoolean(), ShardRoutingState.STARTED));
|
||||
|
||||
action.new AsyncSingleAction(request, listener).start();
|
||||
assertThat(transport.capturedRequests().length, equalTo(1));
|
||||
long requestId = transport.capturedRequests()[0].requestId;
|
||||
transport.clear();
|
||||
// this should not trigger retry or anything and the listener should report exception immediately
|
||||
transport.handleRemoteError(requestId, new TransportException("a generic transport exception", new Exception("generic test exception")));
|
||||
|
||||
try {
|
||||
// result should return immediately
|
||||
assertTrue(listener.isDone());
|
||||
listener.get();
|
||||
fail("this should fail with a transport exception");
|
||||
} catch (ExecutionException t) {
|
||||
if (ExceptionsHelper.unwrap(t, TransportException.class) == null) {
|
||||
logger.info("expected TransportException but got ", t);
|
||||
fail("expected and TransportException");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void testSuccessAfterRetryWithClusterStateUpdate() throws Exception {
|
||||
Request request = new Request().index("test");
|
||||
request.shardId = new ShardId("test", "_na_", 0);
|
||||
PlainActionFuture<Response> listener = new PlainActionFuture<>();
|
||||
boolean local = randomBoolean();
|
||||
setState(clusterService, ClusterStateCreationUtils.state("test", local, ShardRoutingState.INITIALIZING));
|
||||
action.new AsyncSingleAction(request, listener).start();
|
||||
// this should fail because primary not initialized
|
||||
assertThat(transport.capturedRequests().length, equalTo(0));
|
||||
setState(clusterService, ClusterStateCreationUtils.state("test", local, ShardRoutingState.STARTED));
|
||||
// this time it should work
|
||||
assertThat(transport.capturedRequests().length, equalTo(1));
|
||||
transport.handleResponse(transport.capturedRequests()[0].requestId, new Response());
|
||||
listener.get();
|
||||
}
|
||||
|
||||
public void testSuccessAfterRetryWithExceptionFromTransport() throws Exception {
|
||||
Request request = new Request().index("test");
|
||||
request.shardId = new ShardId("test", "_na_", 0);
|
||||
PlainActionFuture<Response> listener = new PlainActionFuture<>();
|
||||
boolean local = randomBoolean();
|
||||
setState(clusterService, ClusterStateCreationUtils.state("test", local, ShardRoutingState.STARTED));
|
||||
action.new AsyncSingleAction(request, listener).start();
|
||||
assertThat(transport.capturedRequests().length, equalTo(1));
|
||||
long requestId = transport.capturedRequests()[0].requestId;
|
||||
transport.clear();
|
||||
DiscoveryNode node = clusterService.state().getNodes().getLocalNode();
|
||||
transport.handleLocalError(requestId, new ConnectTransportException(node, "test exception"));
|
||||
// trigger cluster state observer
|
||||
setState(clusterService, ClusterStateCreationUtils.state("test", local, ShardRoutingState.STARTED));
|
||||
assertThat(transport.capturedRequests().length, equalTo(1));
|
||||
transport.handleResponse(transport.capturedRequests()[0].requestId, new Response());
|
||||
listener.get();
|
||||
}
|
||||
|
||||
public void testRetryOfAnAlreadyTimedOutRequest() throws Exception {
|
||||
Request request = new Request().index("test").timeout(new TimeValue(0, TimeUnit.MILLISECONDS));
|
||||
request.shardId = new ShardId("test", "_na_", 0);
|
||||
PlainActionFuture<Response> listener = new PlainActionFuture<>();
|
||||
setState(clusterService, ClusterStateCreationUtils.state("test", randomBoolean(), ShardRoutingState.STARTED));
|
||||
action.new AsyncSingleAction(request, listener).start();
|
||||
assertThat(transport.capturedRequests().length, equalTo(1));
|
||||
long requestId = transport.capturedRequests()[0].requestId;
|
||||
transport.clear();
|
||||
DiscoveryNode node = clusterService.state().getNodes().getLocalNode();
|
||||
transport.handleLocalError(requestId, new ConnectTransportException(node, "test exception"));
|
||||
|
||||
// wait until the timeout was triggered and we actually tried to send for the second time
|
||||
assertBusy(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
assertThat(transport.capturedRequests().length, equalTo(1));
|
||||
}
|
||||
});
|
||||
|
||||
// let it fail the second time too
|
||||
requestId = transport.capturedRequests()[0].requestId;
|
||||
transport.handleLocalError(requestId, new ConnectTransportException(node, "test exception"));
|
||||
try {
|
||||
// result should return immediately
|
||||
assertTrue(listener.isDone());
|
||||
listener.get();
|
||||
fail("this should fail with a transport exception");
|
||||
} catch (ExecutionException t) {
|
||||
if (ExceptionsHelper.unwrap(t, ConnectTransportException.class) == null) {
|
||||
logger.info("expected ConnectTransportException but got ", t);
|
||||
fail("expected and ConnectTransportException");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void testUnresolvableRequestDoesNotHang() throws InterruptedException, ExecutionException, TimeoutException {
|
||||
action = new TestTransportInstanceSingleOperationAction(
|
||||
Settings.EMPTY,
|
||||
"indices:admin/test_unresolvable",
|
||||
transportService,
|
||||
new ActionFilters(new HashSet<>()),
|
||||
new MyResolver(),
|
||||
Request::new
|
||||
) {
|
||||
@Override
|
||||
protected void resolveRequest(ClusterState state, Request request) {
|
||||
throw new IllegalStateException("request cannot be resolved");
|
||||
}
|
||||
};
|
||||
Request request = new Request().index("test");
|
||||
request.shardId = new ShardId("test", "_na_", 0);
|
||||
PlainActionFuture<Response> listener = new PlainActionFuture<>();
|
||||
setState(clusterService, ClusterStateCreationUtils.state("test", randomBoolean(), ShardRoutingState.STARTED));
|
||||
action.new AsyncSingleAction(request, listener).start();
|
||||
assertThat(transport.capturedRequests().length, equalTo(0));
|
||||
try {
|
||||
listener.get();
|
||||
} catch (Exception e) {
|
||||
if (ExceptionsHelper.unwrap(e, IllegalStateException.class) == null) {
|
||||
logger.info("expected IllegalStateException but got ", e);
|
||||
fail("expected and IllegalStateException");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -172,8 +172,9 @@ public class UpdateRequestTests extends ESTestCase {
|
|||
// Related to issue 3256
|
||||
public void testUpdateRequestWithTTL() throws Exception {
|
||||
TimeValue providedTTLValue = TimeValue.parseTimeValue(randomTimeValue(), null, "ttl");
|
||||
Settings settings = settings(Version.CURRENT).build();
|
||||
|
||||
UpdateHelper updateHelper = new UpdateHelper(null, logger);
|
||||
UpdateHelper updateHelper = new UpdateHelper(settings, null);
|
||||
|
||||
// We just upsert one document with ttl
|
||||
IndexRequest indexRequest = new IndexRequest("test", "type1", "1")
|
||||
|
|
|
@ -366,8 +366,8 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
|
|||
|
||||
@Override
|
||||
protected PrimaryResult performOnPrimary(IndexShard primary, IndexRequest request) throws Exception {
|
||||
TransportWriteAction.WriteResult<IndexRequest, IndexResponse> result =
|
||||
TransportIndexAction.executeIndexRequestOnPrimary(request, primary, null);
|
||||
TransportWriteAction.WriteResult<IndexResponse> result = TransportIndexAction.executeIndexRequestOnPrimary(request, primary,
|
||||
null);
|
||||
request.primaryTerm(primary.getPrimaryTerm());
|
||||
TransportWriteActionTestHelper.performPostWriteActions(primary, request, result.getLocation(), logger);
|
||||
return new PrimaryResult(request, result.getResponse());
|
||||
|
|
|
@ -162,8 +162,8 @@ the request was ignored.
|
|||
--------------------------------------------------
|
||||
{
|
||||
"_shards": {
|
||||
"total": 1,
|
||||
"successful": 1,
|
||||
"total": 0,
|
||||
"successful": 0,
|
||||
"failed": 0
|
||||
},
|
||||
"_index": "test",
|
||||
|
|
Loading…
Reference in New Issue