diff --git a/buildSrc/src/main/resources/checkstyle_suppressions.xml b/buildSrc/src/main/resources/checkstyle_suppressions.xml
index 8d0fbe3edda..c95ad03f9ac 100644
--- a/buildSrc/src/main/resources/checkstyle_suppressions.xml
+++ b/buildSrc/src/main/resources/checkstyle_suppressions.xml
@@ -2304,7 +2304,6 @@
-
diff --git a/core/src/main/java/org/elasticsearch/action/bulk/MappingUpdatePerformer.java b/core/src/main/java/org/elasticsearch/action/bulk/MappingUpdatePerformer.java
index 27d636d3d93..e33c54d6dd5 100644
--- a/core/src/main/java/org/elasticsearch/action/bulk/MappingUpdatePerformer.java
+++ b/core/src/main/java/org/elasticsearch/action/bulk/MappingUpdatePerformer.java
@@ -20,48 +20,28 @@
package org.elasticsearch.action.bulk;
import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.common.Nullable;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;
+import org.elasticsearch.index.shard.ShardId;
import java.util.Objects;
public interface MappingUpdatePerformer {
- /**
- * Determine if any mappings need to be updated, and update them on the
- * master node if necessary. Returnes a failed {@code Engine.IndexResult}
- * in the event updating the mappings fails or null if successful.
- * Throws a {@code ReplicationOperation.RetryOnPrimaryException} if the
- * operation needs to be retried on the primary due to the mappings not
- * being present yet, or a different exception if updating the mappings
- * on the master failed.
- */
- @Nullable
- MappingUpdateResult updateMappingsIfNeeded(IndexShard primary, IndexRequest request) throws Exception;
/**
- * Class encapsulating the resulting of potentially updating the mapping
+ * Determine if any mappings need to be updated, and update them on the master node if
+ * necessary. Returnes a failure Exception in the event updating the mappings fails or null if
+ * successful.
*/
- class MappingUpdateResult {
- @Nullable
- public final Engine.Index operation;
- @Nullable
- public final Exception failure;
+ void updateMappingsIfNeeded(Engine.Index operation,
+ ShardId shardId,
+ String type) throws Exception;
- MappingUpdateResult(Exception failure) {
- Objects.requireNonNull(failure, "failure cannot be null");
- this.failure = failure;
- this.operation = null;
- }
+ /**
+ * Throws a {@code ReplicationOperation.RetryOnPrimaryException} if the operation needs to be
+ * retried on the primary due to the mappings not being present yet, or a different exception if
+ * updating the mappings on the master failed.
+ */
+ void verifyMappings(Engine.Index operation, ShardId shardId) throws Exception;
- MappingUpdateResult(Engine.Index operation) {
- Objects.requireNonNull(operation, "operation cannot be null");
- this.operation = operation;
- this.failure = null;
- }
-
- public boolean isFailed() {
- return failure != null;
- }
- }
}
diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java
index d1bba8d2d4a..6a286c5a758 100644
--- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java
+++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java
@@ -65,6 +65,9 @@ import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.index.translog.Translog.Location;
+import org.elasticsearch.action.bulk.BulkItemResultHolder;
+import org.elasticsearch.action.bulk.BulkItemResponse;
import java.io.IOException;
import java.util.Map;
@@ -154,10 +157,23 @@ public class TransportShardBulkAction extends TransportWriteAction(request, location, null, replica, logger);
}
- private static Translog.Location locationToSync(Translog.Location current, Translog.Location next) {
- /* here we are moving forward in the translog with each operation. Under the hood
- * this might cross translog files which is ok since from the user perspective
- * the translog is like a tape where only the highest location needs to be fsynced
- * in order to sync all previous locations even though they are not in the same file.
- * When the translog rolls over files the previous file is fsynced on after closing if needed.*/
+ private static Translog.Location locationToSync(Translog.Location current,
+ Translog.Location next) {
+ /* here we are moving forward in the translog with each operation. Under the hood this might
+ * cross translog files which is ok since from the user perspective the translog is like a
+ * tape where only the highest location needs to be fsynced in order to sync all previous
+ * locations even though they are not in the same file. When the translog rolls over files
+ * the previous file is fsynced on after closing if needed.*/
assert next != null : "next operation can't be null";
- assert current == null || current.compareTo(next) < 0 : "translog locations are not increasing";
+ assert current == null || current.compareTo(next) < 0 :
+ "translog locations are not increasing";
return next;
}
@@ -411,45 +429,82 @@ public class TransportShardBulkAction extends TransportWriteAction");
Engine.IndexResult indexResult = new Engine.IndexResult(err, 0, 0);
- BulkItemResultHolder failedResults = new BulkItemResultHolder(null, indexResult, replicaRequest);
+ BulkItemResultHolder failedResults = new BulkItemResultHolder(null, indexResult,
+ replicaRequest);
Translog.Location location = new Translog.Location(0, 0, 0);
BulkItemRequest[] items = new BulkItemRequest[0];
- BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
- Translog.Location newLocation = TransportShardBulkAction.updateReplicaRequest(failedResults,
- DocWriteRequest.OpType.UPDATE, location, bulkShardRequest);
+ BulkShardRequest bulkShardRequest =
+ new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
+ BulkItemResponse primaryResponse =
+ TransportShardBulkAction.createPrimaryResponse(
+ failedResults, DocWriteRequest.OpType.UPDATE, bulkShardRequest);
- BulkItemResponse primaryResponse = replicaRequest.getPrimaryResponse();
+ Translog.Location newLocation =
+ TransportShardBulkAction.calculateTranslogLocation(location, failedResults);
// Since this was not a conflict failure, the primary response
// should be filled out with the failure information
@@ -378,20 +410,26 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
}
public void testUpdateReplicaRequestWithConflictFailure() throws Exception {
- DocWriteRequest writeRequest = new IndexRequest("index", "type", "id").source(Requests.INDEX_CONTENT_TYPE, "field", "value");
+ DocWriteRequest writeRequest = new IndexRequest("index", "type", "id")
+ .source(Requests.INDEX_CONTENT_TYPE, "field", "value");
BulkItemRequest replicaRequest = new BulkItemRequest(0, writeRequest);
- Exception err = new VersionConflictEngineException(shardId, "type", "id", "I'm conflicted <(;_;)>");
+ Exception err = new VersionConflictEngineException(shardId, "type", "id",
+ "I'm conflicted <(;_;)>");
Engine.IndexResult indexResult = new Engine.IndexResult(err, 0, 0);
- BulkItemResultHolder failedResults = new BulkItemResultHolder(null, indexResult, replicaRequest);
+ BulkItemResultHolder failedResults = new BulkItemResultHolder(null, indexResult,
+ replicaRequest);
Translog.Location location = new Translog.Location(0, 0, 0);
BulkItemRequest[] items = new BulkItemRequest[0];
- BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
- Translog.Location newLocation = TransportShardBulkAction.updateReplicaRequest(failedResults,
- DocWriteRequest.OpType.UPDATE, location, bulkShardRequest);
+ BulkShardRequest bulkShardRequest =
+ new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
+ BulkItemResponse primaryResponse =
+ TransportShardBulkAction.createPrimaryResponse(
+ failedResults, DocWriteRequest.OpType.UPDATE, bulkShardRequest);
- BulkItemResponse primaryResponse = replicaRequest.getPrimaryResponse();
+ Translog.Location newLocation =
+ TransportShardBulkAction.calculateTranslogLocation(location, failedResults);
// Since this was not a conflict failure, the primary response
// should be filled out with the failure information
@@ -410,22 +448,27 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
}
public void testUpdateReplicaRequestWithSuccess() throws Exception {
- DocWriteRequest writeRequest = new IndexRequest("index", "type", "id").source(Requests.INDEX_CONTENT_TYPE, "field", "value");
+ DocWriteRequest writeRequest = new IndexRequest("index", "type", "id")
+ .source(Requests.INDEX_CONTENT_TYPE, "field", "value");
BulkItemRequest replicaRequest = new BulkItemRequest(0, writeRequest);
boolean created = randomBoolean();
Translog.Location resultLocation = new Translog.Location(42, 42, 42);
Engine.IndexResult indexResult = new FakeResult(1, 1, created, resultLocation);
DocWriteResponse indexResponse = new IndexResponse(shardId, "index", "id", 1, 1, created);
- BulkItemResultHolder goodResults = new BulkItemResultHolder(indexResponse, indexResult, replicaRequest);
+ BulkItemResultHolder goodResults =
+ new BulkItemResultHolder(indexResponse, indexResult, replicaRequest);
Translog.Location originalLocation = new Translog.Location(21, 21, 21);
BulkItemRequest[] items = new BulkItemRequest[0];
- BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
- Translog.Location newLocation = TransportShardBulkAction.updateReplicaRequest(goodResults,
- DocWriteRequest.OpType.INDEX, originalLocation, bulkShardRequest);
+ BulkShardRequest bulkShardRequest =
+ new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
+ BulkItemResponse primaryResponse =
+ TransportShardBulkAction.createPrimaryResponse(
+ goodResults, DocWriteRequest.OpType.INDEX, bulkShardRequest);
- BulkItemResponse primaryResponse = replicaRequest.getPrimaryResponse();
+ Translog.Location newLocation =
+ TransportShardBulkAction.calculateTranslogLocation(originalLocation, goodResults);
// Check that the translog is successfully advanced
assertThat(newLocation, equalTo(resultLocation));
@@ -438,6 +481,61 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
assertThat(response.status(), equalTo(created ? RestStatus.CREATED : RestStatus.OK));
}
+ public void testCalculateTranslogLocation() throws Exception {
+ final Translog.Location original = new Translog.Location(0, 0, 0);
+
+ DocWriteRequest writeRequest = new IndexRequest("index", "type", "id")
+ .source(Requests.INDEX_CONTENT_TYPE, "field", "value");
+ BulkItemRequest replicaRequest = new BulkItemRequest(0, writeRequest);
+ BulkItemResultHolder results = new BulkItemResultHolder(null, null, replicaRequest);
+
+ assertThat(TransportShardBulkAction.calculateTranslogLocation(original, results),
+ equalTo(original));
+
+ boolean created = randomBoolean();
+ DocWriteResponse indexResponse = new IndexResponse(shardId, "index", "id", 1, 1, created);
+ Translog.Location newLocation = new Translog.Location(1, 1, 1);
+ Engine.IndexResult indexResult = new IndexResultWithLocation(randomNonNegativeLong(),
+ randomNonNegativeLong(), created, newLocation);
+ results = new BulkItemResultHolder(indexResponse, indexResult, replicaRequest);
+ assertThat(TransportShardBulkAction.calculateTranslogLocation(original, results),
+ equalTo(newLocation));
+
+ }
+
+ public class IndexResultWithLocation extends Engine.IndexResult {
+ private final Translog.Location location;
+ public IndexResultWithLocation(long version, long seqNo, boolean created,
+ Translog.Location newLocation) {
+ super(version, seqNo, created);
+ this.location = newLocation;
+ }
+
+ @Override
+ public Translog.Location getTranslogLocation() {
+ return this.location;
+ }
+ }
+
+ public void testPrepareIndexOpOnReplica() throws Exception {
+ IndexMetaData metaData = indexMetaData();
+ IndexShard shard = newStartedShard(false);
+
+ DocWriteResponse primaryResponse = new IndexResponse(shardId, "index", "id",
+ 1, 1, randomBoolean());
+ IndexRequest request = new IndexRequest("index", "type", "id")
+ .source(Requests.INDEX_CONTENT_TYPE, "field", "value");
+
+ Engine.Index op = TransportShardBulkAction.prepareIndexOperationOnReplica(
+ primaryResponse, request, shard);
+
+ assertThat(op.version(), equalTo(primaryResponse.getVersion()));
+ assertThat(op.seqNo(), equalTo(primaryResponse.getSeqNo()));
+ assertThat(op.versionType(), equalTo(VersionType.EXTERNAL));
+
+ closeShards(shard);
+ }
+
/**
* Fake IndexResult that has a settable translog location
*/
@@ -445,7 +543,8 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
private final Translog.Location location;
- protected FakeResult(long version, long seqNo, boolean created, Translog.Location location) {
+ protected FakeResult(long version, long seqNo, boolean created,
+ Translog.Location location) {
super(version, seqNo, created);
this.location = location;
}
@@ -458,23 +557,12 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
/** Doesn't perform any mapping updates */
public static class NoopMappingUpdatePerformer implements MappingUpdatePerformer {
- public MappingUpdatePerformer.MappingUpdateResult updateMappingsIfNeeded(IndexShard primary,
- IndexRequest request) throws Exception {
- Engine.Index operation = TransportShardBulkAction.prepareIndexOperationOnPrimary(request, primary);
- return new MappingUpdatePerformer.MappingUpdateResult(operation);
- }
- }
-
- /** Always returns the given failure */
- private class FailingMappingUpdatePerformer implements MappingUpdatePerformer {
- private final Exception e;
- FailingMappingUpdatePerformer(Exception e) {
- this.e = e;
+ public void updateMappingsIfNeeded(Engine.Index operation,
+ ShardId shardId,
+ String type) throws Exception {
}
- public MappingUpdatePerformer.MappingUpdateResult updateMappingsIfNeeded(IndexShard primary,
- IndexRequest request) throws Exception {
- return new MappingUpdatePerformer.MappingUpdateResult(e);
+ public void verifyMappings(Engine.Index operation, ShardId shardId) throws Exception {
}
}
@@ -485,8 +573,30 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
this.e = e;
}
- public MappingUpdatePerformer.MappingUpdateResult updateMappingsIfNeeded(IndexShard primary,
- IndexRequest request) throws Exception {
+ public void updateMappingsIfNeeded(Engine.Index operation,
+ ShardId shardId,
+ String type) throws Exception {
+ throw e;
+ }
+
+ public void verifyMappings(Engine.Index operation, ShardId shardId) throws Exception {
+ fail("should not have gotten to this point");
+ }
+ }
+
+ /** Always throw the given exception */
+ private class ThrowingVerifyingMappingUpdatePerformer implements MappingUpdatePerformer {
+ private final Exception e;
+ ThrowingVerifyingMappingUpdatePerformer(Exception e) {
+ this.e = e;
+ }
+
+ public void updateMappingsIfNeeded(Engine.Index operation,
+ ShardId shardId,
+ String type) throws Exception {
+ }
+
+ public void verifyMappings(Engine.Index operation, ShardId shardId) throws Exception {
throw e;
}
}