[CCR] FollowingEngine should fail with 403 if operation has no seqno assigned (#37213)
Fail with a 403 when indexing a document directly into a follower index. In order to test this change, I had to move specific assertions into a dedicated class and disable assertions for that class in the rest qa module. I think that is the right trade off.
This commit is contained in:
parent
c812e6aea6
commit
6d81e7c3e7
|
@ -13,6 +13,10 @@ task restTest(type: RestIntegTestTask) {
|
|||
|
||||
restTestCluster {
|
||||
distribution 'zip'
|
||||
// Disable assertions in FollowingEngineAssertions, otherwise an AssertionError is thrown before
|
||||
// indexing a document directly in a follower index. In a rest test we like to test the exception
|
||||
// that is thrown in production when indexing a document directly in a follower index.
|
||||
environment 'ES_JAVA_OPTS', '-da:org.elasticsearch.xpack.ccr.index.engine.FollowingEngineAssertions'
|
||||
setting 'xpack.ml.enabled', 'false'
|
||||
setting 'xpack.monitoring.enabled', 'false'
|
||||
setting 'xpack.security.enabled', 'true'
|
||||
|
|
|
@ -0,0 +1,66 @@
|
|||
---
|
||||
"Test indexing direcly into a follower index":
|
||||
- do:
|
||||
cluster.state: {}
|
||||
|
||||
- set: {master_node: master}
|
||||
|
||||
- do:
|
||||
nodes.info: {}
|
||||
|
||||
- set: {nodes.$master.transport_address: local_ip}
|
||||
|
||||
- do:
|
||||
cluster.put_settings:
|
||||
body:
|
||||
transient:
|
||||
cluster.remote.local.seeds: $local_ip
|
||||
flat_settings: true
|
||||
|
||||
- match: {transient: {cluster.remote.local.seeds: $local_ip}}
|
||||
|
||||
- do:
|
||||
indices.create:
|
||||
index: foo
|
||||
body:
|
||||
settings:
|
||||
index:
|
||||
soft_deletes:
|
||||
enabled: true
|
||||
mappings:
|
||||
doc:
|
||||
properties:
|
||||
field:
|
||||
type: keyword
|
||||
- is_true: acknowledged
|
||||
|
||||
- do:
|
||||
ccr.follow:
|
||||
index: bar
|
||||
body:
|
||||
remote_cluster: local
|
||||
leader_index: foo
|
||||
- is_true: follow_index_created
|
||||
- is_true: follow_index_shards_acked
|
||||
- is_true: index_following_started
|
||||
|
||||
- do:
|
||||
catch: forbidden
|
||||
index:
|
||||
index: bar
|
||||
body: {}
|
||||
|
||||
- do:
|
||||
ccr.pause_follow:
|
||||
index: bar
|
||||
- is_true: acknowledged
|
||||
|
||||
- do:
|
||||
indices.close:
|
||||
index: bar
|
||||
- is_true: acknowledged
|
||||
|
||||
- do:
|
||||
ccr.unfollow:
|
||||
index: bar
|
||||
- is_true: acknowledged
|
|
@ -16,6 +16,7 @@ import org.apache.lucene.search.DocValuesFieldExistsQuery;
|
|||
import org.apache.lucene.search.IndexSearcher;
|
||||
import org.apache.lucene.search.Query;
|
||||
import org.apache.lucene.search.TopDocs;
|
||||
import org.elasticsearch.ElasticsearchStatusException;
|
||||
import org.elasticsearch.common.lucene.Lucene;
|
||||
import org.elasticsearch.common.metrics.CounterMetric;
|
||||
import org.elasticsearch.index.VersionType;
|
||||
|
@ -23,6 +24,7 @@ import org.elasticsearch.index.engine.EngineConfig;
|
|||
import org.elasticsearch.index.engine.InternalEngine;
|
||||
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
|
||||
import org.elasticsearch.index.seqno.SequenceNumbers;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.xpack.ccr.CcrSettings;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -56,17 +58,11 @@ public final class FollowingEngine extends InternalEngine {
|
|||
}
|
||||
|
||||
private void preFlight(final Operation operation) {
|
||||
/*
|
||||
* We assert here so that this goes uncaught in unit tests and fails nodes in standalone tests (we want a harsh failure so that we
|
||||
* do not have a situation where a shard fails and is recovered elsewhere and a test subsequently passes). We throw an exception so
|
||||
* that we also prevent issues in production code.
|
||||
*/
|
||||
assert operation.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO;
|
||||
assert FollowingEngineAssertions.preFlight(operation);
|
||||
if (operation.seqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO) {
|
||||
throw new IllegalStateException("a following engine does not accept operations without an assigned sequence number");
|
||||
throw new ElasticsearchStatusException("a following engine does not accept operations without an assigned sequence number",
|
||||
RestStatus.FORBIDDEN);
|
||||
}
|
||||
assert (operation.origin() == Operation.Origin.PRIMARY) == (operation.versionType() == VersionType.EXTERNAL) :
|
||||
"invalid version_type in a following engine; version_type=" + operation.versionType() + "origin=" + operation.origin();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -133,8 +129,7 @@ public final class FollowingEngine extends InternalEngine {
|
|||
|
||||
@Override
|
||||
protected boolean assertPrimaryIncomingSequenceNumber(final Operation.Origin origin, final long seqNo) {
|
||||
// sequence number should be set when operation origin is primary
|
||||
assert seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO : "primary operations on a following index must have an assigned sequence number";
|
||||
assert FollowingEngineAssertions.assertPrimaryIncomingSequenceNumber(origin, seqNo);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,37 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.ccr.index.engine;
|
||||
|
||||
import org.elasticsearch.index.VersionType;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.seqno.SequenceNumbers;
|
||||
|
||||
/**
|
||||
* Moved preflight and assertPrimaryIncomingSequenceNumber check to its own class,
|
||||
* so that when testing writing directly into a follower index,
|
||||
* only these assertions here need to be disabled instead of all assertions in FollowingEngine class.
|
||||
*/
|
||||
final class FollowingEngineAssertions {
|
||||
|
||||
static boolean preFlight(final Engine.Operation operation) {
|
||||
/*
|
||||
* We assert here so that this goes uncaught in unit tests and fails nodes in standalone tests (we want a harsh failure so that we
|
||||
* do not have a situation where a shard fails and is recovered elsewhere and a test subsequently passes). We throw an exception so
|
||||
* that we also prevent issues in production code.
|
||||
*/
|
||||
assert operation.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO;
|
||||
assert (operation.origin() == Engine.Operation.Origin.PRIMARY) == (operation.versionType() == VersionType.EXTERNAL) :
|
||||
"invalid version_type in a following engine; version_type=" + operation.versionType() + "origin=" + operation.origin();
|
||||
return true;
|
||||
}
|
||||
|
||||
static boolean assertPrimaryIncomingSequenceNumber(final Engine.Operation.Origin origin, final long seqNo) {
|
||||
// sequence number should be set when operation origin is primary
|
||||
assert seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO : "primary operations on a following index must have an assigned sequence number";
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue