mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-03-27 02:18:42 +00:00
Allow optype CREATE for append-only indexing operations (#47169)
Bulk requests currently do not allow adding "create" actions with auto-generated IDs. This commit allows using the optype CREATE for append-only indexing operations. This is mainly the user facing aspect of it.
This commit is contained in:
parent
a032f9b2d5
commit
7b2613db55
@ -19,19 +19,23 @@
|
||||
package org.elasticsearch.upgrades;
|
||||
|
||||
import org.apache.http.util.EntityUtils;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.client.Request;
|
||||
import org.elasticsearch.client.Response;
|
||||
import org.elasticsearch.client.ResponseException;
|
||||
import org.elasticsearch.common.Booleans;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.rest.action.document.RestBulkAction;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.client.Request;
|
||||
import org.elasticsearch.client.Response;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||
import static org.elasticsearch.rest.action.search.RestSearchAction.TOTAL_HITS_AS_INT_PARAM;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.either;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
/**
|
||||
@ -145,6 +149,60 @@ public class IndexingIT extends AbstractRollingTestCase {
|
||||
}
|
||||
}
|
||||
|
||||
public void testAutoIdWithOpTypeCreate() throws IOException {
|
||||
final String indexName = "auto_id_and_op_type_create_index";
|
||||
StringBuilder b = new StringBuilder();
|
||||
b.append("{\"create\": {\"_index\": \"").append(indexName).append("\"}}\n");
|
||||
b.append("{\"f1\": \"v\"}\n");
|
||||
Request bulk = new Request("POST", "/_bulk");
|
||||
bulk.addParameter("refresh", "true");
|
||||
bulk.setJsonEntity(b.toString());
|
||||
|
||||
switch (CLUSTER_TYPE) {
|
||||
case OLD:
|
||||
Request createTestIndex = new Request("PUT", "/" + indexName);
|
||||
createTestIndex.setJsonEntity("{\"settings\": {\"index.number_of_replicas\": 0}}");
|
||||
client().performRequest(createTestIndex);
|
||||
break;
|
||||
case MIXED:
|
||||
Request waitForGreen = new Request("GET", "/_cluster/health");
|
||||
waitForGreen.addParameter("wait_for_nodes", "3");
|
||||
client().performRequest(waitForGreen);
|
||||
|
||||
Version minNodeVersion = null;
|
||||
Map<?, ?> response = entityAsMap(client().performRequest(new Request("GET", "_nodes")));
|
||||
Map<?, ?> nodes = (Map<?, ?>) response.get("nodes");
|
||||
for (Map.Entry<?, ?> node : nodes.entrySet()) {
|
||||
Map<?, ?> nodeInfo = (Map<?, ?>) node.getValue();
|
||||
Version nodeVersion = Version.fromString(nodeInfo.get("version").toString());
|
||||
if (minNodeVersion == null) {
|
||||
minNodeVersion = nodeVersion;
|
||||
} else if (nodeVersion.before(minNodeVersion)) {
|
||||
minNodeVersion = nodeVersion;
|
||||
}
|
||||
}
|
||||
|
||||
if (minNodeVersion.before(Version.V_7_5_0)) {
|
||||
ResponseException e = expectThrows(ResponseException.class, () -> client().performRequest(bulk));
|
||||
assertEquals(400, e.getResponse().getStatusLine().getStatusCode());
|
||||
assertThat(e.getMessage(),
|
||||
// if request goes to 7.5+ node
|
||||
either(containsString("optype create not supported for indexing requests without explicit id until"))
|
||||
// if request goes to < 7.5 node
|
||||
.or(containsString("an id must be provided if version type or value are set")
|
||||
));
|
||||
} else {
|
||||
client().performRequest(bulk);
|
||||
}
|
||||
break;
|
||||
case UPGRADED:
|
||||
client().performRequest(bulk);
|
||||
break;
|
||||
default:
|
||||
throw new UnsupportedOperationException("Unknown cluster type [" + CLUSTER_TYPE + "]");
|
||||
}
|
||||
}
|
||||
|
||||
private void bulk(String index, String valueSuffix, int count) throws IOException {
|
||||
StringBuilder b = new StringBuilder();
|
||||
for (int i = 0; i < count; i++) {
|
||||
|
@ -61,6 +61,50 @@
|
||||
|
||||
- match: { count: 2 }
|
||||
|
||||
---
|
||||
"Empty _id with op_type create":
|
||||
- skip:
|
||||
version: " - 7.99.99"
|
||||
reason: "auto id + op type create only supported since 8.0"
|
||||
|
||||
- do:
|
||||
bulk:
|
||||
refresh: true
|
||||
body:
|
||||
- index:
|
||||
_index: test
|
||||
_id: ''
|
||||
- f: 1
|
||||
- index:
|
||||
_index: test
|
||||
_id: id
|
||||
- f: 2
|
||||
- index:
|
||||
_index: test
|
||||
- f: 3
|
||||
- create:
|
||||
_index: test
|
||||
- f: 4
|
||||
- index:
|
||||
_index: test
|
||||
op_type: create
|
||||
- f: 5
|
||||
- match: { errors: true }
|
||||
- match: { items.0.index.status: 400 }
|
||||
- match: { items.0.index.error.type: illegal_argument_exception }
|
||||
- match: { items.0.index.error.reason: if _id is specified it must not be empty }
|
||||
- match: { items.1.index.result: created }
|
||||
- match: { items.2.index.result: created }
|
||||
- match: { items.3.create.result: created }
|
||||
- match: { items.4.create.result: created }
|
||||
|
||||
- do:
|
||||
count:
|
||||
index: test
|
||||
|
||||
- match: { count: 4 }
|
||||
|
||||
|
||||
---
|
||||
"empty action":
|
||||
|
||||
|
@ -56,6 +56,53 @@
|
||||
|
||||
- match: { count: 2 }
|
||||
|
||||
---
|
||||
"Empty _id with op_type create":
|
||||
- skip:
|
||||
version: " - 7.99.99"
|
||||
reason: "auto id + op type create only supported since 8.0"
|
||||
|
||||
- do:
|
||||
bulk:
|
||||
refresh: true
|
||||
body:
|
||||
- index:
|
||||
_index: test
|
||||
_type: type
|
||||
_id: ''
|
||||
- f: 1
|
||||
- index:
|
||||
_index: test
|
||||
_type: type
|
||||
_id: id
|
||||
- f: 2
|
||||
- index:
|
||||
_index: test
|
||||
_type: type
|
||||
- f: 3
|
||||
- create:
|
||||
_index: test
|
||||
_type: type
|
||||
- f: 4
|
||||
- index:
|
||||
_index: test
|
||||
op_type: create
|
||||
- f: 5
|
||||
- match: { errors: true }
|
||||
- match: { items.0.index.status: 400 }
|
||||
- match: { items.0.index.error.type: illegal_argument_exception }
|
||||
- match: { items.0.index.error.reason: if _id is specified it must not be empty }
|
||||
- match: { items.1.index.result: created }
|
||||
- match: { items.2.index.result: created }
|
||||
- match: { items.3.create.result: created }
|
||||
- match: { items.4.create.result: created }
|
||||
|
||||
- do:
|
||||
count:
|
||||
index: test
|
||||
|
||||
- match: { count: 4 }
|
||||
|
||||
---
|
||||
"empty action":
|
||||
- skip:
|
||||
|
@ -162,6 +162,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
||||
|
||||
boolean hasIndexRequestsWithPipelines = false;
|
||||
final MetaData metaData = clusterService.state().getMetaData();
|
||||
final Version minNodeVersion = clusterService.state().getNodes().getMinNodeVersion();
|
||||
for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
|
||||
IndexRequest indexRequest = getIndexWriteRequest(actionRequest);
|
||||
if (indexRequest != null) {
|
||||
@ -169,6 +170,10 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
||||
boolean indexRequestHasPipeline = resolveRequiredOrDefaultPipeline(actionRequest, indexRequest, metaData);
|
||||
hasIndexRequestsWithPipelines |= indexRequestHasPipeline;
|
||||
}
|
||||
|
||||
if (actionRequest instanceof IndexRequest) {
|
||||
((IndexRequest) actionRequest).checkAutoIdWithOpTypeCreateSupportedByVersion(minNodeVersion);
|
||||
}
|
||||
}
|
||||
|
||||
if (hasIndexRequestsWithPipelines) {
|
||||
|
@ -206,8 +206,9 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
|
||||
if (contentType == null) {
|
||||
validationException = addValidationError("content type is missing", validationException);
|
||||
}
|
||||
assert opType == OpType.INDEX || opType == OpType.CREATE : "unexpected op-type: " + opType;
|
||||
final long resolvedVersion = resolveVersionDefaults();
|
||||
if (opType() == OpType.CREATE) {
|
||||
if (opType == OpType.CREATE) {
|
||||
if (versionType != VersionType.INTERNAL) {
|
||||
validationException = addValidationError("create operations only support internal versioning. use index instead",
|
||||
validationException);
|
||||
@ -227,8 +228,11 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
|
||||
}
|
||||
}
|
||||
|
||||
if (opType() != OpType.INDEX && id == null) {
|
||||
addValidationError("an id is required for a " + opType() + " operation", validationException);
|
||||
if (id == null) {
|
||||
if (versionType != VersionType.INTERNAL ||
|
||||
(resolvedVersion != Versions.MATCH_DELETED && resolvedVersion != Versions.MATCH_ANY)) {
|
||||
validationException = addValidationError("an id must be provided if version type or value are set", validationException);
|
||||
}
|
||||
}
|
||||
|
||||
validationException = DocWriteRequest.validateSeqNoBasedCASParams(this, validationException);
|
||||
@ -238,10 +242,6 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
|
||||
id.getBytes(StandardCharsets.UTF_8).length, validationException);
|
||||
}
|
||||
|
||||
if (id == null && (versionType == VersionType.INTERNAL && resolvedVersion == Versions.MATCH_ANY) == false) {
|
||||
validationException = addValidationError("an id must be provided if version type or value are set", validationException);
|
||||
}
|
||||
|
||||
if (pipeline != null && pipeline.isEmpty()) {
|
||||
validationException = addValidationError("pipeline cannot be an empty string", validationException);
|
||||
}
|
||||
@ -655,8 +655,16 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
|
||||
routing(metaData.resolveWriteIndexRouting(routing, index));
|
||||
}
|
||||
|
||||
public void checkAutoIdWithOpTypeCreateSupportedByVersion(Version version) {
|
||||
if (id == null && opType == OpType.CREATE && version.before(Version.V_7_5_0)) {
|
||||
throw new IllegalArgumentException("optype create not supported for indexing requests without explicit id until all nodes " +
|
||||
"are on version 7.5.0 or higher");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
checkAutoIdWithOpTypeCreateSupportedByVersion(out.getVersion());
|
||||
super.writeTo(out);
|
||||
// A 7.x request allows null types but if deserialized in a 6.x node will cause nullpointer exceptions.
|
||||
// So we use the type accessor method here to make the type non-null (will default it to "_doc").
|
||||
|
@ -803,7 +803,8 @@ public class InternalEngine extends Engine {
|
||||
}
|
||||
|
||||
protected boolean assertPrimaryCanOptimizeAddDocument(final Index index) {
|
||||
assert (index.version() == Versions.MATCH_ANY && index.versionType() == VersionType.INTERNAL)
|
||||
assert (index.version() == Versions.MATCH_DELETED || index.version() == Versions.MATCH_ANY) &&
|
||||
index.versionType() == VersionType.INTERNAL
|
||||
: "version: " + index.version() + " type: " + index.versionType();
|
||||
return true;
|
||||
}
|
||||
|
@ -19,6 +19,7 @@
|
||||
|
||||
package org.elasticsearch.action.bulk;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
|
||||
import org.elasticsearch.action.delete.DeleteRequest;
|
||||
@ -28,6 +29,7 @@ import org.elasticsearch.action.update.UpdateRequest;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.AtomicArray;
|
||||
@ -107,6 +109,9 @@ public class TransportBulkActionIndicesThatCannotBeCreatedTests extends ESTestCa
|
||||
ClusterState state = mock(ClusterState.class);
|
||||
when(state.getMetaData()).thenReturn(MetaData.EMPTY_META_DATA);
|
||||
when(clusterService.state()).thenReturn(state);
|
||||
DiscoveryNodes discoveryNodes = mock(DiscoveryNodes.class);
|
||||
when(state.getNodes()).thenReturn(discoveryNodes);
|
||||
when(discoveryNodes.getMinNodeVersion()).thenReturn(Version.CURRENT);
|
||||
DiscoveryNode localNode = mock(DiscoveryNode.class);
|
||||
when(clusterService.localNode()).thenReturn(localNode);
|
||||
when(localNode.isIngestNode()).thenReturn(randomBoolean());
|
||||
|
@ -189,6 +189,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
|
||||
ImmutableOpenMap<String, DiscoveryNode> ingestNodes = ImmutableOpenMap.<String, DiscoveryNode>builder(2)
|
||||
.fPut("node1", remoteNode1).fPut("node2", remoteNode2).build();
|
||||
when(nodes.getIngestNodes()).thenReturn(ingestNodes);
|
||||
when(nodes.getMinNodeVersion()).thenReturn(Version.CURRENT);
|
||||
ClusterState state = mock(ClusterState.class);
|
||||
when(state.getNodes()).thenReturn(nodes);
|
||||
MetaData metaData = MetaData.builder().indices(ImmutableOpenMap.<String, IndexMetaData>builder()
|
||||
|
@ -189,12 +189,13 @@ public class DocumentActionsIT extends ESIntegTestCase {
|
||||
.add(client().prepareIndex().setIndex("test").setType("type1").setId("1").setSource(source("1", "test")))
|
||||
.add(client().prepareIndex().setIndex("test").setType("type1").setId("2").setSource(source("2", "test")).setCreate(true))
|
||||
.add(client().prepareIndex().setIndex("test").setType("type1").setSource(source("3", "test")))
|
||||
.add(client().prepareIndex().setIndex("test").setType("type1").setCreate(true).setSource(source("4", "test")))
|
||||
.add(client().prepareDelete().setIndex("test").setType("type1").setId("1"))
|
||||
.add(client().prepareIndex().setIndex("test").setType("type1").setSource("{ xxx }", XContentType.JSON)) // failure
|
||||
.execute().actionGet();
|
||||
|
||||
assertThat(bulkResponse.hasFailures(), equalTo(true));
|
||||
assertThat(bulkResponse.getItems().length, equalTo(5));
|
||||
assertThat(bulkResponse.getItems().length, equalTo(6));
|
||||
|
||||
assertThat(bulkResponse.getItems()[0].isFailed(), equalTo(false));
|
||||
assertThat(bulkResponse.getItems()[0].getOpType(), equalTo(OpType.INDEX));
|
||||
@ -215,15 +216,21 @@ public class DocumentActionsIT extends ESIntegTestCase {
|
||||
String generatedId3 = bulkResponse.getItems()[2].getId();
|
||||
|
||||
assertThat(bulkResponse.getItems()[3].isFailed(), equalTo(false));
|
||||
assertThat(bulkResponse.getItems()[3].getOpType(), equalTo(OpType.DELETE));
|
||||
assertThat(bulkResponse.getItems()[3].getOpType(), equalTo(OpType.CREATE));
|
||||
assertThat(bulkResponse.getItems()[3].getIndex(), equalTo(getConcreteIndexName()));
|
||||
assertThat(bulkResponse.getItems()[3].getType(), equalTo("type1"));
|
||||
assertThat(bulkResponse.getItems()[3].getId(), equalTo("1"));
|
||||
String generatedId4 = bulkResponse.getItems()[3].getId();
|
||||
|
||||
assertThat(bulkResponse.getItems()[4].isFailed(), equalTo(true));
|
||||
assertThat(bulkResponse.getItems()[4].getOpType(), equalTo(OpType.INDEX));
|
||||
assertThat(bulkResponse.getItems()[4].isFailed(), equalTo(false));
|
||||
assertThat(bulkResponse.getItems()[4].getOpType(), equalTo(OpType.DELETE));
|
||||
assertThat(bulkResponse.getItems()[4].getIndex(), equalTo(getConcreteIndexName()));
|
||||
assertThat(bulkResponse.getItems()[4].getType(), equalTo("type1"));
|
||||
assertThat(bulkResponse.getItems()[4].getId(), equalTo("1"));
|
||||
|
||||
assertThat(bulkResponse.getItems()[5].isFailed(), equalTo(true));
|
||||
assertThat(bulkResponse.getItems()[5].getOpType(), equalTo(OpType.INDEX));
|
||||
assertThat(bulkResponse.getItems()[5].getIndex(), equalTo(getConcreteIndexName()));
|
||||
assertThat(bulkResponse.getItems()[5].getType(), equalTo("type1"));
|
||||
|
||||
waitForRelocation(ClusterHealthStatus.GREEN);
|
||||
RefreshResponse refreshResponse = client().admin().indices().prepareRefresh("test").execute().actionGet();
|
||||
@ -243,6 +250,10 @@ public class DocumentActionsIT extends ESIntegTestCase {
|
||||
getResult = client().get(getRequest("test").type("type1").id(generatedId3)).actionGet();
|
||||
assertThat("cycle #" + i, getResult.getSourceAsString(), equalTo(Strings.toString(source("3", "test"))));
|
||||
assertThat(getResult.getIndex(), equalTo(getConcreteIndexName()));
|
||||
|
||||
getResult = client().get(getRequest("test").id(generatedId4)).actionGet();
|
||||
assertThat("cycle #" + i, getResult.getSourceAsString(), equalTo(Strings.toString(source("4", "test"))));
|
||||
assertThat(getResult.getIndex(), equalTo(getConcreteIndexName()));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -758,7 +758,8 @@ public class InternalEngineTests extends EngineTestCase {
|
||||
final ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), SOURCE, null);
|
||||
if (randomBoolean()) {
|
||||
final Engine.Index operation = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO,
|
||||
0, i, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), -1, false, UNASSIGNED_SEQ_NO, 0);
|
||||
0, i, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(),
|
||||
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, UNASSIGNED_SEQ_NO, 0);
|
||||
operations.add(operation);
|
||||
initialEngine.index(operation);
|
||||
} else {
|
||||
@ -3639,7 +3640,8 @@ public class InternalEngineTests extends EngineTestCase {
|
||||
long autoGeneratedIdTimestamp = 0;
|
||||
|
||||
Engine.Index index = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0,
|
||||
Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, isRetry, UNASSIGNED_SEQ_NO, 0);
|
||||
randomBoolean() ? Versions.MATCH_DELETED : Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(),
|
||||
autoGeneratedIdTimestamp, isRetry, UNASSIGNED_SEQ_NO, 0);
|
||||
Engine.IndexResult indexResult = engine.index(index);
|
||||
assertThat(indexResult.getVersion(), equalTo(1L));
|
||||
|
||||
@ -3649,7 +3651,7 @@ public class InternalEngineTests extends EngineTestCase {
|
||||
assertThat(indexResult.getVersion(), equalTo(1L));
|
||||
|
||||
isRetry = true;
|
||||
index = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, Versions.MATCH_ANY, VersionType.INTERNAL,
|
||||
index = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, Versions.MATCH_DELETED, VersionType.INTERNAL,
|
||||
PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, isRetry, UNASSIGNED_SEQ_NO, 0);
|
||||
indexResult = engine.index(index);
|
||||
assertThat(indexResult.getVersion(), equalTo(1L));
|
||||
@ -3678,7 +3680,8 @@ public class InternalEngineTests extends EngineTestCase {
|
||||
long autoGeneratedIdTimestamp = 0;
|
||||
|
||||
Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0,
|
||||
Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, isRetry, UNASSIGNED_SEQ_NO, 0);
|
||||
randomBoolean() ? Versions.MATCH_DELETED : Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(),
|
||||
autoGeneratedIdTimestamp, isRetry, UNASSIGNED_SEQ_NO, 0);
|
||||
Engine.IndexResult result = engine.index(firstIndexRequest);
|
||||
assertThat(result.getVersion(), equalTo(1L));
|
||||
|
||||
@ -3689,7 +3692,8 @@ public class InternalEngineTests extends EngineTestCase {
|
||||
|
||||
isRetry = false;
|
||||
Engine.Index secondIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0,
|
||||
Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, isRetry, UNASSIGNED_SEQ_NO, 0);
|
||||
Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, isRetry, UNASSIGNED_SEQ_NO,
|
||||
0);
|
||||
Engine.IndexResult indexResult = engine.index(secondIndexRequest);
|
||||
assertTrue(indexResult.isCreated());
|
||||
engine.refresh("test");
|
||||
@ -3717,8 +3721,9 @@ public class InternalEngineTests extends EngineTestCase {
|
||||
}
|
||||
|
||||
public Engine.Index appendOnlyPrimary(ParsedDocument doc, boolean retry, final long autoGeneratedIdTimestamp) {
|
||||
return new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, Versions.MATCH_ANY, VersionType.INTERNAL,
|
||||
Engine.Operation.Origin.PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, retry, UNASSIGNED_SEQ_NO, 0);
|
||||
return new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, randomBoolean() ? Versions.MATCH_DELETED : Versions.MATCH_ANY,
|
||||
VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, retry,
|
||||
UNASSIGNED_SEQ_NO, 0);
|
||||
}
|
||||
|
||||
public Engine.Index appendOnlyReplica(ParsedDocument doc, boolean retry, final long autoGeneratedIdTimestamp, final long seqNo) {
|
||||
@ -4760,8 +4765,8 @@ public class InternalEngineTests extends EngineTestCase {
|
||||
Versions.MATCH_ANY,
|
||||
VersionType.INTERNAL,
|
||||
Engine.Operation.Origin.PRIMARY,
|
||||
System.currentTimeMillis(),
|
||||
System.currentTimeMillis(),
|
||||
System.nanoTime(),
|
||||
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP,
|
||||
randomBoolean(), UNASSIGNED_SEQ_NO, 0);
|
||||
final Engine.IndexResult indexResult = e.index(index);
|
||||
assertThat(indexResult.getSeqNo(), equalTo(seqNo));
|
||||
@ -4776,7 +4781,7 @@ public class InternalEngineTests extends EngineTestCase {
|
||||
Versions.MATCH_ANY,
|
||||
VersionType.INTERNAL,
|
||||
Engine.Operation.Origin.PRIMARY,
|
||||
System.currentTimeMillis(), UNASSIGNED_SEQ_NO, 0);
|
||||
System.nanoTime(), UNASSIGNED_SEQ_NO, 0);
|
||||
final Engine.DeleteResult deleteResult = e.delete(delete);
|
||||
assertThat(deleteResult.getSeqNo(), equalTo(seqNo + 1));
|
||||
assertThat(seqNoGenerator.get(), equalTo(seqNo + 2));
|
||||
@ -5126,8 +5131,8 @@ public class InternalEngineTests extends EngineTestCase {
|
||||
engine.onSettingsChanged();
|
||||
ParsedDocument document = testParsedDocument(Integer.toString(0), null, testDocumentWithTextField(), SOURCE, null);
|
||||
final Engine.Index doc = new Engine.Index(newUid(document), document, UNASSIGNED_SEQ_NO, 0,
|
||||
Versions.MATCH_ANY, VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), 0, false,
|
||||
UNASSIGNED_SEQ_NO, 0);
|
||||
Versions.MATCH_ANY, VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(),
|
||||
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, UNASSIGNED_SEQ_NO, 0);
|
||||
// first index an append only document and then delete it. such that we have it in the tombstones
|
||||
engine.index(doc);
|
||||
engine.delete(new Engine.Delete(doc.type(), doc.id(), doc.uid(), primaryTerm.get()));
|
||||
@ -5135,16 +5140,18 @@ public class InternalEngineTests extends EngineTestCase {
|
||||
// now index more append only docs and refresh so we re-enabel the optimization for unsafe version map
|
||||
ParsedDocument document1 = testParsedDocument(Integer.toString(1), null, testDocumentWithTextField(), SOURCE, null);
|
||||
engine.index(new Engine.Index(newUid(document1), document1, UNASSIGNED_SEQ_NO, 0, Versions.MATCH_ANY, VersionType.INTERNAL,
|
||||
Engine.Operation.Origin.PRIMARY, System.nanoTime(), 0, false, UNASSIGNED_SEQ_NO, 0));
|
||||
Engine.Operation.Origin.PRIMARY, System.nanoTime(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
|
||||
UNASSIGNED_SEQ_NO, 0));
|
||||
engine.refresh("test");
|
||||
ParsedDocument document2 = testParsedDocument(Integer.toString(2), null, testDocumentWithTextField(), SOURCE, null);
|
||||
engine.index(new Engine.Index(newUid(document2), document2, UNASSIGNED_SEQ_NO, 0, Versions.MATCH_ANY, VersionType.INTERNAL,
|
||||
Engine.Operation.Origin.PRIMARY, System.nanoTime(), 0, false, UNASSIGNED_SEQ_NO, 0));
|
||||
Engine.Operation.Origin.PRIMARY, System.nanoTime(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
|
||||
UNASSIGNED_SEQ_NO, 0));
|
||||
engine.refresh("test");
|
||||
ParsedDocument document3 = testParsedDocument(Integer.toString(3), null, testDocumentWithTextField(), SOURCE, null);
|
||||
final Engine.Index doc3 = new Engine.Index(newUid(document3), document3, UNASSIGNED_SEQ_NO, 0,
|
||||
Versions.MATCH_ANY, VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), 0, false,
|
||||
UNASSIGNED_SEQ_NO, 0);
|
||||
Versions.MATCH_ANY, VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(),
|
||||
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, UNASSIGNED_SEQ_NO, 0);
|
||||
engine.index(doc3);
|
||||
engine.engineConfig.setEnableGcDeletes(true);
|
||||
// once we are here the version map is unsafe again and we need to do a refresh inside the get calls to ensure we
|
||||
|
@ -212,7 +212,7 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
|
||||
Versions.MATCH_ANY,
|
||||
VersionType.INTERNAL,
|
||||
new SourceToParse("index", "type", "primary", new BytesArray("{}"), XContentType.JSON),
|
||||
SequenceNumbers.UNASSIGNED_SEQ_NO, 0, randomNonNegativeLong(),
|
||||
SequenceNumbers.UNASSIGNED_SEQ_NO, 0, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP,
|
||||
false);
|
||||
}
|
||||
final IndexShard recoveredReplica =
|
||||
|
@ -91,7 +91,7 @@ public class PrimaryReplicaSyncerTests extends IndexShardTestCase {
|
||||
shard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL,
|
||||
new SourceToParse(shard.shardId().getIndexName(), "_doc", Integer.toString(i), new BytesArray("{}"), XContentType.JSON),
|
||||
SequenceNumbers.UNASSIGNED_SEQ_NO, 0,
|
||||
randomBoolean() ? IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP : randomNonNegativeLong(), true);
|
||||
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, true);
|
||||
}
|
||||
|
||||
long globalCheckPoint = numDocs > 0 ? randomIntBetween(0, numDocs - 1) : 0;
|
||||
|
@ -21,6 +21,7 @@ import org.apache.lucene.search.TopDocs;
|
||||
import org.apache.lucene.util.Bits;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MappingMetaData;
|
||||
@ -310,7 +311,8 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
|
||||
assert source != null : "_source is null but should have been filtered out at snapshot time";
|
||||
Engine.Result result = targetShard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL,
|
||||
new SourceToParse(index, uid.type(), uid.id(), source, XContentHelper.xContentType(source),
|
||||
rootFieldsVisitor.routing()), SequenceNumbers.UNASSIGNED_SEQ_NO, 0, 1, false);
|
||||
rootFieldsVisitor.routing()), SequenceNumbers.UNASSIGNED_SEQ_NO, 0,
|
||||
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
|
||||
if (result.getResultType() != Engine.Result.Type.SUCCESS) {
|
||||
throw new IllegalStateException("failed applying post restore operation result: " + result
|
||||
.getResultType(), result.getFailure());
|
||||
|
Loading…
x
Reference in New Issue
Block a user