Merge remote-tracking branch 'zareek/enhancement/use_shard_bulk_for_single_ops'

This commit is contained in:
Lee Hinman 2017-01-13 10:09:18 -07:00
commit cd236c4de4
22 changed files with 728 additions and 841 deletions

View File

@ -22,8 +22,8 @@ package org.elasticsearch.action.bulk;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.DocWriteRequest.OpType;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateResponse;
@ -92,10 +92,10 @@ public class BulkItemResponse implements Streamable, StatusToXContentObject {
private final String index;
private final String type;
private final String id;
private final Throwable cause;
private final Exception cause;
private final RestStatus status;
public Failure(String index, String type, String id, Throwable cause) {
public Failure(String index, String type, String id, Exception cause) {
this.index = index;
this.type = type;
this.id = id;
@ -161,7 +161,7 @@ public class BulkItemResponse implements Streamable, StatusToXContentObject {
/**
* The actual cause of the failure.
*/
public Throwable getCause() {
public Exception getCause() {
return cause;
}

View File

@ -31,8 +31,6 @@ import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.TransportDeleteAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.IngestActionForwarder;
import org.elasticsearch.action.support.ActionFilters;
@ -41,6 +39,8 @@ import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.update.TransportUpdateAction;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@ -49,11 +49,15 @@ import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndexClosedException;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
@ -153,11 +157,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
@Override
public void onResponse(CreateIndexResponse result) {
if (counter.decrementAndGet() == 0) {
try {
executeBulk(task, bulkRequest, startTime, listener, responses);
} catch (Exception e) {
listener.onFailure(e);
}
}
}
@ -173,12 +173,10 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
}
}
if (counter.decrementAndGet() == 0) {
try {
executeBulk(task, bulkRequest, startTime, listener, responses);
} catch (Exception inner) {
executeBulk(task, bulkRequest, startTime, ActionListener.wrap(listener::onResponse, inner -> {
inner.addSuppressed(e);
listener.onFailure(inner);
}
}), responses);
}
}
});
@ -209,26 +207,43 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
return false;
}
/**
* This method executes the {@link BulkRequest} and calls the given listener once the request returns.
* This method will not create any indices even if auto-create indices is enabled.
*
* @see #doExecute(BulkRequest, org.elasticsearch.action.ActionListener)
*/
public void executeBulk(final BulkRequest bulkRequest, final ActionListener<BulkResponse> listener) {
final long startTimeNanos = relativeTime();
executeBulk(null, bulkRequest, startTimeNanos, listener, new AtomicArray<>(bulkRequest.requests.size()));
}
private long buildTookInMillis(long startTimeNanos) {
return TimeUnit.NANOSECONDS.toMillis(relativeTime() - startTimeNanos);
}
void executeBulk(Task task, final BulkRequest bulkRequest, final long startTimeNanos, final ActionListener<BulkResponse> listener, final AtomicArray<BulkItemResponse> responses ) {
final ClusterState clusterState = clusterService.state();
// TODO use timeout to wait here if its blocked...
clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.WRITE);
/**
* retries on retryable cluster blocks, resolves item requests,
* constructs shard bulk requests and delegates execution to shard bulk action
* */
private final class BulkOperation extends AbstractRunnable {
private final Task task;
private final BulkRequest bulkRequest;
private final ActionListener<BulkResponse> listener;
private final AtomicArray<BulkItemResponse> responses;
private final long startTimeNanos;
private final ClusterStateObserver observer;
BulkOperation(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener,
AtomicArray<BulkItemResponse> responses, long startTimeNanos) {
this.task = task;
this.bulkRequest = bulkRequest;
this.listener = listener;
this.responses = responses;
this.startTimeNanos = startTimeNanos;
this.observer = new ClusterStateObserver(clusterService, bulkRequest.timeout(), logger, threadPool.getThreadContext());
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
@Override
protected void doRun() throws Exception {
final ClusterState clusterState = observer.setAndGetObservedState();
if (handleBlockExceptions(clusterState)) {
return;
}
final ConcreteIndices concreteIndices = new ConcreteIndices(clusterState, indexNameExpressionResolver);
MetaData metaData = clusterState.metaData();
for (int i = 0; i < bulkRequest.requests.size(); i++) {
@ -258,7 +273,11 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
TransportUpdateAction.resolveAndValidateRouting(metaData, concreteIndex.getName(), (UpdateRequest) docWriteRequest);
break;
case DELETE:
TransportDeleteAction.resolveAndValidateRouting(metaData, concreteIndex.getName(), (DeleteRequest) docWriteRequest);
docWriteRequest.routing(metaData.resolveIndexRouting(docWriteRequest.parent(), docWriteRequest.routing(), docWriteRequest.index()));
// check if routing is required, if so, throw error if routing wasn't specified
if (docWriteRequest.routing() == null && metaData.routingRequired(concreteIndex.getName(), docWriteRequest.type())) {
throw new RoutingMissingException(concreteIndex.getName(), docWriteRequest.type(), docWriteRequest.id());
}
break;
default: throw new AssertionError("request type not supported: [" + docWriteRequest.opType() + "]");
}
@ -337,6 +356,54 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
}
}
private boolean handleBlockExceptions(ClusterState state) {
ClusterBlockException blockException = state.blocks().globalBlockedException(ClusterBlockLevel.WRITE);
if (blockException != null) {
if (blockException.retryable()) {
logger.trace("cluster is blocked, scheduling a retry", blockException);
retry(blockException);
} else {
onFailure(blockException);
}
return true;
}
return false;
}
void retry(Exception failure) {
assert failure != null;
if (observer.isTimedOut()) {
// we running as a last attempt after a timeout has happened. don't retry
onFailure(failure);
return;
}
final ThreadContext.StoredContext context = threadPool.getThreadContext().newStoredContext();
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
context.close();
run();
}
@Override
public void onClusterServiceClose() {
onFailure(new NodeClosedException(clusterService.localNode()));
}
@Override
public void onTimeout(TimeValue timeout) {
context.close();
// Try one more time...
run();
}
});
}
}
void executeBulk(Task task, final BulkRequest bulkRequest, final long startTimeNanos, final ActionListener<BulkResponse> listener, final AtomicArray<BulkItemResponse> responses ) {
new BulkOperation(task, bulkRequest, listener, responses, startTimeNanos).run();
}
private boolean addFailureIfIndexIsUnavailable(DocWriteRequest request, BulkRequest bulkRequest, AtomicArray<BulkItemResponse> responses, int idx,
final ConcreteIndices concreteIndices,
final MetaData metaData) {

View File

@ -28,6 +28,7 @@ import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.replication.ReplicationOperation;
import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo;
import org.elasticsearch.action.support.replication.TransportWriteAction;
import org.elasticsearch.action.update.UpdateHelper;
@ -50,9 +51,12 @@ import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineClosedException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
@ -61,10 +65,6 @@ import org.elasticsearch.transport.TransportService;
import java.util.Map;
import static org.elasticsearch.action.delete.TransportDeleteAction.executeDeleteRequestOnPrimary;
import static org.elasticsearch.action.delete.TransportDeleteAction.executeDeleteRequestOnReplica;
import static org.elasticsearch.action.index.TransportIndexAction.executeIndexRequestOnPrimary;
import static org.elasticsearch.action.index.TransportIndexAction.executeIndexRequestOnReplica;
import static org.elasticsearch.action.support.replication.ReplicationOperation.ignoreReplicaException;
import static org.elasticsearch.action.support.replication.ReplicationOperation.isConflictException;
@ -105,7 +105,8 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
}
@Override
protected WritePrimaryResult shardOperationOnPrimary(BulkShardRequest request, IndexShard primary) throws Exception {
public WritePrimaryResult<BulkShardRequest, BulkShardResponse> shardOperationOnPrimary(
BulkShardRequest request, IndexShard primary) throws Exception {
final IndexMetaData metaData = primary.indexSettings().getIndexMetaData();
long[] preVersions = new long[request.items().length];
@ -121,7 +122,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
responses[i] = items[i].getPrimaryResponse();
}
BulkShardResponse response = new BulkShardResponse(request.shardId(), responses);
return new WritePrimaryResult(request, response, location, null, primary);
return new WritePrimaryResult<>(request, response, location, null, primary, logger);
}
/** Executes bulk item requests and handles request execution exceptions */
@ -362,7 +363,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
}
@Override
protected WriteReplicaResult shardOperationOnReplica(BulkShardRequest request, IndexShard replica) throws Exception {
public WriteReplicaResult<BulkShardRequest> shardOperationOnReplica(BulkShardRequest request, IndexShard replica) throws Exception {
Translog.Location location = null;
for (int i = 0; i < request.items().length; i++) {
BulkItemRequest item = request.items()[i];
@ -406,7 +407,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
}
}
}
return new WriteReplicaResult(request, location, null, replica);
return new WriteReplicaResult<>(request, location, null, replica, logger);
}
private Translog.Location locationToSync(Translog.Location current, Translog.Location next) {
@ -420,4 +421,77 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
return next;
}
/**
* Execute the given {@link IndexRequest} on a replica shard, throwing a
* {@link RetryOnReplicaException} if the operation needs to be re-tried.
*/
public static Engine.IndexResult executeIndexRequestOnReplica(IndexRequest request, IndexShard replica) {
final ShardId shardId = replica.shardId();
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.REPLICA, shardId.getIndexName(), request.type(), request.id(), request.source())
.routing(request.routing()).parent(request.parent());
final Engine.Index operation;
try {
operation = replica.prepareIndexOnReplica(sourceToParse, request.getSeqNo(), request.version(), request.versionType(), request.getAutoGeneratedTimestamp(), request.isRetry());
} catch (MapperParsingException e) {
return new Engine.IndexResult(e, request.version(), request.getSeqNo());
}
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update);
}
return replica.index(operation);
}
/** Utility method to prepare an index operation on primary shards */
static Engine.Index prepareIndexOperationOnPrimary(IndexRequest request, IndexShard primary) {
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.PRIMARY, request.index(), request.type(), request.id(), request.source())
.routing(request.routing()).parent(request.parent());
return primary.prepareIndexOnPrimary(sourceToParse, request.version(), request.versionType(), request.getAutoGeneratedTimestamp(), request.isRetry());
}
/** Executes index operation on primary shard after updates mapping if dynamic mappings are found */
public static Engine.IndexResult executeIndexRequestOnPrimary(IndexRequest request, IndexShard primary,
MappingUpdatedAction mappingUpdatedAction) throws Exception {
Engine.Index operation;
try {
operation = prepareIndexOperationOnPrimary(request, primary);
} catch (MapperParsingException | IllegalArgumentException e) {
return new Engine.IndexResult(e, request.version(), request.getSeqNo());
}
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
final ShardId shardId = primary.shardId();
if (update != null) {
// can throw timeout exception when updating mappings or ISE for attempting to update default mappings
// which are bubbled up
try {
mappingUpdatedAction.updateMappingOnMaster(shardId.getIndex(), request.type(), update);
} catch (IllegalArgumentException e) {
// throws IAE on conflicts merging dynamic mappings
return new Engine.IndexResult(e, request.version(), request.getSeqNo());
}
try {
operation = prepareIndexOperationOnPrimary(request, primary);
} catch (MapperParsingException | IllegalArgumentException e) {
return new Engine.IndexResult(e, request.version(), request.getSeqNo());
}
update = operation.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
throw new ReplicationOperation.RetryOnPrimaryException(shardId,
"Dynamic mappings are not available on the node that holds the primary yet");
}
}
return primary.index(operation);
}
public static Engine.DeleteResult executeDeleteRequestOnPrimary(DeleteRequest request, IndexShard primary) {
final Engine.Delete delete = primary.prepareDeleteOnPrimary(request.type(), request.id(), request.version(), request.versionType());
return primary.delete(delete);
}
public static Engine.DeleteResult executeDeleteRequestOnReplica(DeleteRequest request, IndexShard replica) {
final Engine.Delete delete = replica.prepareDeleteOnReplica(request.type(), request.id(),
request.getSeqNo(), request.primaryTerm(), request.version(), request.versionType());
return replica.delete(delete);
}
}

View File

@ -0,0 +1,132 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.bulk;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.WriteResponse;
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportWriteAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.function.Supplier;
/** use transport bulk action directly */
@Deprecated
public abstract class TransportSingleItemBulkWriteAction<
Request extends ReplicatedWriteRequest<Request>,
Response extends ReplicationResponse & WriteResponse
> extends TransportWriteAction<Request, Request, Response> {
private final TransportBulkAction bulkAction;
private final TransportShardBulkAction shardBulkAction;
protected TransportSingleItemBulkWriteAction(Settings settings, String actionName, TransportService transportService,
ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool,
ShardStateAction shardStateAction, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request,
Supplier<Request> replicaRequest, String executor,
TransportBulkAction bulkAction, TransportShardBulkAction shardBulkAction) {
super(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
indexNameExpressionResolver, request, replicaRequest, executor);
this.bulkAction = bulkAction;
this.shardBulkAction = shardBulkAction;
}
@Override
protected void doExecute(Task task, final Request request, final ActionListener<Response> listener) {
bulkAction.execute(task, toSingleItemBulkRequest(request), wrapBulkResponse(listener));
}
@Override
protected WritePrimaryResult<Request, Response> shardOperationOnPrimary(
Request request, final IndexShard primary) throws Exception {
BulkItemRequest[] itemRequests = new BulkItemRequest[1];
WriteRequest.RefreshPolicy refreshPolicy = request.getRefreshPolicy();
request.setRefreshPolicy(WriteRequest.RefreshPolicy.NONE);
itemRequests[0] = new BulkItemRequest(0, ((DocWriteRequest) request));
BulkShardRequest bulkShardRequest = new BulkShardRequest(request.shardId(), refreshPolicy, itemRequests);
WritePrimaryResult<BulkShardRequest, BulkShardResponse> bulkResult =
shardBulkAction.shardOperationOnPrimary(bulkShardRequest, primary);
assert bulkResult.finalResponseIfSuccessful.getResponses().length == 1 : "expected only one bulk shard response";
BulkItemResponse itemResponse = bulkResult.finalResponseIfSuccessful.getResponses()[0];
final Response response;
final Exception failure;
if (itemResponse.isFailed()) {
failure = itemResponse.getFailure().getCause();
response = null;
} else {
response = (Response) itemResponse.getResponse();
failure = null;
}
return new WritePrimaryResult<>(request, response, bulkResult.location, failure, primary, logger);
}
@Override
protected WriteReplicaResult<Request> shardOperationOnReplica(
Request replicaRequest, IndexShard replica) throws Exception {
BulkItemRequest[] itemRequests = new BulkItemRequest[1];
WriteRequest.RefreshPolicy refreshPolicy = replicaRequest.getRefreshPolicy();
itemRequests[0] = new BulkItemRequest(0, ((DocWriteRequest) replicaRequest));
BulkShardRequest bulkShardRequest = new BulkShardRequest(replicaRequest.shardId(), refreshPolicy, itemRequests);
WriteReplicaResult<BulkShardRequest> result = shardBulkAction.shardOperationOnReplica(bulkShardRequest, replica);
// a replica operation can never throw a document-level failure,
// as the same document has been already indexed successfully in the primary
return new WriteReplicaResult<>(replicaRequest, result.location, null, replica, logger);
}
private ActionListener<BulkResponse> wrapBulkResponse(ActionListener<Response> listener) {
return ActionListener.wrap(bulkItemResponses -> {
assert bulkItemResponses.getItems().length == 1 : "expected only one item in bulk request";
BulkItemResponse bulkItemResponse = bulkItemResponses.getItems()[0];
if (bulkItemResponse.isFailed() == false) {
final DocWriteResponse response = bulkItemResponse.getResponse();
listener.onResponse((Response) response);
} else {
listener.onFailure(bulkItemResponse.getFailure().getCause());
}
}, listener::onFailure);
}
public static BulkRequest toSingleItemBulkRequest(ReplicatedWriteRequest request) {
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.add(((DocWriteRequest) request));
bulkRequest.setRefreshPolicy(request.getRefreshPolicy());
bulkRequest.timeout(request.timeout());
bulkRequest.waitForActiveShards(request.waitForActiveShards());
request.setRefreshPolicy(WriteRequest.RefreshPolicy.NONE);
return bulkRequest;
}
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.action.delete;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.CompositeIndicesRequest;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
import org.elasticsearch.common.Nullable;
@ -43,7 +44,7 @@ import static org.elasticsearch.action.ValidateActions.addValidationError;
* @see org.elasticsearch.client.Client#delete(DeleteRequest)
* @see org.elasticsearch.client.Requests#deleteRequest(String)
*/
public class DeleteRequest extends ReplicatedWriteRequest<DeleteRequest> implements DocWriteRequest<DeleteRequest> {
public class DeleteRequest extends ReplicatedWriteRequest<DeleteRequest> implements DocWriteRequest<DeleteRequest>, CompositeIndicesRequest {
private String type;
private String id;

View File

@ -19,150 +19,39 @@
package org.elasticsearch.action.delete;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
import org.elasticsearch.action.bulk.TransportBulkAction;
import org.elasticsearch.action.bulk.TransportShardBulkAction;
import org.elasticsearch.action.bulk.TransportSingleItemBulkWriteAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.support.replication.TransportWriteAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
/**
* Performs the delete operation.
*
* Deprecated use TransportBulkAction with a single item instead
*/
public class TransportDeleteAction extends TransportWriteAction<DeleteRequest, DeleteRequest,DeleteResponse> {
private final AutoCreateIndex autoCreateIndex;
private final TransportCreateIndexAction createIndexAction;
@Deprecated
public class TransportDeleteAction extends TransportSingleItemBulkWriteAction<DeleteRequest, DeleteResponse> {
@Inject
public TransportDeleteAction(Settings settings, TransportService transportService, ClusterService clusterService,
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
TransportCreateIndexAction createIndexAction, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
AutoCreateIndex autoCreateIndex) {
super(settings, DeleteAction.NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
indexNameExpressionResolver, DeleteRequest::new, DeleteRequest::new, ThreadPool.Names.INDEX);
this.createIndexAction = createIndexAction;
this.autoCreateIndex = autoCreateIndex;
}
@Override
protected void doExecute(Task task, final DeleteRequest request, final ActionListener<DeleteResponse> listener) {
ClusterState state = clusterService.state();
if (autoCreateIndex.shouldAutoCreate(request.index(), state)) {
CreateIndexRequest createIndexRequest = new CreateIndexRequest()
.index(request.index())
.cause("auto(delete api)")
.masterNodeTimeout(request.timeout());
createIndexAction.execute(task, createIndexRequest, new ActionListener<CreateIndexResponse>() {
@Override
public void onResponse(CreateIndexResponse result) {
innerExecute(task, request, listener);
}
@Override
public void onFailure(Exception e) {
if (ExceptionsHelper.unwrapCause(e) instanceof ResourceAlreadyExistsException) {
// we have the index, do it
innerExecute(task, request, listener);
} else {
listener.onFailure(e);
}
}
});
} else {
innerExecute(task, request, listener);
}
}
@Override
protected void resolveRequest(final MetaData metaData, IndexMetaData indexMetaData, DeleteRequest request) {
super.resolveRequest(metaData, indexMetaData, request);
resolveAndValidateRouting(metaData, indexMetaData.getIndex().getName(), request);
ShardId shardId = clusterService.operationRouting().shardId(clusterService.state(),
indexMetaData.getIndex().getName(), request.id(), request.routing());
request.setShardId(shardId);
}
public static void resolveAndValidateRouting(final MetaData metaData, final String concreteIndex,
DeleteRequest request) {
request.routing(metaData.resolveIndexRouting(request.parent(), request.routing(), request.index()));
// check if routing is required, if so, throw error if routing wasn't specified
if (request.routing() == null && metaData.routingRequired(concreteIndex, request.type())) {
throw new RoutingMissingException(concreteIndex, request.type(), request.id());
}
}
private void innerExecute(Task task, final DeleteRequest request, final ActionListener<DeleteResponse> listener) {
super.doExecute(task, request, listener);
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
TransportBulkAction bulkAction, TransportShardBulkAction shardBulkAction) {
super(settings, DeleteAction.NAME, transportService, clusterService, indicesService, threadPool, shardStateAction,
actionFilters, indexNameExpressionResolver, DeleteRequest::new, DeleteRequest::new, ThreadPool.Names.INDEX,
bulkAction, shardBulkAction);
}
@Override
protected DeleteResponse newResponseInstance() {
return new DeleteResponse();
}
@Override
protected WritePrimaryResult shardOperationOnPrimary(DeleteRequest request, IndexShard primary) throws Exception {
final Engine.DeleteResult result = executeDeleteRequestOnPrimary(request, primary);
final DeleteResponse response;
final DeleteRequest replicaRequest;
if (result.hasFailure() == false) {
// update the request with the version so it will go to the replicas
request.versionType(request.versionType().versionTypeForReplicationAndRecovery());
request.version(result.getVersion());
request.setSeqNo(result.getSeqNo());
assert request.versionType().validateVersionForWrites(request.version());
replicaRequest = request;
response = new DeleteResponse(
primary.shardId(),
request.type(),
request.id(),
result.getSeqNo(),
result.getVersion(),
result.isFound());
} else {
response = null;
replicaRequest = null;
}
return new WritePrimaryResult(replicaRequest, response, result.getTranslogLocation(), result.getFailure(), primary);
}
@Override
protected WriteReplicaResult shardOperationOnReplica(DeleteRequest request, IndexShard replica) throws Exception {
final Engine.DeleteResult result = executeDeleteRequestOnReplica(request, replica);
return new WriteReplicaResult(request, result.getTranslogLocation(), result.getFailure(), replica);
}
public static Engine.DeleteResult executeDeleteRequestOnPrimary(DeleteRequest request, IndexShard primary) {
final Engine.Delete delete = primary.prepareDeleteOnPrimary(request.type(), request.id(), request.version(), request.versionType());
return primary.delete(delete);
}
public static Engine.DeleteResult executeDeleteRequestOnReplica(DeleteRequest request, IndexShard replica) {
final Engine.Delete delete = replica.prepareDeleteOnReplica(request.type(), request.id(),
request.getSeqNo(), request.primaryTerm(), request.version(), request.versionType());
return replica.delete(delete);
}
}

View File

@ -22,6 +22,7 @@ package org.elasticsearch.action.index;
import org.elasticsearch.ElasticsearchGenerationException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.CompositeIndicesRequest;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
@ -66,7 +67,7 @@ import static org.elasticsearch.action.ValidateActions.addValidationError;
* @see org.elasticsearch.client.Requests#indexRequest(String)
* @see org.elasticsearch.client.Client#index(IndexRequest)
*/
public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implements DocWriteRequest<IndexRequest> {
public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implements DocWriteRequest<IndexRequest>, CompositeIndicesRequest {
private String type;
private String id;

View File

@ -19,39 +19,16 @@
package org.elasticsearch.action.index;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
import org.elasticsearch.action.ingest.IngestActionForwarder;
import org.elasticsearch.action.bulk.TransportBulkAction;
import org.elasticsearch.action.bulk.TransportShardBulkAction;
import org.elasticsearch.action.bulk.TransportSingleItemBulkWriteAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.support.replication.ReplicationOperation;
import org.elasticsearch.action.support.replication.TransportWriteAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -64,205 +41,25 @@ import org.elasticsearch.transport.TransportService;
* Defaults to <tt>true</tt>.
* <li><b>allowIdGeneration</b>: If the id is set not, should it be generated. Defaults to <tt>true</tt>.
* </ul>
*
* Deprecated use TransportBulkAction with a single item instead
*/
public class TransportIndexAction extends TransportWriteAction<IndexRequest, IndexRequest, IndexResponse> {
private final AutoCreateIndex autoCreateIndex;
private final boolean allowIdGeneration;
private final TransportCreateIndexAction createIndexAction;
private final ClusterService clusterService;
private final IngestService ingestService;
private final MappingUpdatedAction mappingUpdatedAction;
private final IngestActionForwarder ingestForwarder;
@Deprecated
public class TransportIndexAction extends TransportSingleItemBulkWriteAction<IndexRequest, IndexResponse> {
@Inject
public TransportIndexAction(Settings settings, TransportService transportService, ClusterService clusterService,
IndicesService indicesService, IngestService ingestService, ThreadPool threadPool,
ShardStateAction shardStateAction, TransportCreateIndexAction createIndexAction,
MappingUpdatedAction mappingUpdatedAction, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, AutoCreateIndex autoCreateIndex) {
IndicesService indicesService,
ThreadPool threadPool, ShardStateAction shardStateAction,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
TransportBulkAction bulkAction, TransportShardBulkAction shardBulkAction) {
super(settings, IndexAction.NAME, transportService, clusterService, indicesService, threadPool, shardStateAction,
actionFilters, indexNameExpressionResolver, IndexRequest::new, IndexRequest::new, ThreadPool.Names.INDEX);
this.mappingUpdatedAction = mappingUpdatedAction;
this.createIndexAction = createIndexAction;
this.autoCreateIndex = autoCreateIndex;
this.allowIdGeneration = settings.getAsBoolean("action.allow_id_generation", true);
this.clusterService = clusterService;
this.ingestService = ingestService;
this.ingestForwarder = new IngestActionForwarder(transportService);
clusterService.addStateApplier(this.ingestForwarder);
}
@Override
protected void doExecute(Task task, final IndexRequest request, final ActionListener<IndexResponse> listener) {
if (Strings.hasText(request.getPipeline())) {
if (clusterService.localNode().isIngestNode()) {
processIngestIndexRequest(task, request, listener);
} else {
ingestForwarder.forwardIngestRequest(IndexAction.INSTANCE, request, listener);
}
return;
}
// if we don't have a master, we don't have metadata, that's fine, let it find a master using create index API
ClusterState state = clusterService.state();
if (shouldAutoCreate(request, state)) {
CreateIndexRequest createIndexRequest = new CreateIndexRequest();
createIndexRequest.index(request.index());
createIndexRequest.cause("auto(index api)");
createIndexRequest.masterNodeTimeout(request.timeout());
createIndexAction.execute(task, createIndexRequest, new ActionListener<CreateIndexResponse>() {
@Override
public void onResponse(CreateIndexResponse result) {
innerExecute(task, request, listener);
}
@Override
public void onFailure(Exception e) {
if (ExceptionsHelper.unwrapCause(e) instanceof ResourceAlreadyExistsException) {
// we have the index, do it
try {
innerExecute(task, request, listener);
} catch (Exception inner) {
inner.addSuppressed(e);
listener.onFailure(inner);
}
} else {
listener.onFailure(e);
}
}
});
} else {
innerExecute(task, request, listener);
}
}
protected boolean shouldAutoCreate(IndexRequest request, ClusterState state) {
return autoCreateIndex.shouldAutoCreate(request.index(), state);
}
@Override
protected void resolveRequest(MetaData metaData, IndexMetaData indexMetaData, IndexRequest request) {
super.resolveRequest(metaData, indexMetaData, request);
MappingMetaData mappingMd =indexMetaData.mappingOrDefault(request.type());
request.resolveRouting(metaData);
request.process(mappingMd, allowIdGeneration, indexMetaData.getIndex().getName());
ShardId shardId = clusterService.operationRouting().shardId(clusterService.state(),
indexMetaData.getIndex().getName(), request.id(), request.routing());
request.setShardId(shardId);
}
protected void innerExecute(Task task, final IndexRequest request, final ActionListener<IndexResponse> listener) {
super.doExecute(task, request, listener);
actionFilters, indexNameExpressionResolver, IndexRequest::new, IndexRequest::new, ThreadPool.Names.INDEX,
bulkAction, shardBulkAction);
}
@Override
protected IndexResponse newResponseInstance() {
return new IndexResponse();
}
@Override
protected WritePrimaryResult shardOperationOnPrimary(IndexRequest request, IndexShard primary) throws Exception {
final Engine.IndexResult indexResult = executeIndexRequestOnPrimary(request, primary, mappingUpdatedAction);
final IndexResponse response;
final IndexRequest replicaRequest;
if (indexResult.hasFailure() == false) {
// update the version on request so it will happen on the replicas
final long version = indexResult.getVersion();
request.version(version);
request.versionType(request.versionType().versionTypeForReplicationAndRecovery());
request.setSeqNo(indexResult.getSeqNo());
assert request.versionType().validateVersionForWrites(request.version());
replicaRequest = request;
response = new IndexResponse(primary.shardId(), request.type(), request.id(), indexResult.getSeqNo(),
indexResult.getVersion(), indexResult.isCreated());
} else {
response = null;
replicaRequest = null;
}
return new WritePrimaryResult(replicaRequest, response, indexResult.getTranslogLocation(), indexResult.getFailure(), primary);
}
@Override
protected WriteReplicaResult shardOperationOnReplica(IndexRequest request, IndexShard replica) throws Exception {
final Engine.IndexResult indexResult = executeIndexRequestOnReplica(request, replica);
return new WriteReplicaResult(request, indexResult.getTranslogLocation(), indexResult.getFailure(), replica);
}
/**
* Execute the given {@link IndexRequest} on a replica shard, throwing a
* {@link RetryOnReplicaException} if the operation needs to be re-tried.
*/
public static Engine.IndexResult executeIndexRequestOnReplica(IndexRequest request, IndexShard replica) {
final ShardId shardId = replica.shardId();
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.REPLICA, shardId.getIndexName(), request.type(), request.id(), request.source())
.routing(request.routing()).parent(request.parent());
final Engine.Index operation;
try {
operation = replica.prepareIndexOnReplica(sourceToParse, request.getSeqNo(), request.version(), request.versionType(), request.getAutoGeneratedTimestamp(), request.isRetry());
} catch (MapperParsingException e) {
return new Engine.IndexResult(e, request.version(), request.getSeqNo());
}
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update);
}
return replica.index(operation);
}
/** Utility method to prepare an index operation on primary shards */
static Engine.Index prepareIndexOperationOnPrimary(IndexRequest request, IndexShard primary) {
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.PRIMARY, request.index(), request.type(), request.id(), request.source())
.routing(request.routing()).parent(request.parent());
return primary.prepareIndexOnPrimary(sourceToParse, request.version(), request.versionType(), request.getAutoGeneratedTimestamp(), request.isRetry());
}
public static Engine.IndexResult executeIndexRequestOnPrimary(IndexRequest request, IndexShard primary,
MappingUpdatedAction mappingUpdatedAction) throws Exception {
Engine.Index operation;
try {
operation = prepareIndexOperationOnPrimary(request, primary);
} catch (MapperParsingException | IllegalArgumentException e) {
return new Engine.IndexResult(e, request.version(), request.getSeqNo());
}
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
final ShardId shardId = primary.shardId();
if (update != null) {
// can throw timeout exception when updating mappings or ISE for attempting to update default mappings
// which are bubbled up
try {
mappingUpdatedAction.updateMappingOnMaster(shardId.getIndex(), request.type(), update);
} catch (IllegalArgumentException e) {
// throws IAE on conflicts merging dynamic mappings
return new Engine.IndexResult(e, request.version(), request.getSeqNo());
}
try {
operation = prepareIndexOperationOnPrimary(request, primary);
} catch (MapperParsingException | IllegalArgumentException e) {
return new Engine.IndexResult(e, request.version(), request.getSeqNo());
}
update = operation.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
throw new ReplicationOperation.RetryOnPrimaryException(shardId,
"Dynamic mappings are not available on the node that holds the primary yet");
}
}
return primary.index(operation);
}
private void processIngestIndexRequest(Task task, IndexRequest indexRequest, ActionListener listener) {
ingestService.getPipelineExecutionService().executeIndexRequest(indexRequest, t -> {
logger.error((Supplier<?>) () -> new ParameterizedMessage("failed to execute pipeline [{}]", indexRequest.getPipeline()), t);
listener.onFailure(t);
}, success -> {
// TransportIndexAction uses IndexRequest and same action name on the node that receives the request and the node that
// processes the primary action. This could lead to a pipeline being executed twice for the same
// index request, hence we set the pipeline to null once its execution completed.
indexRequest.setPipeline(null);
doExecute(task, indexRequest, listener);
});
}
}

View File

@ -173,7 +173,8 @@ public abstract class TransportReplicationAction<
* @param shardRequest the request to the primary shard
* @param primary the primary shard to perform the operation on
*/
protected abstract PrimaryResult shardOperationOnPrimary(Request shardRequest, IndexShard primary) throws Exception;
protected abstract PrimaryResult<ReplicaRequest, Response> shardOperationOnPrimary(
Request shardRequest, IndexShard primary) throws Exception;
/**
* Synchronous replica operation on nodes with replica copies. This is done under the lock form
@ -364,8 +365,8 @@ public abstract class TransportReplicationAction<
};
}
protected ReplicationOperation<Request, ReplicaRequest, PrimaryResult> createReplicatedOperation(
Request request, ActionListener<PrimaryResult> listener,
protected ReplicationOperation<Request, ReplicaRequest, PrimaryResult<ReplicaRequest, Response>> createReplicatedOperation(
Request request, ActionListener<PrimaryResult<ReplicaRequest, Response>> listener,
PrimaryShardReference primaryShardReference, boolean executeOnReplicas) {
return new ReplicationOperation<>(request, primaryShardReference, listener,
executeOnReplicas, replicasProxy, clusterService::state, logger, actionName
@ -373,10 +374,12 @@ public abstract class TransportReplicationAction<
}
}
protected class PrimaryResult implements ReplicationOperation.PrimaryResult<ReplicaRequest> {
protected static class PrimaryResult<ReplicaRequest extends ReplicationRequest<ReplicaRequest>,
Response extends ReplicationResponse>
implements ReplicationOperation.PrimaryResult<ReplicaRequest> {
final ReplicaRequest replicaRequest;
final Response finalResponseIfSuccessful;
final Exception finalFailure;
public final Response finalResponseIfSuccessful;
public final Exception finalFailure;
/**
* Result of executing a primary operation
@ -416,7 +419,7 @@ public abstract class TransportReplicationAction<
}
}
protected class ReplicaResult {
protected static class ReplicaResult {
final Exception finalFailure;
public ReplicaResult(Exception finalFailure) {
@ -941,7 +944,8 @@ public abstract class TransportReplicationAction<
}
class PrimaryShardReference extends ShardReference implements ReplicationOperation.Primary<Request, ReplicaRequest, PrimaryResult> {
class PrimaryShardReference extends ShardReference
implements ReplicationOperation.Primary<Request, ReplicaRequest, PrimaryResult<ReplicaRequest, Response>> {
PrimaryShardReference(IndexShard indexShard, Releasable operationLock) {
super(indexShard, operationLock);

View File

@ -21,6 +21,8 @@ package org.elasticsearch.action.support.replication;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkShardRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.WriteResponse;
@ -68,7 +70,8 @@ public abstract class TransportWriteAction<
* async refresh is performed on the <code>primary</code> shard according to the <code>Request</code> refresh policy
*/
@Override
protected abstract WritePrimaryResult shardOperationOnPrimary(Request request, IndexShard primary) throws Exception;
protected abstract WritePrimaryResult<ReplicaRequest, Response> shardOperationOnPrimary(
Request request, IndexShard primary) throws Exception;
/**
* Called once per replica with a reference to the replica {@linkplain IndexShard} to modify.
@ -77,19 +80,24 @@ public abstract class TransportWriteAction<
* async refresh is performed on the <code>replica</code> shard according to the <code>ReplicaRequest</code> refresh policy
*/
@Override
protected abstract WriteReplicaResult shardOperationOnReplica(ReplicaRequest request, IndexShard replica) throws Exception;
protected abstract WriteReplicaResult<ReplicaRequest> shardOperationOnReplica(
ReplicaRequest request, IndexShard replica) throws Exception;
/**
* Result of taking the action on the primary.
*/
protected class WritePrimaryResult extends PrimaryResult implements RespondingWriteResult {
protected static class WritePrimaryResult<ReplicaRequest extends ReplicatedWriteRequest<ReplicaRequest>,
Response extends ReplicationResponse & WriteResponse> extends PrimaryResult<ReplicaRequest, Response>
implements RespondingWriteResult {
boolean finishedAsyncActions;
public final Location location;
ActionListener<Response> listener = null;
public WritePrimaryResult(ReplicaRequest request, @Nullable Response finalResponse,
@Nullable Location location, @Nullable Exception operationFailure,
IndexShard primary) {
IndexShard primary, Logger logger) {
super(request, finalResponse, operationFailure);
this.location = location;
assert location == null || operationFailure == null
: "expected either failure to be null or translog location to be null, " +
"but found: [" + location + "] translog location and [" + operationFailure + "] failure";
@ -139,13 +147,16 @@ public abstract class TransportWriteAction<
/**
* Result of taking the action on the replica.
*/
protected class WriteReplicaResult extends ReplicaResult implements RespondingWriteResult {
protected static class WriteReplicaResult<ReplicaRequest extends ReplicatedWriteRequest<ReplicaRequest>>
extends ReplicaResult implements RespondingWriteResult {
public final Location location;
boolean finishedAsyncActions;
private ActionListener<TransportResponse.Empty> listener;
public WriteReplicaResult(ReplicaRequest request, @Nullable Location location,
@Nullable Exception operationFailure, IndexShard replica) {
@Nullable Exception operationFailure, IndexShard replica, Logger logger) {
super(operationFailure);
this.location = location;
if (operationFailure != null) {
this.finishedAsyncActions = true;
} else {

View File

@ -169,7 +169,7 @@ public class ThreadPool extends AbstractComponent implements Closeable {
final int genericThreadPoolMax = boundedBy(4 * availableProcessors, 128, 512);
builders.put(Names.GENERIC, new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30)));
builders.put(Names.INDEX, new FixedExecutorBuilder(settings, Names.INDEX, availableProcessors, 200));
builders.put(Names.BULK, new FixedExecutorBuilder(settings, Names.BULK, availableProcessors, 50));
builders.put(Names.BULK, new FixedExecutorBuilder(settings, Names.BULK, availableProcessors, 200)); // now that we reuse bulk for index/delete ops
builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, availableProcessors, 1000));
builders.put(Names.SEARCH, new FixedExecutorBuilder(settings, Names.SEARCH, searchThreadPoolSize(availableProcessors), 1000));
builders.put(Names.MANAGEMENT, new ScalingExecutorBuilder(Names.MANAGEMENT, 1, 5, TimeValue.timeValueMinutes(5)));

View File

@ -201,7 +201,7 @@ public class IndicesRequestIT extends ESIntegTestCase {
}
public void testIndex() {
String[] indexShardActions = new String[]{IndexAction.NAME, IndexAction.NAME + "[p]", IndexAction.NAME + "[r]"};
String[] indexShardActions = new String[]{BulkAction.NAME + "[s][p]", BulkAction.NAME + "[s][r]"};
interceptTransportActions(indexShardActions);
IndexRequest indexRequest = new IndexRequest(randomIndexOrAlias(), "type", "id").source("field", "value");
@ -212,7 +212,7 @@ public class IndicesRequestIT extends ESIntegTestCase {
}
public void testDelete() {
String[] deleteShardActions = new String[]{DeleteAction.NAME, DeleteAction.NAME + "[p]", DeleteAction.NAME + "[r]"};
String[] deleteShardActions = new String[]{BulkAction.NAME + "[s][p]", BulkAction.NAME + "[s][r]"};
interceptTransportActions(deleteShardActions);
DeleteRequest deleteRequest = new DeleteRequest(randomIndexOrAlias(), "type", "id");
@ -224,7 +224,7 @@ public class IndicesRequestIT extends ESIntegTestCase {
public void testUpdate() {
//update action goes to the primary, index op gets executed locally, then replicated
String[] updateShardActions = new String[]{UpdateAction.NAME + "[s]", IndexAction.NAME + "[r]"};
String[] updateShardActions = new String[]{UpdateAction.NAME + "[s]", BulkAction.NAME + "[s][p]", BulkAction.NAME + "[s][r]"};
interceptTransportActions(updateShardActions);
String indexOrAlias = randomIndexOrAlias();
@ -239,7 +239,7 @@ public class IndicesRequestIT extends ESIntegTestCase {
public void testUpdateUpsert() {
//update action goes to the primary, index op gets executed locally, then replicated
String[] updateShardActions = new String[]{UpdateAction.NAME + "[s]", IndexAction.NAME + "[r]"};
String[] updateShardActions = new String[]{UpdateAction.NAME + "[s]", BulkAction.NAME + "[s][p]", BulkAction.NAME + "[s][r]"};
interceptTransportActions(updateShardActions);
String indexOrAlias = randomIndexOrAlias();
@ -253,7 +253,7 @@ public class IndicesRequestIT extends ESIntegTestCase {
public void testUpdateDelete() {
//update action goes to the primary, delete op gets executed locally, then replicated
String[] updateShardActions = new String[]{UpdateAction.NAME + "[s]", DeleteAction.NAME + "[r]"};
String[] updateShardActions = new String[]{UpdateAction.NAME + "[s]", BulkAction.NAME + "[s][p]", BulkAction.NAME + "[s][r]"};
interceptTransportActions(updateShardActions);
String indexOrAlias = randomIndexOrAlias();

View File

@ -21,21 +21,27 @@ package org.elasticsearch.action.bulk;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateApplier;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.ingest.PipelineExecutionService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import org.junit.Before;
@ -48,6 +54,7 @@ import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.sameInstance;
@ -84,6 +91,9 @@ public class TransportBulkActionIngestTests extends ESTestCase {
/** The actual action we want to test, with real indexing mocked */
TestTransportBulkAction action;
/** Single item bulk write action that wraps index requests */
TestSingleItemBulkWriteAction singleItemBulkWriteAction;
/** True if the next call to the index action should act as an ingest node */
boolean localIngest;
@ -110,6 +120,20 @@ public class TransportBulkActionIngestTests extends ESTestCase {
}
}
class TestSingleItemBulkWriteAction extends TransportSingleItemBulkWriteAction<IndexRequest, IndexResponse> {
TestSingleItemBulkWriteAction(TestTransportBulkAction bulkAction) {
super(Settings.EMPTY, IndexAction.NAME, transportService, TransportBulkActionIngestTests.this.clusterService,
null, null, null, new ActionFilters(Collections.emptySet()), null,
IndexRequest::new, IndexRequest::new, ThreadPool.Names.INDEX, bulkAction, null);
}
@Override
protected IndexResponse newResponseInstance() {
return new IndexResponse();
}
}
@Before
public void setupAction() {
// initialize captors, which must be members to use @Capture because of generics
@ -142,6 +166,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
executionService = mock(PipelineExecutionService.class);
when(ingestService.getPipelineExecutionService()).thenReturn(executionService);
action = new TestTransportBulkAction();
singleItemBulkWriteAction = new TestSingleItemBulkWriteAction(action);
reset(transportService); // call on construction of action
}
@ -157,6 +182,16 @@ public class TransportBulkActionIngestTests extends ESTestCase {
verifyZeroInteractions(ingestService);
}
public void testSingleItemBulkActionIngestSkipped() throws Exception {
IndexRequest indexRequest = new IndexRequest("index", "type", "id");
indexRequest.source(Collections.emptyMap());
singleItemBulkWriteAction.execute(null, indexRequest, ActionListener.wrap(response -> {}, exception -> {
throw new AssertionError(exception);
}));
assertTrue(action.isExecuted);
verifyZeroInteractions(ingestService);
}
public void testIngestLocal() throws Exception {
Exception exception = new Exception("fake exception");
BulkRequest bulkRequest = new BulkRequest();
@ -200,6 +235,38 @@ public class TransportBulkActionIngestTests extends ESTestCase {
verifyZeroInteractions(transportService);
}
public void testSingleItemBulkActionIngestLocal() throws Exception {
Exception exception = new Exception("fake exception");
IndexRequest indexRequest = new IndexRequest("index", "type", "id");
indexRequest.source(Collections.emptyMap());
indexRequest.setPipeline("testpipeline");
AtomicBoolean responseCalled = new AtomicBoolean(false);
AtomicBoolean failureCalled = new AtomicBoolean(false);
singleItemBulkWriteAction.execute(null, indexRequest, ActionListener.wrap(
response -> {
responseCalled.set(true);
},
e -> {
assertThat(e, sameInstance(exception));
failureCalled.set(true);
}));
// check failure works, and passes through to the listener
assertFalse(action.isExecuted); // haven't executed yet
assertFalse(responseCalled.get());
assertFalse(failureCalled.get());
verify(executionService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture());
completionHandler.getValue().accept(exception);
assertTrue(failureCalled.get());
// now check success
indexRequest.setPipeline(null); // this is done by the real pipeline execution service when processing
completionHandler.getValue().accept(null);
assertTrue(action.isExecuted);
assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one
verifyZeroInteractions(transportService);
}
public void testIngestForward() throws Exception {
localIngest = false;
BulkRequest bulkRequest = new BulkRequest();
@ -246,5 +313,51 @@ public class TransportBulkActionIngestTests extends ESTestCase {
}
}
public void testSingleItemBulkActionIngestForward() throws Exception {
localIngest = false;
IndexRequest indexRequest = new IndexRequest("index", "type", "id");
indexRequest.source(Collections.emptyMap());
indexRequest.setPipeline("testpipeline");
IndexResponse indexResponse = mock(IndexResponse.class);
AtomicBoolean responseCalled = new AtomicBoolean(false);
ActionListener<IndexResponse> listener = ActionListener.wrap(
response -> {
responseCalled.set(true);
assertSame(indexResponse, response);
},
e -> {
throw new AssertionError(e);
});
singleItemBulkWriteAction.execute(null, indexRequest, listener);
// should not have executed ingest locally
verify(executionService, never()).executeBulkRequest(any(), any(), any());
// but instead should have sent to a remote node with the transport service
ArgumentCaptor<DiscoveryNode> node = ArgumentCaptor.forClass(DiscoveryNode.class);
verify(transportService).sendRequest(node.capture(), eq(BulkAction.NAME), any(), remoteResponseHandler.capture());
boolean usedNode1 = node.getValue() == remoteNode1; // make sure we used one of the nodes
if (usedNode1 == false) {
assertSame(remoteNode2, node.getValue());
}
assertFalse(action.isExecuted); // no local index execution
assertFalse(responseCalled.get()); // listener not called yet
BulkItemResponse itemResponse = new BulkItemResponse(0, DocWriteRequest.OpType.CREATE, indexResponse);
BulkItemResponse[] bulkItemResponses = new BulkItemResponse[1];
bulkItemResponses[0] = itemResponse;
remoteResponseHandler.getValue().handleResponse(new BulkResponse(bulkItemResponses, 0)); // call the listener for the remote node
assertTrue(responseCalled.get()); // now the listener we passed should have been delegated to by the remote listener
assertFalse(action.isExecuted); // still no local index execution
// now make sure ingest nodes are rotated through with a subsequent request
reset(transportService);
singleItemBulkWriteAction.execute(null, indexRequest, listener);
verify(transportService).sendRequest(node.capture(), eq(BulkAction.NAME), any(), remoteResponseHandler.capture());
if (usedNode1) {
assertSame(remoteNode2, node.getValue());
} else {
assertSame(remoteNode1, node.getValue());
}
}
}

View File

@ -117,11 +117,6 @@ public class TransportBulkActionTookTests extends ESTestCase {
resolver,
null,
expected::get) {
@Override
public void executeBulk(BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
expected.set(1000000);
super.executeBulk(bulkRequest, listener);
}
@Override
void executeBulk(
@ -146,12 +141,6 @@ public class TransportBulkActionTookTests extends ESTestCase {
resolver,
null,
System::nanoTime) {
@Override
public void executeBulk(BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
long elapsed = spinForAtLeastOneMillisecond();
expected.set(elapsed);
super.executeBulk(bulkRequest, listener);
}
@Override
void executeBulk(

View File

@ -1,224 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.index;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateApplier;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.ingest.PipelineExecutionService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import org.junit.Before;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.MockitoAnnotations;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
public class TransportIndexActionIngestTests extends ESTestCase {
/** Services needed by index action */
TransportService transportService;
ClusterService clusterService;
IngestService ingestService;
/** The ingest execution service we can capture calls to */
PipelineExecutionService executionService;
/** Arguments to callbacks we want to capture, but which require generics, so we must use @Captor */
@Captor
ArgumentCaptor<Consumer<Exception>> exceptionHandler;
@Captor
ArgumentCaptor<Consumer<Boolean>> successHandler;
@Captor
ArgumentCaptor<TransportResponseHandler<IndexResponse>> remoteResponseHandler;
/** The actual action we want to test, with real indexing mocked */
TestTransportIndexAction action;
/** True if the next call to the index action should act as an ingest node */
boolean localIngest;
/** The nodes that forwarded index requests should be cycled through. */
DiscoveryNodes nodes;
DiscoveryNode remoteNode1;
DiscoveryNode remoteNode2;
/** A subclass of the real index action to allow skipping real indexing, and marking when it would have happened. */
class TestTransportIndexAction extends TransportIndexAction {
boolean isExecuted = false; // set when the "real" index execution happens
TestTransportIndexAction() {
super(Settings.EMPTY, transportService, clusterService, null, ingestService, null, null, null, null,
new ActionFilters(Collections.emptySet()), null, null);
}
@Override
protected boolean shouldAutoCreate(IndexRequest reqest, ClusterState state) {
return false;
}
@Override
protected void innerExecute(Task task, final IndexRequest request, final ActionListener<IndexResponse> listener) {
isExecuted = true;
}
}
@Before
public void setupAction() {
// initialize captors, which must be members to use @Capture because of generics
MockitoAnnotations.initMocks(this);
// setup services that will be called by action
transportService = mock(TransportService.class);
clusterService = mock(ClusterService.class);
localIngest = true;
// setup nodes for local and remote
DiscoveryNode localNode = mock(DiscoveryNode.class);
when(localNode.isIngestNode()).thenAnswer(stub -> localIngest);
when(clusterService.localNode()).thenReturn(localNode);
remoteNode1 = mock(DiscoveryNode.class);
remoteNode2 = mock(DiscoveryNode.class);
nodes = mock(DiscoveryNodes.class);
ImmutableOpenMap<String, DiscoveryNode> ingestNodes = ImmutableOpenMap.<String, DiscoveryNode>builder(2)
.fPut("node1", remoteNode1).fPut("node2", remoteNode2).build();
when(nodes.getIngestNodes()).thenReturn(ingestNodes);
ClusterState state = mock(ClusterState.class);
when(state.getNodes()).thenReturn(nodes);
when(clusterService.state()).thenReturn(state);
doAnswer(invocation -> {
ClusterChangedEvent event = mock(ClusterChangedEvent.class);
when(event.state()).thenReturn(state);
((ClusterStateApplier)invocation.getArguments()[0]).applyClusterState(event);
return null;
}).when(clusterService).addStateApplier(any(ClusterStateApplier.class));
// setup the mocked ingest service for capturing calls
ingestService = mock(IngestService.class);
executionService = mock(PipelineExecutionService.class);
when(ingestService.getPipelineExecutionService()).thenReturn(executionService);
action = new TestTransportIndexAction();
reset(transportService); // call on construction of action
}
public void testIngestSkipped() throws Exception {
IndexRequest indexRequest = new IndexRequest("index", "type", "id");
indexRequest.source(Collections.emptyMap());
action.execute(null, indexRequest, ActionListener.wrap(response -> {}, exception -> {
throw new AssertionError(exception);
}));
assertTrue(action.isExecuted);
verifyZeroInteractions(ingestService);
}
public void testIngestLocal() throws Exception {
Exception exception = new Exception("fake exception");
IndexRequest indexRequest = new IndexRequest("index", "type", "id");
indexRequest.source(Collections.emptyMap());
indexRequest.setPipeline("testpipeline");
AtomicBoolean responseCalled = new AtomicBoolean(false);
AtomicBoolean failureCalled = new AtomicBoolean(false);
action.execute(null, indexRequest, ActionListener.wrap(
response -> {
responseCalled.set(true);
},
e -> {
assertThat(e, sameInstance(exception));
failureCalled.set(true);
}));
// check failure works, and passes through to the listener
assertFalse(action.isExecuted); // haven't executed yet
assertFalse(responseCalled.get());
assertFalse(failureCalled.get());
verify(executionService).executeIndexRequest(same(indexRequest), exceptionHandler.capture(), successHandler.capture());
exceptionHandler.getValue().accept(exception);
assertTrue(failureCalled.get());
// now check success
successHandler.getValue().accept(true);
assertTrue(action.isExecuted);
assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one
verifyZeroInteractions(transportService);
}
public void testIngestForward() throws Exception {
localIngest = false;
IndexRequest indexRequest = new IndexRequest("index", "type", "id");
indexRequest.source(Collections.emptyMap());
indexRequest.setPipeline("testpipeline");
IndexResponse indexResponse = mock(IndexResponse.class);
AtomicBoolean responseCalled = new AtomicBoolean(false);
ActionListener<IndexResponse> listener = ActionListener.wrap(
response -> {
responseCalled.set(true);
assertSame(indexResponse, response);
},
e -> {
throw new AssertionError(e);
});
action.execute(null, indexRequest, listener);
// should not have executed ingest locally
verify(executionService, never()).executeIndexRequest(any(), any(), any());
// but instead should have sent to a remote node with the transport service
ArgumentCaptor<DiscoveryNode> node = ArgumentCaptor.forClass(DiscoveryNode.class);
verify(transportService).sendRequest(node.capture(), eq(IndexAction.NAME), any(), remoteResponseHandler.capture());
boolean usedNode1 = node.getValue() == remoteNode1; // make sure we used one of the nodes
if (usedNode1 == false) {
assertSame(remoteNode2, node.getValue());
}
assertFalse(action.isExecuted); // no local index execution
assertFalse(responseCalled.get()); // listener not called yet
remoteResponseHandler.getValue().handleResponse(indexResponse); // call the listener for the remote node
assertTrue(responseCalled.get()); // now the listener we passed should have been delegated to by the remote listener
assertFalse(action.isExecuted); // still no local index execution
// now make sure ingest nodes are rotated through with a subsequent request
reset(transportService);
action.execute(null, indexRequest, listener);
verify(transportService).sendRequest(node.capture(), eq(IndexAction.NAME), any(), remoteResponseHandler.capture());
if (usedNode1) {
assertSame(remoteNode2, node.getValue());
} else {
assertSame(remoteNode1, node.getValue());
}
}
}

View File

@ -52,7 +52,7 @@ public class WaitActiveShardCountIT extends ESIntegTestCase {
fail("can't index, does not enough active shard copies");
} catch (UnavailableShardsException e) {
assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
assertThat(e.getMessage(), equalTo("[test][0] Not enough active copies to meet shard count of [2] (have 1, needed 2). Timeout: [100ms], request: [index {[test][type1][1], source[{ \"type1\" : { \"id\" : \"1\", \"name\" : \"test\" } }]}]"));
assertThat(e.getMessage(), equalTo("[test][0] Not enough active copies to meet shard count of [2] (have 1, needed 2). Timeout: [100ms], request: [BulkShardRequest to [test] containing [1] requests]"));
// but really, all is well
}
@ -81,7 +81,7 @@ public class WaitActiveShardCountIT extends ESIntegTestCase {
fail("can't index, not enough active shard copies");
} catch (UnavailableShardsException e) {
assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
assertThat(e.getMessage(), equalTo("[test][0] Not enough active copies to meet shard count of [" + ActiveShardCount.ALL + "] (have 2, needed 3). Timeout: [100ms], request: [index {[test][type1][1], source[{ \"type1\" : { \"id\" : \"1\", \"name\" : \"test\" } }]}]"));
assertThat(e.getMessage(), equalTo("[test][0] Not enough active copies to meet shard count of [" + ActiveShardCount.ALL + "] (have 2, needed 3). Timeout: [100ms], request: [BulkShardRequest to [test] containing [1] requests]"));
// but really, all is well
}

View File

@ -492,8 +492,10 @@ public class TransportReplicationActionTests extends ESTestCase {
}
action.new AsyncPrimaryAction(request, primaryShard.allocationId().getId(), createTransportChannel(listener), task) {
@Override
protected ReplicationOperation<Request, Request, Action.PrimaryResult> createReplicatedOperation(Request request,
ActionListener<Action.PrimaryResult> actionListener, Action.PrimaryShardReference primaryShardReference,
protected ReplicationOperation<Request, Request, TransportReplicationAction.PrimaryResult<Request, Response>>
createReplicatedOperation(Request request,
ActionListener<TransportReplicationAction.PrimaryResult<Request, Response>> actionListener,
TransportReplicationAction<Request, Request, Response>.PrimaryShardReference primaryShardReference,
boolean executeOnReplicas) {
return new NoopReplicationOperation(request, actionListener) {
public void execute() throws Exception {
@ -542,8 +544,10 @@ public class TransportReplicationActionTests extends ESTestCase {
AtomicBoolean executed = new AtomicBoolean();
action.new AsyncPrimaryAction(request, primaryShard.allocationId().getRelocationId(), createTransportChannel(listener), task) {
@Override
protected ReplicationOperation<Request, Request, Action.PrimaryResult> createReplicatedOperation(Request request,
ActionListener<Action.PrimaryResult> actionListener, Action.PrimaryShardReference primaryShardReference,
protected ReplicationOperation<Request, Request, TransportReplicationAction.PrimaryResult<Request, Response>>
createReplicatedOperation(Request request,
ActionListener<TransportReplicationAction.PrimaryResult<Request, Response>> actionListener,
TransportReplicationAction<Request, Request, Response>.PrimaryShardReference primaryShardReference,
boolean executeOnReplicas) {
return new NoopReplicationOperation(request, actionListener) {
public void execute() throws Exception {
@ -577,7 +581,7 @@ public class TransportReplicationActionTests extends ESTestCase {
};
Action.PrimaryShardReference primary = action.new PrimaryShardReference(shard, releasable);
final Request request = new Request();
Request replicaRequest = primary.perform(request).replicaRequest;
Request replicaRequest = (Request) primary.perform(request).replicaRequest;
assertThat(replicaRequest.primaryTerm(), equalTo(primaryTerm));
@ -687,13 +691,15 @@ public class TransportReplicationActionTests extends ESTestCase {
action.new AsyncPrimaryAction(new Request(shardId), primaryShard.allocationId().getId(),
createTransportChannel(new PlainActionFuture<>()), null) {
@Override
protected ReplicationOperation<Request, Request, Action.PrimaryResult> createReplicatedOperation(Request request,
ActionListener<Action.PrimaryResult> actionListener, Action.PrimaryShardReference primaryShardReference,
protected ReplicationOperation<Request, Request, Action.PrimaryResult<Request, Response>> createReplicatedOperation(
Request request, ActionListener<TransportReplicationAction.PrimaryResult<Request, Response>> actionListener,
TransportReplicationAction<Request, Request, Response>.PrimaryShardReference primaryShardReference,
boolean executeOnReplicas) {
assertFalse(executeOnReplicas);
assertFalse(executed.getAndSet(true));
return new NoopReplicationOperation(request, actionListener);
}
}.run();
assertThat(executed.get(), equalTo(true));
}
@ -753,8 +759,10 @@ public class TransportReplicationActionTests extends ESTestCase {
final boolean respondWithError = i == 3;
action.new AsyncPrimaryAction(request, primaryShard.allocationId().getId(), createTransportChannel(listener), task) {
@Override
protected ReplicationOperation<Request, Request, Action.PrimaryResult> createReplicatedOperation(Request request,
ActionListener<Action.PrimaryResult> actionListener, Action.PrimaryShardReference primaryShardReference,
protected ReplicationOperation<Request, Request, TransportReplicationAction.PrimaryResult<Request, Response>>
createReplicatedOperation(Request request,
ActionListener<TransportReplicationAction.PrimaryResult<Request, Response>> actionListener,
TransportReplicationAction<Request, Request, Response>.PrimaryShardReference primaryShardReference,
boolean executeOnReplicas) {
assertIndexShardCounter(1);
if (throwExceptionOnCreation) {
@ -1148,14 +1156,14 @@ public class TransportReplicationActionTests extends ESTestCase {
return indexShard;
}
class NoopReplicationOperation extends ReplicationOperation<Request, Request, Action.PrimaryResult> {
public NoopReplicationOperation(Request request, ActionListener<Action.PrimaryResult> listener) {
class NoopReplicationOperation extends ReplicationOperation<Request, Request, Action.PrimaryResult<Request, Response>> {
public NoopReplicationOperation(Request request, ActionListener<Action.PrimaryResult<Request, Response>> listener) {
super(request, null, listener, true, null, null, TransportReplicationActionTests.this.logger, "noop");
}
@Override
public void execute() throws Exception {
this.resultListener.onResponse(action.new PrimaryResult(null, new Response()));
this.resultListener.onResponse(new TransportReplicationAction.PrimaryResult<>(null, new Response()));
}
}

View File

@ -36,7 +36,6 @@ import org.junit.Before;
import org.mockito.ArgumentCaptor;
import java.util.HashSet;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import static org.mockito.Matchers.any;
@ -55,21 +54,27 @@ public class TransportWriteActionTests extends ESTestCase {
}
public void testPrimaryNoRefreshCall() throws Exception {
noRefreshCall(TestAction::shardOperationOnPrimary, TestAction.WritePrimaryResult::respond);
TestRequest request = new TestRequest();
request.setRefreshPolicy(RefreshPolicy.NONE); // The default, but we'll set it anyway just to be explicit
TestAction testAction = new TestAction();
TransportWriteAction.WritePrimaryResult<TestRequest, TestResponse> result =
testAction.shardOperationOnPrimary(request, indexShard);
CapturingActionListener<TestResponse> listener = new CapturingActionListener<>();
result.respond(listener);
assertNotNull(listener.response);
assertNull(listener.failure);
verify(indexShard, never()).refresh(any());
verify(indexShard, never()).addRefreshListener(any(), any());
}
public void testReplicaNoRefreshCall() throws Exception {
noRefreshCall(TestAction::shardOperationOnReplica, TestAction.WriteReplicaResult::respond);
}
private <Result, Response> void noRefreshCall(ThrowingTriFunction<TestAction, TestRequest, IndexShard, Result> action,
BiConsumer<Result, CapturingActionListener<Response>> responder)
throws Exception {
TestRequest request = new TestRequest();
request.setRefreshPolicy(RefreshPolicy.NONE); // The default, but we'll set it anyway just to be explicit
Result result = action.apply(new TestAction(), request, indexShard);
CapturingActionListener<Response> listener = new CapturingActionListener<>();
responder.accept(result, listener);
TestAction testAction = new TestAction();
TransportWriteAction.WriteReplicaResult<TestRequest> result =
testAction.shardOperationOnReplica(request, indexShard);
CapturingActionListener<TransportResponse.Empty> listener = new CapturingActionListener<>();
result.respond(listener);
assertNotNull(listener.response);
assertNull(listener.failure);
verify(indexShard, never()).refresh(any());
@ -77,46 +82,44 @@ public class TransportWriteActionTests extends ESTestCase {
}
public void testPrimaryImmediateRefresh() throws Exception {
immediateRefresh(TestAction::shardOperationOnPrimary, TestAction.WritePrimaryResult::respond, r -> assertTrue(r.forcedRefresh));
TestRequest request = new TestRequest();
request.setRefreshPolicy(RefreshPolicy.IMMEDIATE);
TestAction testAction = new TestAction();
TransportWriteAction.WritePrimaryResult<TestRequest, TestResponse> result =
testAction.shardOperationOnPrimary(request, indexShard);
CapturingActionListener<TestResponse> listener = new CapturingActionListener<>();
result.respond(listener);
assertNotNull(listener.response);
assertNull(listener.failure);
assertTrue(listener.response.forcedRefresh);
verify(indexShard).refresh("refresh_flag_index");
verify(indexShard, never()).addRefreshListener(any(), any());
}
public void testReplicaImmediateRefresh() throws Exception {
immediateRefresh(TestAction::shardOperationOnReplica, TestAction.WriteReplicaResult::respond, r -> {});
}
private <Result, Response> void immediateRefresh(ThrowingTriFunction<TestAction, TestRequest, IndexShard, Result> action,
BiConsumer<Result, CapturingActionListener<Response>> responder,
Consumer<Response> responseChecker) throws Exception {
TestRequest request = new TestRequest();
request.setRefreshPolicy(RefreshPolicy.IMMEDIATE);
Result result = action.apply(new TestAction(), request, indexShard);
CapturingActionListener<Response> listener = new CapturingActionListener<>();
responder.accept(result, listener);
TestAction testAction = new TestAction();
TransportWriteAction.WriteReplicaResult<TestRequest> result =
testAction.shardOperationOnReplica(request, indexShard);
CapturingActionListener<TransportResponse.Empty> listener = new CapturingActionListener<>();
result.respond(listener);
assertNotNull(listener.response);
assertNull(listener.failure);
responseChecker.accept(listener.response);
verify(indexShard).refresh("refresh_flag_index");
verify(indexShard, never()).addRefreshListener(any(), any());
}
public void testPrimaryWaitForRefresh() throws Exception {
waitForRefresh(TestAction::shardOperationOnPrimary, TestAction.WritePrimaryResult::respond,
(r, forcedRefresh) -> assertEquals(forcedRefresh, r.forcedRefresh));
}
public void testReplicaWaitForRefresh() throws Exception {
waitForRefresh(TestAction::shardOperationOnReplica, TestAction.WriteReplicaResult::respond, (r, forcedRefresh) -> {});
}
private <Result, Response> void waitForRefresh(ThrowingTriFunction<TestAction, TestRequest, IndexShard, Result> action,
BiConsumer<Result, CapturingActionListener<Response>> responder,
BiConsumer<Response, Boolean> resultChecker) throws Exception {
TestRequest request = new TestRequest();
request.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL);
Result result = action.apply(new TestAction(), request, indexShard);
CapturingActionListener<Response> listener = new CapturingActionListener<>();
responder.accept(result, listener);
assertNull(listener.response); // Haven't responded yet
TestAction testAction = new TestAction();
TransportWriteAction.WritePrimaryResult<TestRequest, TestResponse> result =
testAction.shardOperationOnPrimary(request, indexShard);
CapturingActionListener<TestResponse> listener = new CapturingActionListener<>();
result.respond(listener);
assertNull(listener.response); // Haven't reallresponded yet
@SuppressWarnings({ "unchecked", "rawtypes" })
ArgumentCaptor<Consumer<Boolean>> refreshListener = ArgumentCaptor.forClass((Class) Consumer.class);
@ -128,13 +131,33 @@ public class TransportWriteActionTests extends ESTestCase {
refreshListener.getValue().accept(forcedRefresh);
assertNotNull(listener.response);
assertNull(listener.failure);
resultChecker.accept(listener.response, forcedRefresh);
assertEquals(forcedRefresh, listener.response.forcedRefresh);
}
public void testReplicaWaitForRefresh() throws Exception {
TestRequest request = new TestRequest();
request.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL);
TestAction testAction = new TestAction();
TransportWriteAction.WriteReplicaResult<TestRequest> result = testAction.shardOperationOnReplica(request, indexShard);
CapturingActionListener<TransportResponse.Empty> listener = new CapturingActionListener<>();
result.respond(listener);
assertNull(listener.response); // Haven't responded yet
@SuppressWarnings({ "unchecked", "rawtypes" })
ArgumentCaptor<Consumer<Boolean>> refreshListener = ArgumentCaptor.forClass((Class) Consumer.class);
verify(indexShard, never()).refresh(any());
verify(indexShard).addRefreshListener(any(), refreshListener.capture());
// Now we can fire the listener manually and we'll get a response
boolean forcedRefresh = randomBoolean();
refreshListener.getValue().accept(forcedRefresh);
assertNotNull(listener.response);
assertNull(listener.failure);
}
public void testDocumentFailureInShardOperationOnPrimary() throws Exception {
TestRequest request = new TestRequest();
TestAction testAction = new TestAction(true, true);
TransportWriteAction<TestRequest, TestRequest, TestResponse>.WritePrimaryResult writePrimaryResult =
TransportWriteAction.WritePrimaryResult<TestRequest, TestResponse> writePrimaryResult =
testAction.shardOperationOnPrimary(request, indexShard);
CapturingActionListener<TestResponse> listener = new CapturingActionListener<>();
writePrimaryResult.respond(listener);
@ -145,7 +168,7 @@ public class TransportWriteActionTests extends ESTestCase {
public void testDocumentFailureInShardOperationOnReplica() throws Exception {
TestRequest request = new TestRequest();
TestAction testAction = new TestAction(randomBoolean(), true);
TransportWriteAction<TestRequest, TestRequest, TestResponse>.WriteReplicaResult writeReplicaResult =
TransportWriteAction.WriteReplicaResult<TestRequest> writeReplicaResult =
testAction.shardOperationOnReplica(request, indexShard);
CapturingActionListener<TransportResponse.Empty> listener = new CapturingActionListener<>();
writeReplicaResult.respond(listener);
@ -176,23 +199,24 @@ public class TransportWriteActionTests extends ESTestCase {
}
@Override
protected WritePrimaryResult shardOperationOnPrimary(TestRequest request, IndexShard primary) throws Exception {
final WritePrimaryResult primaryResult;
protected WritePrimaryResult<TestRequest, TestResponse> shardOperationOnPrimary(
TestRequest request, IndexShard primary) throws Exception {
final WritePrimaryResult<TestRequest, TestResponse> primaryResult;
if (withDocumentFailureOnPrimary) {
primaryResult = new WritePrimaryResult(request, null, null, new RuntimeException("simulated"), primary);
primaryResult = new WritePrimaryResult<>(request, null, null, new RuntimeException("simulated"), primary, logger);
} else {
primaryResult = new WritePrimaryResult(request, new TestResponse(), location, null, primary);
primaryResult = new WritePrimaryResult<>(request, new TestResponse(), location, null, primary, logger);
}
return primaryResult;
}
@Override
protected WriteReplicaResult shardOperationOnReplica(TestRequest request, IndexShard replica) throws Exception {
final WriteReplicaResult replicaResult;
protected WriteReplicaResult<TestRequest> shardOperationOnReplica(TestRequest request, IndexShard replica) throws Exception {
final WriteReplicaResult<TestRequest> replicaResult;
if (withDocumentFailureOnReplica) {
replicaResult = new WriteReplicaResult(request, null, new RuntimeException("simulated"), replica);
replicaResult = new WriteReplicaResult<>(request, null, new RuntimeException("simulated"), replica, logger);
} else {
replicaResult = new WriteReplicaResult(request, location, null, replica);
replicaResult = new WriteReplicaResult<>(request, location, null, replica, logger);
}
return replicaResult;
}
@ -232,8 +256,4 @@ public class TransportWriteActionTests extends ESTestCase {
this.failure = failure;
}
}
private interface ThrowingTriFunction<A, B, C, R> {
R apply(A a, B b, C c) throws Exception;
}
}

View File

@ -121,14 +121,12 @@ public class NoMasterNodeIT extends ESIntegTestCase {
ClusterBlockException.class, RestStatus.SERVICE_UNAVAILABLE
);
checkWriteAction(
false, timeout,
checkUpdateAction(false, timeout,
client().prepareUpdate("test", "type1", "1")
.setScript(new Script(
ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, "test script", Collections.emptyMap())).setTimeout(timeout));
checkWriteAction(
autoCreateIndex, timeout,
checkUpdateAction(autoCreateIndex, timeout,
client().prepareUpdate("no_index", "type1", "1")
.setScript(new Script(
ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, "test script", Collections.emptyMap())).setTimeout(timeout));
@ -143,18 +141,29 @@ public class NoMasterNodeIT extends ESIntegTestCase {
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
bulkRequestBuilder.add(client().prepareIndex("test", "type1", "1").setSource(XContentFactory.jsonBuilder().startObject().endObject()));
bulkRequestBuilder.add(client().prepareIndex("test", "type1", "2").setSource(XContentFactory.jsonBuilder().startObject().endObject()));
checkBulkAction(false, bulkRequestBuilder);
// the request should fail very quickly - use a large timeout and make sure it didn't pass...
timeout = new TimeValue(5000);
bulkRequestBuilder.setTimeout(timeout);
checkWriteAction(false, timeout, bulkRequestBuilder);
bulkRequestBuilder = client().prepareBulk();
bulkRequestBuilder.add(client().prepareIndex("no_index", "type1", "1").setSource(XContentFactory.jsonBuilder().startObject().endObject()));
bulkRequestBuilder.add(client().prepareIndex("no_index", "type1", "2").setSource(XContentFactory.jsonBuilder().startObject().endObject()));
checkBulkAction(autoCreateIndex, bulkRequestBuilder);
if (autoCreateIndex) {
// we expect the bulk to fail because it will try to go to the master. Use small timeout and detect it has passed
timeout = new TimeValue(200);
} else {
// the request should fail very quickly - use a large timeout and make sure it didn't pass...
timeout = new TimeValue(5000);
}
bulkRequestBuilder.setTimeout(timeout);
checkWriteAction(autoCreateIndex, timeout, bulkRequestBuilder);
internalCluster().startNode(settings);
client().admin().cluster().prepareHealth().setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
}
void checkWriteAction(boolean autoCreateIndex, TimeValue timeout, ActionRequestBuilder<?, ?, ?> builder) {
void checkUpdateAction(boolean autoCreateIndex, TimeValue timeout, ActionRequestBuilder<?, ?, ?> builder) {
// we clean the metadata when loosing a master, therefore all operations on indices will auto create it, if allowed
long now = System.currentTimeMillis();
try {
@ -172,18 +181,7 @@ public class NoMasterNodeIT extends ESIntegTestCase {
}
}
void checkBulkAction(boolean indexShouldBeAutoCreated, BulkRequestBuilder builder) {
// bulk operation do not throw MasterNotDiscoveredException exceptions. The only test that auto create kicked in and failed is
// via the timeout, as bulk operation do not wait on blocks.
TimeValue timeout;
if (indexShouldBeAutoCreated) {
// we expect the bulk to fail because it will try to go to the master. Use small timeout and detect it has passed
timeout = new TimeValue(200);
} else {
// the request should fail very quickly - use a large timeout and make sure it didn't pass...
timeout = new TimeValue(5000);
}
builder.setTimeout(timeout);
void checkWriteAction(boolean indexShouldBeAutoCreated, TimeValue timeout, ActionRequestBuilder<?, ?, ?> builder) {
long now = System.currentTimeMillis();
try {
builder.get();
@ -195,7 +193,7 @@ public class NoMasterNodeIT extends ESIntegTestCase {
assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
} else {
// timeout is 5000
assertThat(System.currentTimeMillis() - now, lessThan(timeout.millis() - 50));
assertThat(System.currentTimeMillis() - now, lessThan(timeout.millis() + 50));
}
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.index;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.DocWriteResponse;
@ -90,6 +91,7 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo;
/**
* Tests for indices that use shadow replicas and a shared filesystem
*/
@LuceneTestCase.AwaitsFix(bugUrl = "fix this fails intermittently")
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class IndexWithShadowReplicasIT extends ESIntegTestCase {
@ -457,6 +459,7 @@ public class IndexWithShadowReplicasIT extends ESIntegTestCase {
assertHitCount(resp, numPhase1Docs + numPhase2Docs);
}
@AwaitsFix(bugUrl = "uncaught exception")
public void testPrimaryRelocationWhereRecoveryFails() throws Exception {
Path dataPath = createTempDir();
Settings nodeSettings = Settings.builder()

View File

@ -20,11 +20,15 @@
package org.elasticsearch.index.mapper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.bulk.TransportBulkAction;
import org.elasticsearch.action.bulk.TransportShardBulkAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.index.TransportIndexAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.update.UpdateHelper;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
@ -57,14 +61,8 @@ public class DynamicMappingDisabledTests extends ESSingleNodeTestCase {
private static ThreadPool THREAD_POOL;
private ClusterService clusterService;
private Transport transport;
private TransportService transportService;
private IndicesService indicesService;
private ShardStateAction shardStateAction;
private ActionFilters actionFilters;
private IndexNameExpressionResolver indexNameExpressionResolver;
private AutoCreateIndex autoCreateIndex;
private Settings settings;
private TransportBulkAction transportBulkAction;
@BeforeClass
public static void createThreadPool() {
@ -74,21 +72,26 @@ public class DynamicMappingDisabledTests extends ESSingleNodeTestCase {
@Override
public void setUp() throws Exception {
super.setUp();
settings = Settings.builder()
Settings settings = Settings.builder()
.put(MapperService.INDEX_MAPPER_DYNAMIC_SETTING.getKey(), false)
.build();
clusterService = createClusterService(THREAD_POOL);
transport = new MockTcpTransport(settings, THREAD_POOL, BigArrays.NON_RECYCLING_INSTANCE,
Transport transport = new MockTcpTransport(settings, THREAD_POOL, BigArrays.NON_RECYCLING_INSTANCE,
new NoneCircuitBreakerService(), new NamedWriteableRegistry(Collections.emptyList()),
new NetworkService(settings, Collections.emptyList()));
transportService = new TransportService(clusterService.getSettings(), transport, THREAD_POOL,
TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null);
indicesService = getInstanceFromNode(IndicesService.class);
shardStateAction = new ShardStateAction(settings, clusterService, transportService, null, null, THREAD_POOL);
actionFilters = new ActionFilters(Collections.emptySet());
indexNameExpressionResolver = new IndexNameExpressionResolver(settings);
autoCreateIndex = new AutoCreateIndex(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
indexNameExpressionResolver);
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
ShardStateAction shardStateAction = new ShardStateAction(settings, clusterService, transportService, null, null, THREAD_POOL);
ActionFilters actionFilters = new ActionFilters(Collections.emptySet());
IndexNameExpressionResolver indexNameExpressionResolver = new IndexNameExpressionResolver(settings);
AutoCreateIndex autoCreateIndex = new AutoCreateIndex(settings, new ClusterSettings(settings,
ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), indexNameExpressionResolver);
UpdateHelper updateHelper = new UpdateHelper(settings, null);
TransportShardBulkAction shardBulkAction = new TransportShardBulkAction(settings, transportService, clusterService,
indicesService, THREAD_POOL, shardStateAction, null, updateHelper, actionFilters, indexNameExpressionResolver);
transportBulkAction = new TransportBulkAction(settings, THREAD_POOL, transportService, clusterService,
null, shardBulkAction, null, actionFilters, indexNameExpressionResolver, autoCreateIndex, System::currentTimeMillis);
}
@After
@ -107,25 +110,26 @@ public class DynamicMappingDisabledTests extends ESSingleNodeTestCase {
}
public void testDynamicDisabled() {
TransportIndexAction action = new TransportIndexAction(settings, transportService, clusterService,
indicesService, null, THREAD_POOL, shardStateAction, null, null, actionFilters, indexNameExpressionResolver,
autoCreateIndex);
IndexRequest request = new IndexRequest("index", "type", "1");
request.source("foo", 3);
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.add(request);
final AtomicBoolean onFailureCalled = new AtomicBoolean();
action.execute(request, new ActionListener<IndexResponse>() {
transportBulkAction.execute(bulkRequest, new ActionListener<BulkResponse>() {
@Override
public void onResponse(IndexResponse indexResponse) {
fail("Indexing request should have failed");
public void onResponse(BulkResponse bulkResponse) {
BulkItemResponse itemResponse = bulkResponse.getItems()[0];
assertTrue(itemResponse.isFailed());
assertThat(itemResponse.getFailure().getCause(), instanceOf(IndexNotFoundException.class));
assertEquals(itemResponse.getFailure().getCause().getMessage(), "no such index");
onFailureCalled.set(true);
}
@Override
public void onFailure(Exception e) {
onFailureCalled.set(true);
assertThat(e, instanceOf(IndexNotFoundException.class));
assertEquals(e.getMessage(), "no such index");
fail("unexpected failure in bulk action, expected failed bulk item");
}
});

View File

@ -65,8 +65,8 @@ import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import static org.elasticsearch.action.index.TransportIndexAction.executeIndexRequestOnPrimary;
import static org.elasticsearch.action.index.TransportIndexAction.executeIndexRequestOnReplica;
import static org.elasticsearch.action.bulk.TransportShardBulkAction.executeIndexRequestOnPrimary;
import static org.elasticsearch.action.bulk.TransportShardBulkAction.executeIndexRequestOnReplica;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;