Disallow externally generated autoGeneratedTimestamp (#47341)

The autoGeneratedTimestamp field is internally used to speed up indexing of operations with
auto-ids, as we can rule out duplicates. Setting this field externally can make the index
inconsistent, resulting in duplicate documents with same id.
This commit is contained in:
Yannick Welsch 2019-10-02 11:02:53 +02:00
parent 8c11fe610e
commit 0024695dd8
4 changed files with 29 additions and 4 deletions

View File

@ -172,7 +172,11 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
}
if (actionRequest instanceof IndexRequest) {
((IndexRequest) actionRequest).checkAutoIdWithOpTypeCreateSupportedByVersion(minNodeVersion);
IndexRequest ir = (IndexRequest) actionRequest;
ir.checkAutoIdWithOpTypeCreateSupportedByVersion(minNodeVersion);
if (ir.getAutoGeneratedTimestamp() != IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP) {
throw new IllegalArgumentException("autoGeneratedTimestamp should not be set externally");
}
}
}

View File

@ -108,7 +108,7 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
* Value for {@link #getAutoGeneratedTimestamp()} if the document has an external
* provided ID.
*/
public static final int UNSET_AUTO_GENERATED_TIMESTAMP = -1;
public static final long UNSET_AUTO_GENERATED_TIMESTAMP = -1L;
private long autoGeneratedTimestamp = UNSET_AUTO_GENERATED_TIMESTAMP;
@ -636,7 +636,7 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
// generate id if not already provided
if (id == null) {
assert autoGeneratedTimestamp == -1 : "timestamp has already been generated!";
assert autoGeneratedTimestamp == UNSET_AUTO_GENERATED_TIMESTAMP : "timestamp has already been generated!";
assert ifSeqNo == UNASSIGNED_SEQ_NO;
assert ifPrimaryTerm == UNASSIGNED_PRIMARY_TERM;
autoGeneratedTimestamp = Math.max(0, System.currentTimeMillis()); // extra paranoia

View File

@ -20,6 +20,7 @@
package org.elasticsearch.action.bulk;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
@ -46,6 +47,7 @@ import java.util.concurrent.ExecutionException;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.StreamsUtils.copyToStringFromClasspath;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
public class BulkIntegrationIT extends ESIntegTestCase {
@ -100,6 +102,17 @@ public class BulkIntegrationIT extends ESIntegTestCase {
assertFalse(client().prepareGet("index3", "type", "id").setRouting("1").get().isExists());
}
// allowing the auto-generated timestamp to externally be set would allow making the index inconsistent with duplicate docs
public void testExternallySetAutoGeneratedTimestamp() {
IndexRequest indexRequest = new IndexRequest("index1", "_doc").source(Collections.singletonMap("foo", "baz"));
indexRequest.process(Version.CURRENT, null, null); // sets the timestamp
if (randomBoolean()) {
indexRequest.id("test");
}
assertThat(expectThrows(IllegalArgumentException.class, () -> client().prepareBulk().add(indexRequest).get()).getMessage(),
containsString("autoGeneratedTimestamp should not be set externally"));
}
public void testBulkWithGlobalDefaults() throws Exception {
// all requests in the json are missing index and type parameters: "_index" : "test", "_type" : "type1",
String bulkAction = copyToStringFromClasspath("/org/elasticsearch/action/bulk/simple-bulk-missing-index-type.json");

View File

@ -579,7 +579,15 @@ public class BulkWithUpdatesIT extends ESIntegTestCase {
assertBusy(() -> assertAcked(client().admin().indices().prepareClose("bulkindex2")));
BulkResponse bulkResponse = client().bulk(bulkRequest).get();
BulkRequest bulkRequest2 = new BulkRequest();
bulkRequest2.add(new IndexRequest("bulkindex1", "index1_type", "1").source(Requests.INDEX_CONTENT_TYPE, "text", "hallo1"))
.add(new IndexRequest("bulkindex2", "index2_type", "1").source(Requests.INDEX_CONTENT_TYPE, "text", "hallo2"))
.add(new IndexRequest("bulkindex2", "index2_type").source(Requests.INDEX_CONTENT_TYPE, "text", "hallo2"))
.add(new UpdateRequest("bulkindex2", "index2_type", "2").doc(Requests.INDEX_CONTENT_TYPE, "foo", "bar"))
.add(new DeleteRequest("bulkindex2", "index2_type", "3"))
.setRefreshPolicy(RefreshPolicy.IMMEDIATE);
BulkResponse bulkResponse = client().bulk(bulkRequest2).get();
assertThat(bulkResponse.hasFailures(), is(true));
assertThat(bulkResponse.getItems().length, is(5));
}