ShardFollowNodeTask should fetch operation once (#32455)
Today ShardFollowNodeTask might fetch some operations more than once. This happens because we ask the leading for up to max_batch_count operations (instead of the left-over size) for the left-over request. The leading then can freely respond up to the max_batch_count, and at the same time, if one of the previous requests completed, we might issue another read request whose range overlaps with the response of the left-over request. Closes #32453
This commit is contained in:
parent
1fdc3f08be
commit
8cfbb64d6e
|
@ -103,13 +103,21 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
|
||||||
params.getFollowShardId(), lastRequestedSeqno, leaderGlobalCheckpoint);
|
params.getFollowShardId(), lastRequestedSeqno, leaderGlobalCheckpoint);
|
||||||
final int maxBatchOperationCount = params.getMaxBatchOperationCount();
|
final int maxBatchOperationCount = params.getMaxBatchOperationCount();
|
||||||
while (hasReadBudget() && lastRequestedSeqno < leaderGlobalCheckpoint) {
|
while (hasReadBudget() && lastRequestedSeqno < leaderGlobalCheckpoint) {
|
||||||
|
final long from = lastRequestedSeqno + 1;
|
||||||
|
final long maxRequiredSeqNo = Math.min(leaderGlobalCheckpoint, from + maxBatchOperationCount - 1);
|
||||||
|
final int requestBatchCount;
|
||||||
|
if (numConcurrentReads == 0) {
|
||||||
|
// This is the only request, we can optimistically fetch more documents if possible but not enforce max_required_seqno.
|
||||||
|
requestBatchCount = maxBatchOperationCount;
|
||||||
|
} else {
|
||||||
|
requestBatchCount = Math.toIntExact(maxRequiredSeqNo - from + 1);
|
||||||
|
}
|
||||||
|
assert 0 < requestBatchCount && requestBatchCount <= maxBatchOperationCount : "request_batch_count=" + requestBatchCount;
|
||||||
|
LOGGER.trace("{}[{} ongoing reads] read from_seqno={} max_required_seqno={} batch_count={}",
|
||||||
|
params.getFollowShardId(), numConcurrentReads, from, maxRequiredSeqNo, requestBatchCount);
|
||||||
numConcurrentReads++;
|
numConcurrentReads++;
|
||||||
long from = lastRequestedSeqno + 1;
|
sendShardChangesRequest(from, requestBatchCount, maxRequiredSeqNo);
|
||||||
// -1 is needed, because maxRequiredSeqno is inclusive
|
lastRequestedSeqno = maxRequiredSeqNo;
|
||||||
long maxRequiredSeqno = Math.min(leaderGlobalCheckpoint, (from + maxBatchOperationCount) - 1);
|
|
||||||
LOGGER.trace("{}[{}] read [{}/{}]", params.getFollowShardId(), numConcurrentReads, maxRequiredSeqno, maxBatchOperationCount);
|
|
||||||
sendShardChangesRequest(from, maxBatchOperationCount, maxRequiredSeqno);
|
|
||||||
lastRequestedSeqno = maxRequiredSeqno;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (numConcurrentReads == 0 && hasReadBudget()) {
|
if (numConcurrentReads == 0 && hasReadBudget()) {
|
||||||
|
@ -186,7 +194,13 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
|
||||||
maybeUpdateMapping(response.getIndexMetadataVersion(), () -> innerHandleReadResponse(from, maxRequiredSeqNo, response));
|
maybeUpdateMapping(response.getIndexMetadataVersion(), () -> innerHandleReadResponse(from, maxRequiredSeqNo, response));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Called when some operations are fetched from the leading */
|
||||||
|
protected void onOperationsFetched(Translog.Operation[] operations) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
synchronized void innerHandleReadResponse(long from, long maxRequiredSeqNo, ShardChangesAction.Response response) {
|
synchronized void innerHandleReadResponse(long from, long maxRequiredSeqNo, ShardChangesAction.Response response) {
|
||||||
|
onOperationsFetched(response.getOperations());
|
||||||
leaderGlobalCheckpoint = Math.max(leaderGlobalCheckpoint, response.getGlobalCheckpoint());
|
leaderGlobalCheckpoint = Math.max(leaderGlobalCheckpoint, response.getGlobalCheckpoint());
|
||||||
final long newFromSeqNo;
|
final long newFromSeqNo;
|
||||||
if (response.getOperations().length == 0) {
|
if (response.getOperations().length == 0) {
|
||||||
|
|
|
@ -160,7 +160,7 @@ public class ShardChangesIT extends ESIntegTestCase {
|
||||||
|
|
||||||
public void testFollowIndex() throws Exception {
|
public void testFollowIndex() throws Exception {
|
||||||
final int numberOfPrimaryShards = randomIntBetween(1, 3);
|
final int numberOfPrimaryShards = randomIntBetween(1, 3);
|
||||||
final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards,
|
final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, between(0, 1),
|
||||||
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
|
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
|
||||||
assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
|
assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
|
||||||
ensureYellow("index1");
|
ensureYellow("index1");
|
||||||
|
@ -218,7 +218,7 @@ public class ShardChangesIT extends ESIntegTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testSyncMappings() throws Exception {
|
public void testSyncMappings() throws Exception {
|
||||||
final String leaderIndexSettings = getIndexSettings(2,
|
final String leaderIndexSettings = getIndexSettings(2, between(0, 1),
|
||||||
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
|
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
|
||||||
assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
|
assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
|
||||||
ensureYellow("index1");
|
ensureYellow("index1");
|
||||||
|
@ -255,7 +255,8 @@ public class ShardChangesIT extends ESIntegTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testFollowIndex_backlog() throws Exception {
|
public void testFollowIndex_backlog() throws Exception {
|
||||||
String leaderIndexSettings = getIndexSettings(3, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
|
String leaderIndexSettings = getIndexSettings(between(1, 5), between(0, 1),
|
||||||
|
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
|
||||||
assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
|
assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
|
||||||
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
|
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -306,10 +307,10 @@ public class ShardChangesIT extends ESIntegTestCase {
|
||||||
|
|
||||||
public void testFollowIndexAndCloseNode() throws Exception {
|
public void testFollowIndexAndCloseNode() throws Exception {
|
||||||
internalCluster().ensureAtLeastNumDataNodes(3);
|
internalCluster().ensureAtLeastNumDataNodes(3);
|
||||||
String leaderIndexSettings = getIndexSettings(3, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
|
String leaderIndexSettings = getIndexSettings(3, 1, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
|
||||||
assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
|
assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
|
||||||
|
|
||||||
String followerIndexSettings = getIndexSettings(3, singletonMap(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), "true"));
|
String followerIndexSettings = getIndexSettings(3, 1, singletonMap(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), "true"));
|
||||||
assertAcked(client().admin().indices().prepareCreate("index2").setSource(followerIndexSettings, XContentType.JSON));
|
assertAcked(client().admin().indices().prepareCreate("index2").setSource(followerIndexSettings, XContentType.JSON));
|
||||||
ensureGreen("index1", "index2");
|
ensureGreen("index1", "index2");
|
||||||
|
|
||||||
|
@ -366,13 +367,14 @@ public class ShardChangesIT extends ESIntegTestCase {
|
||||||
|
|
||||||
public void testFollowIndexWithNestedField() throws Exception {
|
public void testFollowIndexWithNestedField() throws Exception {
|
||||||
final String leaderIndexSettings =
|
final String leaderIndexSettings =
|
||||||
getIndexSettingsWithNestedMapping(1, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
|
getIndexSettingsWithNestedMapping(1, between(0, 1), singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
|
||||||
assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
|
assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
|
||||||
|
|
||||||
final String followerIndexSettings =
|
final String followerIndexSettings =
|
||||||
getIndexSettingsWithNestedMapping(1, singletonMap(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), "true"));
|
getIndexSettingsWithNestedMapping(1, between(0, 1), singletonMap(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), "true"));
|
||||||
assertAcked(client().admin().indices().prepareCreate("index2").setSource(followerIndexSettings, XContentType.JSON));
|
assertAcked(client().admin().indices().prepareCreate("index2").setSource(followerIndexSettings, XContentType.JSON));
|
||||||
|
|
||||||
|
internalCluster().ensureAtLeastNumDataNodes(2);
|
||||||
ensureGreen("index1", "index2");
|
ensureGreen("index1", "index2");
|
||||||
|
|
||||||
final FollowIndexAction.Request followRequest = createFollowRequest("index1", "index2");
|
final FollowIndexAction.Request followRequest = createFollowRequest("index1", "index2");
|
||||||
|
@ -455,7 +457,8 @@ public class ShardChangesIT extends ESIntegTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testFollowIndex_lowMaxTranslogBytes() throws Exception {
|
public void testFollowIndex_lowMaxTranslogBytes() throws Exception {
|
||||||
final String leaderIndexSettings = getIndexSettings(1, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
|
final String leaderIndexSettings = getIndexSettings(1, between(0, 1),
|
||||||
|
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
|
||||||
assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
|
assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
|
||||||
ensureYellow("index1");
|
ensureYellow("index1");
|
||||||
|
|
||||||
|
@ -554,15 +557,16 @@ public class ShardChangesIT extends ESIntegTestCase {
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
private String getIndexSettings(final int numberOfPrimaryShards, final Map<String, String> additionalIndexSettings) throws IOException {
|
private String getIndexSettings(final int numberOfShards, final int numberOfReplicas,
|
||||||
|
final Map<String, String> additionalIndexSettings) throws IOException {
|
||||||
final String settings;
|
final String settings;
|
||||||
try (XContentBuilder builder = jsonBuilder()) {
|
try (XContentBuilder builder = jsonBuilder()) {
|
||||||
builder.startObject();
|
builder.startObject();
|
||||||
{
|
{
|
||||||
builder.startObject("settings");
|
builder.startObject("settings");
|
||||||
{
|
{
|
||||||
builder.field("index.number_of_shards", numberOfPrimaryShards);
|
builder.field("index.number_of_shards", numberOfShards);
|
||||||
builder.field("index.number_of_replicas", 1);
|
builder.field("index.number_of_replicas", numberOfReplicas);
|
||||||
for (final Map.Entry<String, String> additionalSetting : additionalIndexSettings.entrySet()) {
|
for (final Map.Entry<String, String> additionalSetting : additionalIndexSettings.entrySet()) {
|
||||||
builder.field(additionalSetting.getKey(), additionalSetting.getValue());
|
builder.field(additionalSetting.getKey(), additionalSetting.getValue());
|
||||||
}
|
}
|
||||||
|
@ -592,7 +596,7 @@ public class ShardChangesIT extends ESIntegTestCase {
|
||||||
return settings;
|
return settings;
|
||||||
}
|
}
|
||||||
|
|
||||||
private String getIndexSettingsWithNestedMapping(final int numberOfPrimaryShards,
|
private String getIndexSettingsWithNestedMapping(final int numberOfShards, final int numberOfReplicas,
|
||||||
final Map<String, String> additionalIndexSettings) throws IOException {
|
final Map<String, String> additionalIndexSettings) throws IOException {
|
||||||
final String settings;
|
final String settings;
|
||||||
try (XContentBuilder builder = jsonBuilder()) {
|
try (XContentBuilder builder = jsonBuilder()) {
|
||||||
|
@ -600,7 +604,8 @@ public class ShardChangesIT extends ESIntegTestCase {
|
||||||
{
|
{
|
||||||
builder.startObject("settings");
|
builder.startObject("settings");
|
||||||
{
|
{
|
||||||
builder.field("index.number_of_shards", numberOfPrimaryShards);
|
builder.field("index.number_of_shards", numberOfShards);
|
||||||
|
builder.field("index.number_of_replicas", numberOfReplicas);
|
||||||
for (final Map.Entry<String, String> additionalSetting : additionalIndexSettings.entrySet()) {
|
for (final Map.Entry<String, String> additionalSetting : additionalIndexSettings.entrySet()) {
|
||||||
builder.field(additionalSetting.getKey(), additionalSetting.getValue());
|
builder.field(additionalSetting.getKey(), additionalSetting.getValue());
|
||||||
}
|
}
|
||||||
|
|
|
@ -42,7 +42,7 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase {
|
||||||
|
|
||||||
public void testMultipleReaderWriter() throws Exception {
|
public void testMultipleReaderWriter() throws Exception {
|
||||||
int concurrency = randomIntBetween(2, 8);
|
int concurrency = randomIntBetween(2, 8);
|
||||||
TestRun testRun = createTestRun(0, 0, 1024);
|
TestRun testRun = createTestRun(0, 0, between(1, 1024));
|
||||||
ShardFollowNodeTask task = createShardFollowTask(concurrency, testRun);
|
ShardFollowNodeTask task = createShardFollowTask(concurrency, testRun);
|
||||||
startAndAssertAndStopTask(task, testRun);
|
startAndAssertAndStopTask(task, testRun);
|
||||||
}
|
}
|
||||||
|
|
|
@ -45,18 +45,18 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
||||||
private Queue<Long> followerGlobalCheckpoints;
|
private Queue<Long> followerGlobalCheckpoints;
|
||||||
|
|
||||||
public void testCoordinateReads() {
|
public void testCoordinateReads() {
|
||||||
ShardFollowNodeTask task = createShardFollowTask(8, 8, 8, Integer.MAX_VALUE, Long.MAX_VALUE);
|
ShardFollowNodeTask task = createShardFollowTask(8, between(8, 20), between(1, 20), Integer.MAX_VALUE, Long.MAX_VALUE);
|
||||||
startTask(task, 64, -1);
|
startTask(task, 3, -1);
|
||||||
|
|
||||||
task.coordinateReads();
|
task.coordinateReads();
|
||||||
assertThat(shardChangesRequests.size(), equalTo(8));
|
assertThat(shardChangesRequests, contains(new long[]{0L, 8L})); // treat this a peak request
|
||||||
|
shardChangesRequests.clear();
|
||||||
|
task.innerHandleReadResponse(0, 5L, generateShardChangesResponse(0, 5L, 0L, 60L));
|
||||||
assertThat(shardChangesRequests, contains(new long[][]{
|
assertThat(shardChangesRequests, contains(new long[][]{
|
||||||
{0L, 8L}, {8L, 8L}, {16L, 8L}, {24L, 8L}, {32L, 8L}, {40L, 8L}, {48L, 8L}, {56L, 8L}}
|
{6L, 8L}, {14L, 8L}, {22L, 8L}, {30L, 8L}, {38L, 8L}, {46L, 8L}, {54L, 7L}}
|
||||||
));
|
));
|
||||||
|
|
||||||
ShardFollowNodeTask.Status status = task.getStatus();
|
ShardFollowNodeTask.Status status = task.getStatus();
|
||||||
assertThat(status.getNumberOfConcurrentReads(), equalTo(8));
|
assertThat(status.getNumberOfConcurrentReads(), equalTo(7));
|
||||||
assertThat(status.getLastRequestedSeqno(), equalTo(63L));
|
assertThat(status.getLastRequestedSeqno(), equalTo(60L));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testWriteBuffer() {
|
public void testWriteBuffer() {
|
||||||
|
@ -263,12 +263,12 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
||||||
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
|
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
|
||||||
|
|
||||||
shardChangesRequests.clear();
|
shardChangesRequests.clear();
|
||||||
ShardChangesAction.Response response = generateShardChangesResponse(0, 31, 0L, 31L);
|
ShardChangesAction.Response response = generateShardChangesResponse(0, 20, 0L, 31L);
|
||||||
task.innerHandleReadResponse(0L, 64L, response);
|
task.innerHandleReadResponse(0L, 63L, response);
|
||||||
|
|
||||||
assertThat(shardChangesRequests.size(), equalTo(1));
|
assertThat(shardChangesRequests.size(), equalTo(1));
|
||||||
assertThat(shardChangesRequests.get(0)[0], equalTo(32L));
|
assertThat(shardChangesRequests.get(0)[0], equalTo(21L));
|
||||||
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
|
assertThat(shardChangesRequests.get(0)[1], equalTo(43L));
|
||||||
|
|
||||||
ShardFollowNodeTask.Status status = task.getStatus();
|
ShardFollowNodeTask.Status status = task.getStatus();
|
||||||
assertThat(status.getNumberOfConcurrentReads(), equalTo(1));
|
assertThat(status.getNumberOfConcurrentReads(), equalTo(1));
|
||||||
|
@ -310,7 +310,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
||||||
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
|
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
|
||||||
|
|
||||||
shardChangesRequests.clear();
|
shardChangesRequests.clear();
|
||||||
task.innerHandleReadResponse(0L, 64L,
|
task.innerHandleReadResponse(0L, 63L,
|
||||||
new ShardChangesAction.Response(0, 0, new Translog.Operation[0]));
|
new ShardChangesAction.Response(0, 0, new Translog.Operation[0]));
|
||||||
|
|
||||||
assertThat(shardChangesRequests.size(), equalTo(1));
|
assertThat(shardChangesRequests.size(), equalTo(1));
|
||||||
|
@ -675,9 +675,9 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void innerSendShardChangesRequest(long from, int maxOperationCount, Consumer<ShardChangesAction.Response> handler,
|
protected void innerSendShardChangesRequest(long from, int requestBatchSize, Consumer<ShardChangesAction.Response> handler,
|
||||||
Consumer<Exception> errorHandler) {
|
Consumer<Exception> errorHandler) {
|
||||||
shardChangesRequests.add(new long[]{from, maxBatchOperationCount});
|
shardChangesRequests.add(new long[]{from, requestBatchSize});
|
||||||
Exception readFailure = ShardFollowNodeTaskTests.this.readFailures.poll();
|
Exception readFailure = ShardFollowNodeTaskTests.this.readFailures.poll();
|
||||||
if (readFailure != null) {
|
if (readFailure != null) {
|
||||||
errorHandler.accept(readFailure);
|
errorHandler.accept(readFailure);
|
||||||
|
|
|
@ -5,6 +5,8 @@
|
||||||
*/
|
*/
|
||||||
package org.elasticsearch.xpack.ccr.action;
|
package org.elasticsearch.xpack.ccr.action;
|
||||||
|
|
||||||
|
import com.carrotsearch.hppc.LongHashSet;
|
||||||
|
import com.carrotsearch.hppc.LongSet;
|
||||||
import org.elasticsearch.Version;
|
import org.elasticsearch.Version;
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionListener;
|
||||||
import org.elasticsearch.action.DocWriteResponse;
|
import org.elasticsearch.action.DocWriteResponse;
|
||||||
|
@ -72,6 +74,7 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
|
||||||
followerGroup.assertAllEqual(indexedDocIds.size() - deleteDocIds.size());
|
followerGroup.assertAllEqual(indexedDocIds.size() - deleteDocIds.size());
|
||||||
});
|
});
|
||||||
shardFollowTask.markAsCompleted();
|
shardFollowTask.markAsCompleted();
|
||||||
|
assertConsistentHistoryBetweenLeaderAndFollower(leaderGroup, followerGroup);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -107,6 +110,7 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
|
||||||
leaderGroup.assertAllEqual(docCount);
|
leaderGroup.assertAllEqual(docCount);
|
||||||
assertBusy(() -> followerGroup.assertAllEqual(docCount));
|
assertBusy(() -> followerGroup.assertAllEqual(docCount));
|
||||||
shardFollowTask.markAsCompleted();
|
shardFollowTask.markAsCompleted();
|
||||||
|
assertConsistentHistoryBetweenLeaderAndFollower(leaderGroup, followerGroup);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -141,12 +145,23 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
|
||||||
|
|
||||||
private ShardFollowNodeTask createShardFollowTask(ReplicationGroup leaderGroup, ReplicationGroup followerGroup) {
|
private ShardFollowNodeTask createShardFollowTask(ReplicationGroup leaderGroup, ReplicationGroup followerGroup) {
|
||||||
ShardFollowTask params = new ShardFollowTask(null, new ShardId("follow_index", "", 0),
|
ShardFollowTask params = new ShardFollowTask(null, new ShardId("follow_index", "", 0),
|
||||||
new ShardId("leader_index", "", 0), 1024, 1, Long.MAX_VALUE, 1, 10240,
|
new ShardId("leader_index", "", 0), between(1, 64), between(1, 8), Long.MAX_VALUE, between(1, 4), 10240,
|
||||||
TimeValue.timeValueMillis(10), TimeValue.timeValueMillis(10), Collections.emptyMap());
|
TimeValue.timeValueMillis(10), TimeValue.timeValueMillis(10), Collections.emptyMap());
|
||||||
|
|
||||||
BiConsumer<TimeValue, Runnable> scheduler = (delay, task) -> threadPool.schedule(delay, ThreadPool.Names.GENERIC, task);
|
BiConsumer<TimeValue, Runnable> scheduler = (delay, task) -> threadPool.schedule(delay, ThreadPool.Names.GENERIC, task);
|
||||||
AtomicBoolean stopped = new AtomicBoolean(false);
|
AtomicBoolean stopped = new AtomicBoolean(false);
|
||||||
|
LongSet fetchOperations = new LongHashSet();
|
||||||
return new ShardFollowNodeTask(1L, "type", ShardFollowTask.NAME, "description", null, Collections.emptyMap(), params, scheduler) {
|
return new ShardFollowNodeTask(1L, "type", ShardFollowTask.NAME, "description", null, Collections.emptyMap(), params, scheduler) {
|
||||||
|
@Override
|
||||||
|
protected synchronized void onOperationsFetched(Translog.Operation[] operations) {
|
||||||
|
super.onOperationsFetched(operations);
|
||||||
|
for (Translog.Operation operation : operations) {
|
||||||
|
if (fetchOperations.add(operation.seqNo()) == false) {
|
||||||
|
throw new AssertionError("Operation [" + operation + " ] was fetched already");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void innerUpdateMapping(LongConsumer handler, Consumer<Exception> errorHandler) {
|
protected void innerUpdateMapping(LongConsumer handler, Consumer<Exception> errorHandler) {
|
||||||
// noop, as mapping updates are not tested
|
// noop, as mapping updates are not tested
|
||||||
|
@ -210,6 +225,13 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void assertConsistentHistoryBetweenLeaderAndFollower(ReplicationGroup leader, ReplicationGroup follower) throws IOException {
|
||||||
|
int totalOps = leader.getPrimary().estimateNumberOfHistoryOperations("test", 0);
|
||||||
|
for (IndexShard followingShard : follower) {
|
||||||
|
assertThat(followingShard.estimateNumberOfHistoryOperations("test", 0), equalTo(totalOps));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
class CCRAction extends ReplicationAction<BulkShardOperationsRequest, BulkShardOperationsRequest, BulkShardOperationsResponse> {
|
class CCRAction extends ReplicationAction<BulkShardOperationsRequest, BulkShardOperationsRequest, BulkShardOperationsResponse> {
|
||||||
|
|
||||||
CCRAction(BulkShardOperationsRequest request, ActionListener<BulkShardOperationsResponse> listener, ReplicationGroup group) {
|
CCRAction(BulkShardOperationsRequest request, ActionListener<BulkShardOperationsResponse> listener, ReplicationGroup group) {
|
||||||
|
|
Loading…
Reference in New Issue