Refactor TransportShardBulkAction and add unit tests

This refactors the `TransportShardBulkAction` to split it appart and make it
unit-testable, and then it also adds unit tests that use these methods.

In particular, this makes `executeBulkItemRequest` shorter and more readable
This commit is contained in:
Lee Hinman 2017-02-21 15:54:55 -07:00
parent 7ce06aeb8c
commit fd991f32f9
6 changed files with 764 additions and 107 deletions

View File

@ -0,0 +1,42 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.bulk;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.index.engine.Engine;
/**
* A struct-like holder for a bulk items reponse, result, and the resulting
* replica operation to be executed.
*/
class BulkItemResultHolder {
public final @Nullable DocWriteResponse response;
public final @Nullable Engine.Result operationResult;
public final BulkItemRequest replicaRequest;
BulkItemResultHolder(@Nullable DocWriteResponse response,
@Nullable Engine.Result operationResult,
BulkItemRequest replicaRequest) {
this.response = response;
this.operationResult = operationResult;
this.replicaRequest = replicaRequest;
}
}

View File

@ -0,0 +1,67 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.bulk;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;
import java.util.Objects;
public interface MappingUpdatePerformer {
/**
* Determine if any mappings need to be updated, and update them on the
* master node if necessary. Returnes a failed {@code Engine.IndexResult}
* in the event updating the mappings fails or null if successful.
* Throws a {@code ReplicationOperation.RetryOnPrimaryException} if the
* operation needs to be retried on the primary due to the mappings not
* being present yet, or a different exception if updating the mappings
* on the master failed.
*/
@Nullable
MappingUpdateResult updateMappingsIfNeeded(IndexShard primary, IndexRequest request) throws Exception;
/**
* Class encapsulating the resulting of potentially updating the mapping
*/
class MappingUpdateResult {
@Nullable
public final Engine.Index operation;
@Nullable
public final Exception failure;
MappingUpdateResult(Exception failure) {
Objects.requireNonNull(failure, "failure cannot be null");
this.failure = failure;
this.operation = null;
}
MappingUpdateResult(Engine.Index operation) {
Objects.requireNonNull(operation, "operation cannot be null");
this.operation = operation;
this.failure = null;
}
public boolean isFailed() {
return failure != null;
}
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.bulk;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.ExceptionsHelper;
@ -42,9 +43,11 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
@ -65,12 +68,16 @@ import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.function.LongSupplier;
/** Performs shard-level bulk (index, delete or update) operations */
public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequest, BulkShardRequest, BulkShardResponse> {
public static final String ACTION_NAME = BulkAction.NAME + "[s]";
private static final Logger logger = ESLoggerFactory.getLogger(TransportShardBulkAction.class);
private final UpdateHelper updateHelper;
private final MappingUpdatedAction mappingUpdatedAction;
@ -105,8 +112,10 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
BulkShardRequest request, IndexShard primary) throws Exception {
final IndexMetaData metaData = primary.indexSettings().getIndexMetaData();
Translog.Location location = null;
final MappingUpdatePerformer mappingUpdater = new ConcreteMappingUpdatePerformer();
for (int requestIndex = 0; requestIndex < request.items().length; requestIndex++) {
location = executeBulkItemRequest(metaData, primary, request, location, requestIndex);
location = executeBulkItemRequest(metaData, primary, request, location, requestIndex,
updateHelper, threadPool::absoluteTimeInMillis, mappingUpdater);
}
BulkItemResponse[] responses = new BulkItemResponse[request.items().length];
BulkItemRequest[] items = request.items();
@ -117,57 +126,56 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
return new WritePrimaryResult<>(request, response, location, null, primary, logger);
}
/** Executes bulk item requests and handles request execution exceptions */
private Translog.Location executeBulkItemRequest(IndexMetaData metaData, IndexShard primary,
BulkShardRequest request,
Translog.Location location, int requestIndex) throws Exception {
final DocWriteRequest itemRequest = request.items()[requestIndex].request();
final DocWriteRequest.OpType opType = itemRequest.opType();
final Engine.Result operationResult;
final DocWriteResponse response;
final BulkItemRequest replicaRequest;
switch (itemRequest.opType()) {
case CREATE:
case INDEX:
final IndexRequest indexRequest = (IndexRequest) itemRequest;
Engine.IndexResult indexResult = executeIndexRequestOnPrimary(indexRequest, primary, mappingUpdatedAction);
response = indexResult.hasFailure() ? null :
new IndexResponse(primary.shardId(), indexRequest.type(), indexRequest.id(), indexResult.getSeqNo(),
indexResult.getVersion(), indexResult.isCreated());
operationResult = indexResult;
replicaRequest = request.items()[requestIndex];
break;
case UPDATE:
UpdateResultHolder updateResultHolder = executeUpdateRequest(((UpdateRequest) itemRequest),
primary, metaData, request, requestIndex);
operationResult = updateResultHolder.operationResult;
response = updateResultHolder.response;
replicaRequest = updateResultHolder.replicaRequest;
break;
case DELETE:
final DeleteRequest deleteRequest = (DeleteRequest) itemRequest;
Engine.DeleteResult deleteResult = executeDeleteRequestOnPrimary(deleteRequest, primary);
response = deleteResult.hasFailure() ? null :
new DeleteResponse(request.shardId(), deleteRequest.type(), deleteRequest.id(), deleteResult.getSeqNo(),
deleteResult.getVersion(), deleteResult.isFound());
operationResult = deleteResult;
replicaRequest = request.items()[requestIndex];
break;
default: throw new IllegalStateException("unexpected opType [" + itemRequest.opType() + "] found");
}
// update the bulk item request because update request execution can mutate the bulk item request
request.items()[requestIndex] = replicaRequest;
private static BulkItemResultHolder executeIndexRequest(final IndexRequest indexRequest,
final BulkItemRequest bulkItemRequest,
final IndexShard primary,
final MappingUpdatePerformer mappingUpdater) throws Exception {
Engine.IndexResult indexResult = executeIndexRequestOnPrimary(indexRequest, primary, mappingUpdater);
if (indexResult.hasFailure()) {
return new BulkItemResultHolder(null, indexResult, bulkItemRequest);
} else {
IndexResponse response = new IndexResponse(primary.shardId(), indexRequest.type(), indexRequest.id(),
indexResult.getSeqNo(), indexResult.getVersion(), indexResult.isCreated());
return new BulkItemResultHolder(response, indexResult, bulkItemRequest);
}
}
private static BulkItemResultHolder executeDeleteRequest(final DeleteRequest deleteRequest,
final BulkItemRequest bulkItemRequest,
final IndexShard primary) throws IOException {
Engine.DeleteResult deleteResult = executeDeleteRequestOnPrimary(deleteRequest, primary);
if (deleteResult.hasFailure()) {
return new BulkItemResultHolder(null, deleteResult, bulkItemRequest);
} else {
DeleteResponse response = new DeleteResponse(primary.shardId(), deleteRequest.type(), deleteRequest.id(),
deleteResult.getSeqNo(), deleteResult.getVersion(), deleteResult.isFound());
return new BulkItemResultHolder(response, deleteResult, bulkItemRequest);
}
}
// Visible for unit testing
static Translog.Location updateReplicaRequest(BulkItemResultHolder bulkItemResult,
final DocWriteRequest.OpType opType,
final Translog.Location originalLocation,
BulkShardRequest request) {
final Engine.Result operationResult = bulkItemResult.operationResult;
final DocWriteResponse response = bulkItemResult.response;
final BulkItemRequest replicaRequest = bulkItemResult.replicaRequest;
if (operationResult == null) { // in case of noop update operation
assert response.getResult() == DocWriteResponse.Result.NOOP
: "only noop update can have null operation";
assert response.getResult() == DocWriteResponse.Result.NOOP : "only noop updates can have a null operation";
replicaRequest.setPrimaryResponse(new BulkItemResponse(replicaRequest.id(), opType, response));
return originalLocation;
} else if (operationResult.hasFailure() == false) {
location = locationToSync(location, operationResult.getTranslogLocation());
BulkItemResponse primaryResponse = new BulkItemResponse(replicaRequest.id(), opType, response);
replicaRequest.setPrimaryResponse(primaryResponse);
// set the ShardInfo to 0 so we can safely send it to the replicas. We won't use it in the real response though.
// set a blank ShardInfo so we can safely send it to the replicas. We won't use it in the real response though.
primaryResponse.getResponse().setShardInfo(new ShardInfo());
// The operation was successful, advance the translog
return locationToSync(originalLocation, operationResult.getTranslogLocation());
} else {
DocWriteRequest docWriteRequest = replicaRequest.request();
Exception failure = operationResult.getFailure();
@ -178,15 +186,57 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
logger.debug((Supplier<?>) () -> new ParameterizedMessage("{} failed to execute bulk item ({}) {}",
request.shardId(), docWriteRequest.opType().getLowercase(), request), failure);
}
// if its a conflict failure, and we already executed the request on a primary (and we execute it
// if it's a conflict failure, and we already executed the request on a primary (and we execute it
// again, due to primary relocation and only processing up to N bulk items when the shard gets closed)
// then just use the response we got from the successful execution
// then just use the response we got from the failed execution
if (replicaRequest.getPrimaryResponse() == null || isConflictException(failure) == false) {
replicaRequest.setPrimaryResponse(new BulkItemResponse(replicaRequest.id(), docWriteRequest.opType(),
new BulkItemResponse.Failure(request.index(), docWriteRequest.type(), docWriteRequest.id(), failure)));
replicaRequest.setPrimaryResponse(
new BulkItemResponse(replicaRequest.id(), docWriteRequest.opType(),
// Make sure to use request.indox() here, if you
// use docWriteRequest.index() it will use the
// concrete index instead of an alias if used!
new BulkItemResponse.Failure(request.index(), docWriteRequest.type(), docWriteRequest.id(), failure)));
}
return originalLocation;
}
assert replicaRequest.getPrimaryResponse() != null;
}
/** Executes bulk item requests and handles request execution exceptions */
static Translog.Location executeBulkItemRequest(IndexMetaData metaData, IndexShard primary,
BulkShardRequest request, Translog.Location location,
int requestIndex, UpdateHelper updateHelper,
LongSupplier nowInMillisSupplier,
final MappingUpdatePerformer mappingUpdater) throws Exception {
final DocWriteRequest itemRequest = request.items()[requestIndex].request();
final DocWriteRequest.OpType opType = itemRequest.opType();
final BulkItemResultHolder responseHolder;
switch (itemRequest.opType()) {
case CREATE:
case INDEX:
responseHolder = executeIndexRequest((IndexRequest) itemRequest,
request.items()[requestIndex], primary, mappingUpdater);
break;
case UPDATE:
responseHolder = executeUpdateRequest((UpdateRequest) itemRequest, primary, metaData, request,
requestIndex, updateHelper, nowInMillisSupplier, mappingUpdater);
break;
case DELETE:
responseHolder = executeDeleteRequest((DeleteRequest) itemRequest, request.items()[requestIndex], primary);
break;
default: throw new IllegalStateException("unexpected opType [" + itemRequest.opType() + "] found");
}
final BulkItemRequest replicaRequest = responseHolder.replicaRequest;
// update the bulk item request because update request execution can mutate the bulk item request
request.items()[requestIndex] = replicaRequest;
// Modify the replica request, if needed, and return a new translog location
location = updateReplicaRequest(responseHolder, opType, location, request);
assert replicaRequest.getPrimaryResponse() != null : "replica request must have a primary response";
return location;
}
@ -194,29 +244,18 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
return ExceptionsHelper.unwrapCause(e) instanceof VersionConflictEngineException;
}
private static class UpdateResultHolder {
final BulkItemRequest replicaRequest;
final Engine.Result operationResult;
final DocWriteResponse response;
private UpdateResultHolder(BulkItemRequest replicaRequest, Engine.Result operationResult,
DocWriteResponse response) {
this.replicaRequest = replicaRequest;
this.operationResult = operationResult;
this.response = response;
}
}
/**
* Executes update request, delegating to a index or delete operation after translation,
* handles retries on version conflict and constructs update response
* NOTE: reassigns bulk item request at <code>requestIndex</code> for replicas to
* execute translated update request (NOOP update is an exception). NOOP updates are
* indicated by returning a <code>null</code> operation in {@link UpdateResultHolder}
* indicated by returning a <code>null</code> operation in {@link BulkItemResultHolder}
* */
private UpdateResultHolder executeUpdateRequest(UpdateRequest updateRequest, IndexShard primary,
IndexMetaData metaData, BulkShardRequest request,
int requestIndex) throws Exception {
private static BulkItemResultHolder executeUpdateRequest(UpdateRequest updateRequest, IndexShard primary,
IndexMetaData metaData, BulkShardRequest request,
int requestIndex, UpdateHelper updateHelper,
LongSupplier nowInMillis,
final MappingUpdatePerformer mappingUpdater) throws Exception {
Engine.Result updateOperationResult = null;
UpdateResponse updateResponse = null;
BulkItemRequest replicaRequest = request.items()[requestIndex];
@ -225,7 +264,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
final UpdateHelper.Result translate;
// translate update request
try {
translate = updateHelper.prepare(updateRequest, primary, threadPool::absoluteTimeInMillis);
translate = updateHelper.prepare(updateRequest, primary, nowInMillis);
} catch (Exception failure) {
// we may fail translating a update to index or delete operation
// we use index result to communicate failure while translating update request
@ -239,7 +278,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
IndexRequest indexRequest = translate.action();
MappingMetaData mappingMd = metaData.mappingOrDefault(indexRequest.type());
indexRequest.process(mappingMd, request.index());
updateOperationResult = executeIndexRequestOnPrimary(indexRequest, primary, mappingUpdatedAction);
updateOperationResult = executeIndexRequestOnPrimary(indexRequest, primary, mappingUpdater);
break;
case DELETED:
DeleteRequest deleteRequest = translate.action();
@ -300,7 +339,14 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
break; // out of retry loop
}
}
return new UpdateResultHolder(replicaRequest, updateOperationResult, updateResponse);
return new BulkItemResultHolder(updateResponse, updateOperationResult, replicaRequest);
}
static boolean shouldExecuteReplicaItem(final BulkItemRequest request, final int index) {
final BulkItemResponse primaryResponse = request.getPrimaryResponse();
assert primaryResponse != null : "expected primary response to be set for item [" + index + "] request ["+ request.request() +"]";
return primaryResponse.isFailed() == false &&
primaryResponse.getResponse().getResult() != DocWriteResponse.Result.NOOP;
}
@Override
@ -308,9 +354,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
Translog.Location location = null;
for (int i = 0; i < request.items().length; i++) {
BulkItemRequest item = request.items()[i];
assert item.getPrimaryResponse() != null : "expected primary response to be set for item [" + i + "] request ["+ item.request() +"]";
if (item.getPrimaryResponse().isFailed() == false &&
item.getPrimaryResponse().getResponse().getResult() != DocWriteResponse.Result.NOOP) {
if (shouldExecuteReplicaItem(item, i)) {
DocWriteRequest docWriteRequest = item.request();
DocWriteResponse primaryResponse = item.getPrimaryResponse().getResponse();
final Engine.Result operationResult;
@ -352,7 +396,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
return new WriteReplicaResult<>(request, location, null, replica, logger);
}
private Translog.Location locationToSync(Translog.Location current, Translog.Location next) {
private static Translog.Location locationToSync(Translog.Location current, Translog.Location next) {
/* here we are moving forward in the translog with each operation. Under the hood
* this might cross translog files which is ok since from the user perspective
* the translog is like a tape where only the highest location needs to be fsynced
@ -391,7 +435,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
}
/** Utility method to prepare an index operation on primary shards */
private static Engine.Index prepareIndexOperationOnPrimary(IndexRequest request, IndexShard primary) {
static Engine.Index prepareIndexOperationOnPrimary(IndexRequest request, IndexShard primary) {
SourceToParse sourceToParse =
SourceToParse.source(SourceToParse.Origin.PRIMARY, request.index(), request.type(), request.id(), request.source(),
request.getContentType()).routing(request.routing()).parent(request.parent());
@ -400,36 +444,12 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
/** Executes index operation on primary shard after updates mapping if dynamic mappings are found */
public static Engine.IndexResult executeIndexRequestOnPrimary(IndexRequest request, IndexShard primary,
MappingUpdatedAction mappingUpdatedAction) throws Exception {
Engine.Index operation;
try {
operation = prepareIndexOperationOnPrimary(request, primary);
} catch (MapperParsingException | IllegalArgumentException e) {
return new Engine.IndexResult(e, request.version());
MappingUpdatePerformer mappingUpdater) throws Exception {
MappingUpdatePerformer.MappingUpdateResult result = mappingUpdater.updateMappingsIfNeeded(primary, request);
if (result.isFailed()) {
return new Engine.IndexResult(result.failure, request.version());
}
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
final ShardId shardId = primary.shardId();
if (update != null) {
// can throw timeout exception when updating mappings or ISE for attempting to update default mappings
// which are bubbled up
try {
mappingUpdatedAction.updateMappingOnMaster(shardId.getIndex(), request.type(), update);
} catch (IllegalArgumentException e) {
// throws IAE on conflicts merging dynamic mappings
return new Engine.IndexResult(e, request.version());
}
try {
operation = prepareIndexOperationOnPrimary(request, primary);
} catch (MapperParsingException | IllegalArgumentException e) {
return new Engine.IndexResult(e, request.version());
}
update = operation.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
throw new ReplicationOperation.RetryOnPrimaryException(shardId,
"Dynamic mappings are not available on the node that holds the primary yet");
}
}
return primary.index(operation);
return primary.index(result.operation);
}
private static Engine.DeleteResult executeDeleteRequestOnPrimary(DeleteRequest request, IndexShard primary) throws IOException {
@ -445,4 +465,39 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
primaryResponse.getSeqNo(), request.primaryTerm(), version, versionType);
return replica.delete(delete);
}
class ConcreteMappingUpdatePerformer implements MappingUpdatePerformer {
@Nullable
public MappingUpdateResult updateMappingsIfNeeded(IndexShard primary, IndexRequest request) throws Exception {
Engine.Index operation;
try {
operation = prepareIndexOperationOnPrimary(request, primary);
} catch (MapperParsingException | IllegalArgumentException e) {
return new MappingUpdateResult(e);
}
final Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
final ShardId shardId = primary.shardId();
if (update != null) {
// can throw timeout exception when updating mappings or ISE for attempting to update default mappings
// which are bubbled up
try {
mappingUpdatedAction.updateMappingOnMaster(shardId.getIndex(), request.type(), update);
} catch (IllegalArgumentException e) {
// throws IAE on conflicts merging dynamic mappings
return new MappingUpdateResult(e);
}
try {
operation = prepareIndexOperationOnPrimary(request, primary);
} catch (MapperParsingException | IllegalArgumentException e) {
return new MappingUpdateResult(e);
}
if (operation.parsedDoc().dynamicMappingsUpdate() != null) {
throw new ReplicationOperation.RetryOnPrimaryException(shardId,
"Dynamic mappings are not available on the node that holds the primary yet");
}
}
return new MappingUpdateResult(operation);
}
}
}

View File

@ -668,7 +668,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
public final long translogLocation;
public final int size;
Location(long generation, long translogLocation, int size) {
public Location(long generation, long translogLocation, int size) {
this.generation = generation;
this.translogLocation = translogLocation;
this.size = size;

View File

@ -0,0 +1,493 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.bulk;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequestBuilder;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import org.elasticsearch.action.support.replication.ReplicationOperation;
import org.elasticsearch.action.update.UpdateHelper;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.action.bulk.TransportShardBulkAction;
import java.io.IOException;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.Matchers.containsString;
public class TransportShardBulkActionTests extends IndexShardTestCase {
private final ShardId shardId = new ShardId("index", "_na_", 0);
private final Settings idxSettings = Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
.put("index.version.created", Version.CURRENT.id)
.build();
private IndexMetaData indexMetaData() throws IOException {
return IndexMetaData.builder("index")
.putMapping("type", "{\"properties\": {\"foo\": {\"type\": \"text\"}}}")
.settings(idxSettings)
.primaryTerm(0, 1).build();
}
public void testShouldExecuteReplicaItem() throws Exception {
// Successful index request should be replicated
DocWriteRequest writeRequest = new IndexRequest("index", "type", "id").source(Requests.INDEX_CONTENT_TYPE, "foo", "bar");
DocWriteResponse response = new IndexResponse(shardId, "type", "id", 1, 1, randomBoolean());
BulkItemRequest request = new BulkItemRequest(0, writeRequest);
request.setPrimaryResponse(new BulkItemResponse(0, DocWriteRequest.OpType.INDEX, response));
assertTrue(TransportShardBulkAction.shouldExecuteReplicaItem(request, 0));
// Failed index requests should not be replicated (for now!)
writeRequest = new IndexRequest("index", "type", "id").source(Requests.INDEX_CONTENT_TYPE, "foo", "bar");
response = new IndexResponse(shardId, "type", "id", 1, 1, randomBoolean());
request = new BulkItemRequest(0, writeRequest);
request.setPrimaryResponse(
new BulkItemResponse(0, DocWriteRequest.OpType.INDEX,
new BulkItemResponse.Failure("test", "type", "id", new IllegalArgumentException("i died"))));
assertFalse(TransportShardBulkAction.shouldExecuteReplicaItem(request, 0));
// NOOP requests should not be replicated
writeRequest = new UpdateRequest("index", "type", "id");
response = new UpdateResponse(shardId, "type", "id", 1, DocWriteResponse.Result.NOOP);
request = new BulkItemRequest(0, writeRequest);
request.setPrimaryResponse(new BulkItemResponse(0, DocWriteRequest.OpType.UPDATE, response));
assertFalse(TransportShardBulkAction.shouldExecuteReplicaItem(request, 0));
}
public void testExecuteBulkIndexRequest() throws Exception {
IndexMetaData metaData = indexMetaData();
IndexShard shard = newStartedShard(true);
BulkItemRequest[] items = new BulkItemRequest[1];
boolean create = randomBoolean();
DocWriteRequest writeRequest = new IndexRequest("index", "type", "id")
.source(Requests.INDEX_CONTENT_TYPE, "foo", "bar")
.create(create);
BulkItemRequest primaryRequest = new BulkItemRequest(0, writeRequest);
items[0] = primaryRequest;
BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
Translog.Location location = new Translog.Location(0, 0, 0);
UpdateHelper updateHelper = null;
Translog.Location newLocation = TransportShardBulkAction.executeBulkItemRequest(metaData, shard, bulkShardRequest,
location, 0, updateHelper, threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer());
// Translog should change, since there were no problems
assertThat(newLocation, not(location));
BulkItemResponse primaryResponse = bulkShardRequest.items()[0].getPrimaryResponse();
assertThat(primaryResponse.getItemId(), equalTo(0));
assertThat(primaryResponse.getId(), equalTo("id"));
assertThat(primaryResponse.getOpType(), equalTo(create ? DocWriteRequest.OpType.CREATE : DocWriteRequest.OpType.INDEX));
assertFalse(primaryResponse.isFailed());
// Assert that the document actually made it there
assertDocCount(shard, 1);
writeRequest = new IndexRequest("index", "type", "id")
.source(Requests.INDEX_CONTENT_TYPE, "foo", "bar")
.create(true);
primaryRequest = new BulkItemRequest(0, writeRequest);
items[0] = primaryRequest;
bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
Translog.Location secondLocation = TransportShardBulkAction.executeBulkItemRequest(metaData, shard, bulkShardRequest,
newLocation, 0, updateHelper, threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer());
// Translog should not change, since the document was not indexed due to a version conflict
assertThat(secondLocation, equalTo(newLocation));
BulkItemRequest replicaRequest = bulkShardRequest.items()[0];
primaryResponse = bulkShardRequest.items()[0].getPrimaryResponse();
assertThat(primaryResponse.getItemId(), equalTo(0));
assertThat(primaryResponse.getId(), equalTo("id"));
assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.CREATE));
// Should be failed since the document already exists
assertTrue(primaryResponse.isFailed());
BulkItemResponse.Failure failure = primaryResponse.getFailure();
assertThat(failure.getIndex(), equalTo("index"));
assertThat(failure.getType(), equalTo("type"));
assertThat(failure.getId(), equalTo("id"));
assertThat(failure.getCause().getClass(), equalTo(VersionConflictEngineException.class));
assertThat(failure.getCause().getMessage(),
containsString("version conflict, document already exists (current version [1])"));
assertThat(failure.getStatus(), equalTo(RestStatus.CONFLICT));
assertThat(replicaRequest, equalTo(primaryRequest));
// Assert that the document count is still 1
assertDocCount(shard, 1);
closeShards(shard);
}
public void testExecuteBulkIndexRequestWithRejection() throws Exception {
IndexMetaData metaData = indexMetaData();
IndexShard shard = newStartedShard(true);
BulkItemRequest[] items = new BulkItemRequest[1];
DocWriteRequest writeRequest = new IndexRequest("index", "type", "id").source(Requests.INDEX_CONTENT_TYPE, "foo", "bar");
items[0] = new BulkItemRequest(0, writeRequest);
BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
Translog.Location location = new Translog.Location(0, 0, 0);
UpdateHelper updateHelper = null;
// Pretend the mappings haven't made it to the node yet, and throw a rejection
Exception err = new ReplicationOperation.RetryOnPrimaryException(shardId, "rejection");
try {
TransportShardBulkAction.executeBulkItemRequest(metaData, shard, bulkShardRequest, location,
0, updateHelper, threadPool::absoluteTimeInMillis, new ThrowingMappingUpdatePerformer(err));
fail("should have thrown a retry exception");
} catch (ReplicationOperation.RetryOnPrimaryException e) {
assertThat(e, equalTo(err));
}
closeShards(shard);
}
public void testExecuteBulkIndexRequestWithConflictingMappings() throws Exception {
IndexMetaData metaData = indexMetaData();
IndexShard shard = newStartedShard(true);
BulkItemRequest[] items = new BulkItemRequest[1];
DocWriteRequest writeRequest = new IndexRequest("index", "type", "id").source(Requests.INDEX_CONTENT_TYPE, "foo", "bar");
items[0] = new BulkItemRequest(0, writeRequest);
BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
Translog.Location location = new Translog.Location(0, 0, 0);
UpdateHelper updateHelper = null;
// Return a mapping conflict (IAE) when trying to update the mapping
Exception err = new IllegalArgumentException("mapping conflict");
Translog.Location newLocation = TransportShardBulkAction.executeBulkItemRequest(metaData, shard, bulkShardRequest,
location, 0, updateHelper, threadPool::absoluteTimeInMillis, new FailingMappingUpdatePerformer(err));
// Translog shouldn't change, as there were conflicting mappings
assertThat(newLocation, equalTo(location));
BulkItemResponse primaryResponse = bulkShardRequest.items()[0].getPrimaryResponse();
// Since this was not a conflict failure, the primary response
// should be filled out with the failure information
assertThat(primaryResponse.getItemId(), equalTo(0));
assertThat(primaryResponse.getId(), equalTo("id"));
assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.INDEX));
assertTrue(primaryResponse.isFailed());
assertThat(primaryResponse.getFailureMessage(), containsString("mapping conflict"));
BulkItemResponse.Failure failure = primaryResponse.getFailure();
assertThat(failure.getIndex(), equalTo("index"));
assertThat(failure.getType(), equalTo("type"));
assertThat(failure.getId(), equalTo("id"));
assertThat(failure.getCause(), equalTo(err));
assertThat(failure.getStatus(), equalTo(RestStatus.BAD_REQUEST));
closeShards(shard);
}
public void testExecuteBulkDeleteRequest() throws Exception {
IndexMetaData metaData = indexMetaData();
IndexShard shard = newStartedShard(true);
BulkItemRequest[] items = new BulkItemRequest[1];
DocWriteRequest writeRequest = new DeleteRequest("index", "type", "id");
items[0] = new BulkItemRequest(0, writeRequest);
BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
Translog.Location location = new Translog.Location(0, 0, 0);
UpdateHelper updateHelper = null;
Translog.Location newLocation = TransportShardBulkAction.executeBulkItemRequest(metaData, shard, bulkShardRequest,
location, 0, updateHelper, threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer());
// Translog changes, even though the document didn't exist
assertThat(newLocation, not(location));
BulkItemRequest replicaRequest = bulkShardRequest.items()[0];
DocWriteRequest replicaDeleteRequest = replicaRequest.request();
BulkItemResponse primaryResponse = replicaRequest.getPrimaryResponse();
DeleteResponse response = primaryResponse.getResponse();
// Any version can be matched on replica
assertThat(replicaDeleteRequest.version(), equalTo(Versions.MATCH_ANY));
assertThat(replicaDeleteRequest.versionType(), equalTo(VersionType.INTERNAL));
assertThat(primaryResponse.getItemId(), equalTo(0));
assertThat(primaryResponse.getId(), equalTo("id"));
assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.DELETE));
assertFalse(primaryResponse.isFailed());
assertThat(response.getResult(), equalTo(DocWriteResponse.Result.NOT_FOUND));
assertThat(response.getShardId(), equalTo(shard.shardId()));
assertThat(response.getIndex(), equalTo("index"));
assertThat(response.getType(), equalTo("type"));
assertThat(response.getId(), equalTo("id"));
assertThat(response.getVersion(), equalTo(1L));
assertThat(response.getSeqNo(), equalTo(0L));
assertThat(response.forcedRefresh(), equalTo(false));
// Now do the same after indexing the document, it should now find and delete the document
indexDoc(shard, "type", "id", "{\"foo\": \"bar\"}");
writeRequest = new DeleteRequest("index", "type", "id");
items[0] = new BulkItemRequest(0, writeRequest);
bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
location = newLocation;
newLocation = TransportShardBulkAction.executeBulkItemRequest(metaData, shard, bulkShardRequest,
location, 0, updateHelper, threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer());
// Translog changes, because the document was deleted
assertThat(newLocation, not(location));
replicaRequest = bulkShardRequest.items()[0];
replicaDeleteRequest = replicaRequest.request();
primaryResponse = replicaRequest.getPrimaryResponse();
response = primaryResponse.getResponse();
// Any version can be matched on replica
assertThat(replicaDeleteRequest.version(), equalTo(Versions.MATCH_ANY));
assertThat(replicaDeleteRequest.versionType(), equalTo(VersionType.INTERNAL));
assertThat(primaryResponse.getItemId(), equalTo(0));
assertThat(primaryResponse.getId(), equalTo("id"));
assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.DELETE));
assertFalse(primaryResponse.isFailed());
assertThat(response.getResult(), equalTo(DocWriteResponse.Result.DELETED));
assertThat(response.getShardId(), equalTo(shard.shardId()));
assertThat(response.getIndex(), equalTo("index"));
assertThat(response.getType(), equalTo("type"));
assertThat(response.getId(), equalTo("id"));
assertThat(response.getVersion(), equalTo(3L));
assertThat(response.getSeqNo(), equalTo(2L));
assertThat(response.forcedRefresh(), equalTo(false));
assertDocCount(shard, 0);
closeShards(shard);
}
public void testNoopUpdateReplicaRequest() throws Exception {
DocWriteRequest writeRequest = new IndexRequest("index", "type", "id").source(Requests.INDEX_CONTENT_TYPE, "field", "value");
BulkItemRequest replicaRequest = new BulkItemRequest(0, writeRequest);
DocWriteResponse noopUpdateResponse = new UpdateResponse(shardId, "index", "id", 0, DocWriteResponse.Result.NOOP);
BulkItemResultHolder noopResults = new BulkItemResultHolder(noopUpdateResponse, null, replicaRequest);
Translog.Location location = new Translog.Location(0, 0, 0);
BulkItemRequest[] items = new BulkItemRequest[0];
BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
Translog.Location newLocation = TransportShardBulkAction.updateReplicaRequest(noopResults,
DocWriteRequest.OpType.UPDATE, location, bulkShardRequest);
BulkItemResponse primaryResponse = replicaRequest.getPrimaryResponse();
// Basically nothing changes in the request since it's a noop
assertThat(newLocation, equalTo(location));
assertThat(primaryResponse.getItemId(), equalTo(0));
assertThat(primaryResponse.getId(), equalTo("id"));
assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.UPDATE));
assertThat(primaryResponse.getResponse(), equalTo(noopUpdateResponse));
assertThat(primaryResponse.getResponse().getResult(), equalTo(DocWriteResponse.Result.NOOP));
}
public void testUpdateReplicaRequestWithFailure() throws Exception {
DocWriteRequest writeRequest = new IndexRequest("index", "type", "id").source(Requests.INDEX_CONTENT_TYPE, "field", "value");
BulkItemRequest replicaRequest = new BulkItemRequest(0, writeRequest);
Exception err = new ElasticsearchException("I'm dead <(x.x)>");
Engine.IndexResult indexResult = new Engine.IndexResult(err, 0, 0);
BulkItemResultHolder failedResults = new BulkItemResultHolder(null, indexResult, replicaRequest);
Translog.Location location = new Translog.Location(0, 0, 0);
BulkItemRequest[] items = new BulkItemRequest[0];
BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
Translog.Location newLocation = TransportShardBulkAction.updateReplicaRequest(failedResults,
DocWriteRequest.OpType.UPDATE, location, bulkShardRequest);
BulkItemResponse primaryResponse = replicaRequest.getPrimaryResponse();
// Since this was not a conflict failure, the primary response
// should be filled out with the failure information
assertThat(newLocation, equalTo(location));
assertThat(primaryResponse.getItemId(), equalTo(0));
assertThat(primaryResponse.getId(), equalTo("id"));
assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.INDEX));
assertTrue(primaryResponse.isFailed());
assertThat(primaryResponse.getFailureMessage(), containsString("I'm dead <(x.x)>"));
BulkItemResponse.Failure failure = primaryResponse.getFailure();
assertThat(failure.getIndex(), equalTo("index"));
assertThat(failure.getType(), equalTo("type"));
assertThat(failure.getId(), equalTo("id"));
assertThat(failure.getCause(), equalTo(err));
assertThat(failure.getStatus(), equalTo(RestStatus.INTERNAL_SERVER_ERROR));
}
public void testUpdateReplicaRequestWithConflictFailure() throws Exception {
DocWriteRequest writeRequest = new IndexRequest("index", "type", "id").source(Requests.INDEX_CONTENT_TYPE, "field", "value");
BulkItemRequest replicaRequest = new BulkItemRequest(0, writeRequest);
Exception err = new VersionConflictEngineException(shardId, "type", "id", "I'm conflicted <(;_;)>");
Engine.IndexResult indexResult = new Engine.IndexResult(err, 0, 0);
BulkItemResultHolder failedResults = new BulkItemResultHolder(null, indexResult, replicaRequest);
Translog.Location location = new Translog.Location(0, 0, 0);
BulkItemRequest[] items = new BulkItemRequest[0];
BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
Translog.Location newLocation = TransportShardBulkAction.updateReplicaRequest(failedResults,
DocWriteRequest.OpType.UPDATE, location, bulkShardRequest);
BulkItemResponse primaryResponse = replicaRequest.getPrimaryResponse();
// Since this was not a conflict failure, the primary response
// should be filled out with the failure information
assertThat(newLocation, equalTo(location));
assertThat(primaryResponse.getItemId(), equalTo(0));
assertThat(primaryResponse.getId(), equalTo("id"));
assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.INDEX));
assertTrue(primaryResponse.isFailed());
assertThat(primaryResponse.getFailureMessage(), containsString("I'm conflicted <(;_;)>"));
BulkItemResponse.Failure failure = primaryResponse.getFailure();
assertThat(failure.getIndex(), equalTo("index"));
assertThat(failure.getType(), equalTo("type"));
assertThat(failure.getId(), equalTo("id"));
assertThat(failure.getCause(), equalTo(err));
assertThat(failure.getStatus(), equalTo(RestStatus.CONFLICT));
}
public void testUpdateReplicaRequestWithSuccess() throws Exception {
DocWriteRequest writeRequest = new IndexRequest("index", "type", "id").source(Requests.INDEX_CONTENT_TYPE, "field", "value");
BulkItemRequest replicaRequest = new BulkItemRequest(0, writeRequest);
boolean created = randomBoolean();
Translog.Location resultLocation = new Translog.Location(42, 42, 42);
Engine.IndexResult indexResult = new FakeResult(1, 1, created, resultLocation);
DocWriteResponse indexResponse = new IndexResponse(shardId, "index", "id", 1, 1, created);
BulkItemResultHolder goodResults = new BulkItemResultHolder(indexResponse, indexResult, replicaRequest);
Translog.Location originalLocation = new Translog.Location(21, 21, 21);
BulkItemRequest[] items = new BulkItemRequest[0];
BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
Translog.Location newLocation = TransportShardBulkAction.updateReplicaRequest(goodResults,
DocWriteRequest.OpType.INDEX, originalLocation, bulkShardRequest);
BulkItemResponse primaryResponse = replicaRequest.getPrimaryResponse();
// Check that the translog is successfully advanced
assertThat(newLocation, equalTo(resultLocation));
// Since this was not a conflict failure, the primary response
// should be filled out with the failure information
assertThat(primaryResponse.getItemId(), equalTo(0));
assertThat(primaryResponse.getId(), equalTo("id"));
assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.INDEX));
DocWriteResponse response = primaryResponse.getResponse();
assertThat(response.status(), equalTo(created ? RestStatus.CREATED : RestStatus.OK));
}
/**
* Fake IndexResult that has a settable translog location
*/
private static class FakeResult extends Engine.IndexResult {
private final Translog.Location location;
protected FakeResult(long version, long seqNo, boolean created, Translog.Location location) {
super(version, seqNo, created);
this.location = location;
}
@Override
public Translog.Location getTranslogLocation() {
return this.location;
}
}
/** Doesn't perform any mapping updates */
public static class NoopMappingUpdatePerformer implements MappingUpdatePerformer {
public MappingUpdatePerformer.MappingUpdateResult updateMappingsIfNeeded(IndexShard primary,
IndexRequest request) throws Exception {
Engine.Index operation = TransportShardBulkAction.prepareIndexOperationOnPrimary(request, primary);
return new MappingUpdatePerformer.MappingUpdateResult(operation);
}
}
/** Always returns the given failure */
private class FailingMappingUpdatePerformer implements MappingUpdatePerformer {
private final Exception e;
FailingMappingUpdatePerformer(Exception e) {
this.e = e;
}
public MappingUpdatePerformer.MappingUpdateResult updateMappingsIfNeeded(IndexShard primary,
IndexRequest request) throws Exception {
return new MappingUpdatePerformer.MappingUpdateResult(e);
}
}
/** Always throw the given exception */
private class ThrowingMappingUpdatePerformer implements MappingUpdatePerformer {
private final Exception e;
ThrowingMappingUpdatePerformer(Exception e) {
this.e = e;
}
public MappingUpdatePerformer.MappingUpdateResult updateMappingsIfNeeded(IndexShard primary,
IndexRequest request) throws Exception {
throw e;
}
}
}

View File

@ -31,6 +31,7 @@ import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.bulk.BulkShardRequest;
import org.elasticsearch.action.bulk.BulkShardResponse;
import org.elasticsearch.action.bulk.TransportShardBulkActionTests;
import org.elasticsearch.action.bulk.TransportSingleItemBulkWriteAction;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
@ -549,7 +550,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
*/
protected IndexResponse indexOnPrimary(IndexRequest request, IndexShard primary) throws Exception {
final Engine.IndexResult indexResult = executeIndexRequestOnPrimary(request, primary,
null);
new TransportShardBulkActionTests.NoopMappingUpdatePerformer());
request.primaryTerm(primary.getPrimaryTerm());
TransportWriteActionTestHelper.performPostWriteActions(primary, request, indexResult.getTranslogLocation(), logger);
return new IndexResponse(
@ -591,5 +592,4 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
replica.getTranslog().sync();
}
}
}