[CCR] Added write buffer size limit (#34797)

This limit is based on the total size in bytes of the operations in the write buffer. If the limit is exceeded, no new read operations are coordinated until the write buffer's size in bytes has dropped below the configured write buffer size limit.
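
A minimal sketch of the new gating logic (illustration only, not the actual `ShardFollowNodeTask` code; the names mirror the fields and parameters in the diff below):

```java
// Illustrative only: mirrors the limit checks added to ShardFollowNodeTask's read coordination.
static boolean canCoordinateMoreReads(long bufferSizeInBytes, int bufferCount,
                                      long maxWriteBufferSizeInBytes, int maxWriteBufferCount) {
    if (bufferSizeInBytes >= maxWriteBufferSizeInBytes) {
        return false; // size-based limit reached: wait for pending writes to drain the buffer
    }
    if (bufferCount > maxWriteBufferCount) {
        return false; // count-based limit reached
    }
    return true;
}
```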

Renamed the existing `max_write_buffer_size` to `max_write_buffer_count` to indicate that this limit is count-based.
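
For illustration, both limits can now be configured independently on a follow request. This sketch simply mirrors the setters exercised by the tests in this change; the concrete values are arbitrary examples and `request` is assumed to be a `ResumeFollowAction.Request` built as elsewhere in the test suite:

```java
request.setMaxWriteBufferCount(10240);                                   // count-based limit (renamed setter)
request.setMaxWriteBufferSize(new ByteSizeValue(512, ByteSizeUnit.MB));  // new size-based limit
```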

Closes #34705
Martijn van Groningen 2018-10-24 23:48:49 +02:00 committed by GitHub
parent d94406a68a
commit 6fe0e62b7a
33 changed files with 342 additions and 97 deletions

@ -328,6 +328,7 @@ public class AutoFollowCoordinator implements ClusterStateApplier {
followRequest.setMaxConcurrentReadBatches(pattern.getMaxConcurrentReadBatches());
followRequest.setMaxBatchSize(pattern.getMaxBatchSize());
followRequest.setMaxConcurrentWriteBatches(pattern.getMaxConcurrentWriteBatches());
followRequest.setMaxWriteBufferCount(pattern.getMaxWriteBufferCount());
followRequest.setMaxWriteBufferSize(pattern.getMaxWriteBufferSize());
followRequest.setMaxRetryDelay(pattern.getMaxRetryDelay());
followRequest.setPollTimeout(pattern.getPollTimeout());

@ -85,6 +85,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
private long numberOfOperationsIndexed = 0;
private long lastFetchTime = -1;
private final Queue<Translog.Operation> buffer = new PriorityQueue<>(Comparator.comparing(Translog.Operation::seqNo));
private long bufferSizeInBytes = 0;
private final LinkedHashMap<Long, Tuple<AtomicInteger, ElasticsearchException>> fetchExceptions;
private volatile ElasticsearchException fatalException;
@ -183,8 +184,12 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
params.getFollowShardId(), numConcurrentReads);
return false;
}
if (buffer.size() > params.getMaxWriteBufferSize()) {
LOGGER.trace("{} no new reads, buffer limit has been reached [{}]", params.getFollowShardId(), buffer.size());
if (bufferSizeInBytes >= params.getMaxWriteBufferSize().getBytes()) {
LOGGER.trace("{} no new reads, buffer size limit has been reached [{}]", params.getFollowShardId(), bufferSizeInBytes);
return false;
}
if (buffer.size() > params.getMaxWriteBufferCount()) {
LOGGER.trace("{} no new reads, buffer count limit has been reached [{}]", params.getFollowShardId(), buffer.size());
return false;
}
return true;
@ -208,6 +213,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
break;
}
}
bufferSizeInBytes -= sumEstimatedSize;
numConcurrentWrites++;
LOGGER.trace("{}[{}] write [{}/{}] [{}]", params.getFollowShardId(), numConcurrentWrites, ops.get(0).seqNo(),
ops.get(ops.size() - 1).seqNo(), ops.size());
@ -281,7 +287,12 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
} else {
assert response.getOperations()[0].seqNo() == from :
"first operation is not what we asked for. From is [" + from + "], got " + response.getOperations()[0];
buffer.addAll(Arrays.asList(response.getOperations()));
List<Translog.Operation> operations = Arrays.asList(response.getOperations());
long operationsSize = operations.stream()
.mapToLong(Translog.Operation::estimateSize)
.sum();
buffer.addAll(operations);
bufferSizeInBytes += operationsSize;
final long maxSeqNo = response.getOperations()[response.getOperations().length - 1].seqNo();
assert maxSeqNo ==
Arrays.stream(response.getOperations()).mapToLong(Translog.Operation::seqNo).max().getAsLong();
@ -455,6 +466,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
numConcurrentReads,
numConcurrentWrites,
buffer.size(),
bufferSizeInBytes,
currentMappingVersion,
totalFetchTimeMillis,
totalFetchTookTimeMillis,

@ -48,6 +48,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
public static final ParseField MAX_CONCURRENT_READ_BATCHES = new ParseField("max_concurrent_read_batches");
public static final ParseField MAX_BATCH_SIZE = new ParseField("max_batch_size");
public static final ParseField MAX_CONCURRENT_WRITE_BATCHES = new ParseField("max_concurrent_write_batches");
public static final ParseField MAX_WRITE_BUFFER_COUNT = new ParseField("max_write_buffer_count");
public static final ParseField MAX_WRITE_BUFFER_SIZE = new ParseField("max_write_buffer_size");
public static final ParseField MAX_RETRY_DELAY = new ParseField("max_retry_delay");
public static final ParseField POLL_TIMEOUT = new ParseField("poll_timeout");
@ -56,7 +57,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
private static ConstructingObjectParser<ShardFollowTask, Void> PARSER = new ConstructingObjectParser<>(NAME,
(a) -> new ShardFollowTask((String) a[0], new ShardId((String) a[1], (String) a[2], (int) a[3]),
new ShardId((String) a[4], (String) a[5], (int) a[6]), (int) a[7], (int) a[8], (ByteSizeValue) a[9],
(int) a[10], (int) a[11], (TimeValue) a[12], (TimeValue) a[13], (Map<String, String>) a[14]));
(int) a[10], (int) a[11], (ByteSizeValue) a[12], (TimeValue) a[13], (TimeValue) a[14], (Map<String, String>) a[15]));
static {
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), REMOTE_CLUSTER_FIELD);
@ -74,7 +75,12 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
MAX_BATCH_SIZE,
ObjectParser.ValueType.STRING);
PARSER.declareInt(ConstructingObjectParser.constructorArg(), MAX_CONCURRENT_WRITE_BATCHES);
PARSER.declareInt(ConstructingObjectParser.constructorArg(), MAX_WRITE_BUFFER_SIZE);
PARSER.declareInt(ConstructingObjectParser.constructorArg(), MAX_WRITE_BUFFER_COUNT);
PARSER.declareField(
ConstructingObjectParser.constructorArg(),
(p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), MAX_WRITE_BUFFER_SIZE.getPreferredName()),
MAX_WRITE_BUFFER_SIZE,
ObjectParser.ValueType.STRING);
PARSER.declareField(ConstructingObjectParser.constructorArg(),
(p, c) -> TimeValue.parseTimeValue(p.text(), MAX_RETRY_DELAY.getPreferredName()),
MAX_RETRY_DELAY, ObjectParser.ValueType.STRING);
@ -91,7 +97,8 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
private final int maxConcurrentReadBatches;
private final ByteSizeValue maxBatchSize;
private final int maxConcurrentWriteBatches;
private final int maxWriteBufferSize;
private final int maxWriteBufferCount;
private final ByteSizeValue maxWriteBufferSize;
private final TimeValue maxRetryDelay;
private final TimeValue pollTimeout;
private final Map<String, String> headers;
@ -104,7 +111,8 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
final int maxConcurrentReadBatches,
final ByteSizeValue maxBatchSize,
final int maxConcurrentWriteBatches,
final int maxWriteBufferSize,
final int maxWriteBufferCount,
final ByteSizeValue maxWriteBufferSize,
final TimeValue maxRetryDelay,
final TimeValue pollTimeout,
final Map<String, String> headers) {
@ -115,6 +123,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
this.maxConcurrentReadBatches = maxConcurrentReadBatches;
this.maxBatchSize = maxBatchSize;
this.maxConcurrentWriteBatches = maxConcurrentWriteBatches;
this.maxWriteBufferCount = maxWriteBufferCount;
this.maxWriteBufferSize = maxWriteBufferSize;
this.maxRetryDelay = maxRetryDelay;
this.pollTimeout = pollTimeout;
@ -129,7 +138,8 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
this.maxConcurrentReadBatches = in.readVInt();
this.maxBatchSize = new ByteSizeValue(in);
this.maxConcurrentWriteBatches = in.readVInt();
this.maxWriteBufferSize = in.readVInt();
this.maxWriteBufferCount = in.readVInt();
this.maxWriteBufferSize = new ByteSizeValue(in);
this.maxRetryDelay = in.readTimeValue();
this.pollTimeout = in.readTimeValue();
this.headers = Collections.unmodifiableMap(in.readMap(StreamInput::readString, StreamInput::readString));
@ -159,7 +169,11 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
return maxConcurrentWriteBatches;
}
public int getMaxWriteBufferSize() {
public int getMaxWriteBufferCount() {
return maxWriteBufferCount;
}
public ByteSizeValue getMaxWriteBufferSize() {
return maxWriteBufferSize;
}
@ -197,7 +211,8 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
out.writeVInt(maxConcurrentReadBatches);
maxBatchSize.writeTo(out);
out.writeVInt(maxConcurrentWriteBatches);
out.writeVInt(maxWriteBufferSize);
out.writeVInt(maxWriteBufferCount);
maxWriteBufferSize.writeTo(out);
out.writeTimeValue(maxRetryDelay);
out.writeTimeValue(pollTimeout);
out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString);
@ -221,7 +236,8 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
builder.field(MAX_CONCURRENT_READ_BATCHES.getPreferredName(), maxConcurrentReadBatches);
builder.field(MAX_BATCH_SIZE.getPreferredName(), maxBatchSize.getStringRep());
builder.field(MAX_CONCURRENT_WRITE_BATCHES.getPreferredName(), maxConcurrentWriteBatches);
builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize);
builder.field(MAX_WRITE_BUFFER_COUNT.getPreferredName(), maxWriteBufferCount);
builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize.getStringRep());
builder.field(MAX_RETRY_DELAY.getPreferredName(), maxRetryDelay.getStringRep());
builder.field(POLL_TIMEOUT.getPreferredName(), pollTimeout.getStringRep());
builder.field(HEADERS.getPreferredName(), headers);
@ -240,7 +256,8 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
maxConcurrentReadBatches == that.maxConcurrentReadBatches &&
maxConcurrentWriteBatches == that.maxConcurrentWriteBatches &&
maxBatchSize.equals(that.maxBatchSize) &&
maxWriteBufferSize == that.maxWriteBufferSize &&
maxWriteBufferCount == that.maxWriteBufferCount &&
maxWriteBufferSize.equals(that.maxWriteBufferSize) &&
Objects.equals(maxRetryDelay, that.maxRetryDelay) &&
Objects.equals(pollTimeout, that.pollTimeout) &&
Objects.equals(headers, that.headers);
@ -256,6 +273,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
maxConcurrentReadBatches,
maxConcurrentWriteBatches,
maxBatchSize,
maxWriteBufferCount,
maxWriteBufferSize,
maxRetryDelay,
pollTimeout,

@ -164,6 +164,7 @@ public class TransportPutAutoFollowPatternAction extends
request.getMaxConcurrentReadBatches(),
request.getMaxBatchSize(),
request.getMaxConcurrentWriteBatches(),
request.getMaxWriteBufferCount(),
request.getMaxWriteBufferSize(),
request.getMaxRetryDelay(),
request.getPollTimeout());

@ -55,7 +55,8 @@ public class TransportResumeFollowAction extends HandledTransportAction<ResumeFo
static final ByteSizeValue DEFAULT_MAX_BATCH_SIZE = new ByteSizeValue(Long.MAX_VALUE, ByteSizeUnit.BYTES);
private static final TimeValue DEFAULT_MAX_RETRY_DELAY = new TimeValue(500);
private static final int DEFAULT_MAX_CONCURRENT_WRITE_BATCHES = 9;
private static final int DEFAULT_MAX_WRITE_BUFFER_SIZE = 10240;
private static final int DEFAULT_MAX_WRITE_BUFFER_COUNT = Integer.MAX_VALUE;
private static final ByteSizeValue DEFAULT_MAX_WRITE_BUFFER_SIZE = new ByteSizeValue(512, ByteSizeUnit.MB);
private static final int DEFAULT_MAX_BATCH_OPERATION_COUNT = 5120;
private static final int DEFAULT_MAX_CONCURRENT_READ_BATCHES = 12;
static final TimeValue DEFAULT_POLL_TIMEOUT = TimeValue.timeValueMinutes(1);
@ -259,7 +260,14 @@ public class TransportResumeFollowAction extends HandledTransportAction<ResumeFo
maxConcurrentWriteBatches = DEFAULT_MAX_CONCURRENT_WRITE_BATCHES;
}
int maxWriteBufferSize;
int maxWriteBufferCount;
if (request.getMaxWriteBufferCount() != null) {
maxWriteBufferCount = request.getMaxWriteBufferCount();
} else {
maxWriteBufferCount = DEFAULT_MAX_WRITE_BUFFER_COUNT;
}
ByteSizeValue maxWriteBufferSize;
if (request.getMaxWriteBufferSize() != null) {
maxWriteBufferSize = request.getMaxWriteBufferSize();
} else {
@ -277,6 +285,7 @@ public class TransportResumeFollowAction extends HandledTransportAction<ResumeFo
maxConcurrentReadBatches,
maxBatchSize,
maxConcurrentWriteBatches,
maxWriteBufferCount,
maxWriteBufferSize,
maxRetryDelay,
pollTimeout,

@ -46,6 +46,8 @@ import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ccr.LocalStateCcr;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@ -65,6 +67,7 @@ import static org.elasticsearch.discovery.DiscoveryModule.DISCOVERY_HOSTS_PROVID
import static org.elasticsearch.discovery.zen.SettingsBasedHostsProvider.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
public abstract class CcrIntegTestCase extends ESTestCase {
@ -103,6 +106,7 @@ public abstract class CcrIntegTestCase extends ESTestCase {
@After
public void afterTest() throws Exception {
ensureEmptyWriteBuffers();
String masterNode = clusterGroup.followerCluster.getMasterName();
ClusterService clusterService = clusterGroup.followerCluster.getInstance(ClusterService.class, masterNode);
removeCCRRelatedMetadataFromClusterState(clusterService);
@ -263,6 +267,18 @@ public abstract class CcrIntegTestCase extends ESTestCase {
return actionGet;
}
protected void ensureEmptyWriteBuffers() throws Exception {
assertBusy(() -> {
FollowStatsAction.StatsResponses statsResponses =
leaderClient().execute(FollowStatsAction.INSTANCE, new FollowStatsAction.StatsRequest()).actionGet();
for (FollowStatsAction.StatsResponse statsResponse : statsResponses.getStatsResponses()) {
ShardFollowNodeTaskStatus status = statsResponse.status();
assertThat(status.numberOfQueuedWrites(), equalTo(0));
assertThat(status.bufferSize(), equalTo(0L));
}
});
}
static void removeCCRRelatedMetadataFromClusterState(ClusterService clusterService) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
clusterService.submitStateUpdateTask("remove-ccr-related-metadata", new ClusterStateUpdateTask() {

@ -16,6 +16,8 @@ import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ccr.LocalStateCcr;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
import org.junit.After;
@ -26,6 +28,7 @@ import java.util.Collections;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.xpack.CcrIntegTestCase.removeCCRRelatedMetadataFromClusterState;
import static org.hamcrest.Matchers.equalTo;
public abstract class CcrSingleNodeTestCase extends ESSingleNodeTestCase {
@ -80,4 +83,16 @@ public abstract class CcrSingleNodeTestCase extends ESSingleNodeTestCase {
return request;
}
protected void ensureEmptyWriteBuffers() throws Exception {
assertBusy(() -> {
FollowStatsAction.StatsResponses statsResponses =
client().execute(FollowStatsAction.INSTANCE, new FollowStatsAction.StatsRequest()).actionGet();
for (FollowStatsAction.StatsResponse statsResponse : statsResponses.getStatsResponses()) {
ShardFollowNodeTaskStatus status = statsResponse.status();
assertThat(status.numberOfQueuedWrites(), equalTo(0));
assertThat(status.bufferSize(), equalTo(0L));
}
});
}
}

@ -126,7 +126,7 @@ public class AutoFollowIT extends CcrIntegTestCase {
// Need to set this, because following an index in the same cluster
request.setFollowIndexNamePattern("copy-{{leader_index}}");
if (randomBoolean()) {
request.setMaxWriteBufferSize(randomIntBetween(0, Integer.MAX_VALUE));
request.setMaxWriteBufferCount(randomIntBetween(0, Integer.MAX_VALUE));
}
if (randomBoolean()) {
request.setMaxConcurrentReadBatches(randomIntBetween(0, Integer.MAX_VALUE));
@ -137,6 +137,9 @@ public class AutoFollowIT extends CcrIntegTestCase {
if (randomBoolean()) {
request.setMaxBatchOperationCount(randomIntBetween(0, Integer.MAX_VALUE));
}
if (randomBoolean()) {
request.setMaxWriteBufferSize(new ByteSizeValue(randomNonNegativeLong()));
}
if (randomBoolean()) {
request.setMaxBatchSize(new ByteSizeValue(randomNonNegativeLong(), ByteSizeUnit.BYTES));
}
@ -157,6 +160,9 @@ public class AutoFollowIT extends CcrIntegTestCase {
ShardFollowTask shardFollowTask = (ShardFollowTask) persistentTasksMetaData.tasks().iterator().next().getParams();
assertThat(shardFollowTask.getLeaderShardId().getIndexName(), equalTo("logs-201901"));
assertThat(shardFollowTask.getFollowShardId().getIndexName(), equalTo("copy-logs-201901"));
if (request.getMaxWriteBufferCount() != null) {
assertThat(shardFollowTask.getMaxWriteBufferCount(), equalTo(request.getMaxWriteBufferCount()));
}
if (request.getMaxWriteBufferSize() != null) {
assertThat(shardFollowTask.getMaxWriteBufferSize(), equalTo(request.getMaxWriteBufferSize()));
}

@ -49,6 +49,7 @@ public class AutoFollowMetadataTests extends AbstractSerializingTestCase<AutoFol
new ByteSizeValue(randomNonNegativeLong(), ByteSizeUnit.BYTES),
randomIntBetween(0, Integer.MAX_VALUE),
randomIntBetween(0, Integer.MAX_VALUE),
new ByteSizeValue(randomNonNegativeLong()),
TimeValue.timeValueMillis(500),
TimeValue.timeValueMillis(500));
configs.put(Integer.toString(i), autoFollowPattern);

@ -148,7 +148,7 @@ public class CcrLicenseIT extends CcrSingleNodeTestCase {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
AutoFollowPattern autoFollowPattern = new AutoFollowPattern("test_alias", Collections.singletonList("logs-*"),
null, null, null, null, null, null, null, null);
null, null, null, null, null, null, null, null, null);
AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(
Collections.singletonMap("test_alias", autoFollowPattern),
Collections.emptyMap(),

@ -255,7 +255,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
followRequest.getFollowRequest().setMaxBatchOperationCount(maxReadSize);
followRequest.getFollowRequest().setMaxConcurrentReadBatches(randomIntBetween(2, 10));
followRequest.getFollowRequest().setMaxConcurrentWriteBatches(randomIntBetween(2, 10));
followRequest.getFollowRequest().setMaxWriteBufferSize(randomIntBetween(1024, 10240));
followRequest.getFollowRequest().setMaxWriteBufferCount(randomIntBetween(1024, 10240));
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
atLeastDocsIndexed(leaderClient(), "index1", numDocsIndexed);

@ -65,6 +65,7 @@ public class LocalIndexFollowingIT extends CcrSingleNodeTestCase {
assertThat(client().prepareSearch("follower").get().getHits().totalHits,
equalTo(firstBatchNumDocs + secondBatchNumDocs + thirdBatchNumDocs));
});
ensureEmptyWriteBuffers();
}
public void testDoNotCreateFollowerIfLeaderDoesNotHaveSoftDeletes() throws Exception {

@ -57,7 +57,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
.build();
AutoFollowPattern autoFollowPattern =
new AutoFollowPattern("remote", Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null);
new AutoFollowPattern("remote", Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null, null);
Map<String, AutoFollowPattern> patterns = new HashMap<>();
patterns.put("remote", autoFollowPattern);
Map<String, List<String>> followedLeaderIndexUUIDS = new HashMap<>();
@ -121,7 +121,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
when(client.getRemoteClusterClient(anyString())).thenReturn(client);
AutoFollowPattern autoFollowPattern =
new AutoFollowPattern("remote", Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null);
new AutoFollowPattern("remote", Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null, null);
Map<String, AutoFollowPattern> patterns = new HashMap<>();
patterns.put("remote", autoFollowPattern);
Map<String, List<String>> followedLeaderIndexUUIDS = new HashMap<>();
@ -179,7 +179,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
.build();
AutoFollowPattern autoFollowPattern =
new AutoFollowPattern("remote", Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null);
new AutoFollowPattern("remote", Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null, null);
Map<String, AutoFollowPattern> patterns = new HashMap<>();
patterns.put("remote", autoFollowPattern);
Map<String, List<String>> followedLeaderIndexUUIDS = new HashMap<>();
@ -242,7 +242,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
.build();
AutoFollowPattern autoFollowPattern =
new AutoFollowPattern("remote", Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null);
new AutoFollowPattern("remote", Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null, null);
Map<String, AutoFollowPattern> patterns = new HashMap<>();
patterns.put("remote", autoFollowPattern);
Map<String, List<String>> followedLeaderIndexUUIDS = new HashMap<>();
@ -296,7 +296,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
public void testGetLeaderIndicesToFollow() {
AutoFollowPattern autoFollowPattern =
new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), null, null, null, null, null, null, null, null);
new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), null, null, null, null, null, null, null, null, null);
Map<String, Map<String, String>> headers = new HashMap<>();
ClusterState followerState = ClusterState.builder(new ClusterName("remote"))
.metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE,
@ -341,15 +341,15 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
public void testGetFollowerIndexName() {
AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), null, null,
null, null, null, null, null, null);
null, null, null, null, null, null, null);
assertThat(AutoFollower.getFollowerIndexName(autoFollowPattern, "metrics-0"), equalTo("metrics-0"));
autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), "eu-metrics-0", null, null,
null, null, null, null, null);
null, null, null, null, null, null);
assertThat(AutoFollower.getFollowerIndexName(autoFollowPattern, "metrics-0"), equalTo("eu-metrics-0"));
autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), "eu-{{leader_index}}", null,
null, null, null, null, null, null);
null, null, null, null, null, null, null);
assertThat(AutoFollower.getFollowerIndexName(autoFollowPattern, "metrics-0"), equalTo("eu-metrics-0"));
}

@ -37,6 +37,7 @@ public class GetAutoFollowPatternResponseTests extends AbstractStreamableTestCas
new ByteSizeValue(randomNonNegativeLong(), ByteSizeUnit.BYTES),
randomIntBetween(0, Integer.MAX_VALUE),
randomIntBetween(0, Integer.MAX_VALUE),
new ByteSizeValue(randomNonNegativeLong()),
TimeValue.timeValueMillis(500),
TimeValue.timeValueMillis(500));
patterns.put(randomAlphaOfLength(4), autoFollowPattern);

@ -66,7 +66,10 @@ public class PutAutoFollowPatternRequestTests extends AbstractStreamableXContent
request.setMaxBatchSize(new ByteSizeValue(randomNonNegativeLong(), ByteSizeUnit.BYTES));
}
if (randomBoolean()) {
request.setMaxWriteBufferSize(randomIntBetween(0, Integer.MAX_VALUE));
request.setMaxWriteBufferCount(randomIntBetween(0, Integer.MAX_VALUE));
}
if (randomBoolean()) {
request.setMaxWriteBufferSize(new ByteSizeValue(randomNonNegativeLong()));
}
return request;
}

@ -57,7 +57,10 @@ public class ResumeFollowActionRequestTests extends AbstractStreamableXContentTe
request.setMaxBatchSize(new ByteSizeValue(randomNonNegativeLong(), ByteSizeUnit.BYTES));
}
if (randomBoolean()) {
request.setMaxWriteBufferSize(randomIntBetween(1, Integer.MAX_VALUE));
request.setMaxWriteBufferCount(randomIntBetween(1, Integer.MAX_VALUE));
}
if (randomBoolean()) {
request.setMaxWriteBufferSize(new ByteSizeValue(randomNonNegativeLong(), ByteSizeUnit.BYTES));
}
if (randomBoolean()) {
request.setMaxRetryDelay(TimeValue.timeValueMillis(500));

@ -7,6 +7,8 @@ package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.shard.ShardId;
@ -83,6 +85,7 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase {
TransportResumeFollowAction.DEFAULT_MAX_BATCH_SIZE,
concurrency,
10240,
new ByteSizeValue(512, ByteSizeUnit.MB),
TimeValue.timeValueMillis(10),
TimeValue.timeValueMillis(10),
Collections.emptyMap()

@ -57,6 +57,7 @@ public class ShardFollowNodeTaskStatusTests extends AbstractSerializingTestCase<
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomReadExceptions(),
randomLong(),
randomBoolean() ? new ElasticsearchException("fatal error") : null);

@ -63,7 +63,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
private Queue<Integer> responseSizes;
public void testCoordinateReads() {
ShardFollowNodeTask task = createShardFollowTask(8, between(8, 20), between(1, 20), Integer.MAX_VALUE, Long.MAX_VALUE);
ShardFollowNodeTask task = createShardFollowTask(8, between(8, 20), between(1, 20), Integer.MAX_VALUE,
new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE);
startTask(task, 3, -1);
task.coordinateReads();
assertThat(shardChangesRequests, contains(new long[]{0L, 8L})); // treat this a peak request
@ -77,9 +78,10 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
assertThat(status.lastRequestedSeqNo(), equalTo(60L));
}
public void testWriteBuffer() {
public void testMaxWriteBufferCount() {
// Need to set concurrentWrites to 0, otherwise the write buffer gets flushed immediately:
ShardFollowNodeTask task = createShardFollowTask(64, 1, 0, 32, Long.MAX_VALUE);
ShardFollowNodeTask task =
createShardFollowTask(64, 1, 0, 32, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE);
startTask(task, 63, -1);
task.coordinateReads();
@ -90,7 +92,30 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
shardChangesRequests.clear();
// Also invokes the coordinatesReads() method:
task.innerHandleReadResponse(0L, 63L, generateShardChangesResponse(0, 63, 0L, 128L));
assertThat(shardChangesRequests.size(), equalTo(0)); // no more reads, because write buffer is full
assertThat(shardChangesRequests.size(), equalTo(0)); // no more reads, because write buffer count limit has been reached
ShardFollowNodeTaskStatus status = task.getStatus();
assertThat(status.numberOfConcurrentReads(), equalTo(0));
assertThat(status.numberOfConcurrentWrites(), equalTo(0));
assertThat(status.lastRequestedSeqNo(), equalTo(63L));
assertThat(status.leaderGlobalCheckpoint(), equalTo(128L));
}
public void testMaxWriteBufferSize() {
// Need to set concurrentWrites to 0, otherwise the write buffer gets flushed immediately:
ShardFollowNodeTask task =
createShardFollowTask(64, 1, 0, Integer.MAX_VALUE, new ByteSizeValue(1, ByteSizeUnit.KB), Long.MAX_VALUE);
startTask(task, 63, -1);
task.coordinateReads();
assertThat(shardChangesRequests.size(), equalTo(1));
assertThat(shardChangesRequests.get(0)[0], equalTo(0L));
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
shardChangesRequests.clear();
// Also invokes the coordinatesReads() method:
task.innerHandleReadResponse(0L, 63L, generateShardChangesResponse(0, 63, 0L, 128L));
assertThat(shardChangesRequests.size(), equalTo(0)); // no more reads, because write buffer size limit has been reached
ShardFollowNodeTaskStatus status = task.getStatus();
assertThat(status.numberOfConcurrentReads(), equalTo(0));
@ -100,7 +125,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
}
public void testMaxConcurrentReads() {
ShardFollowNodeTask task = createShardFollowTask(8, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
ShardFollowNodeTask task =
createShardFollowTask(8, 1, 1, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE);
startTask(task, 64, -1);
task.coordinateReads();
@ -114,7 +140,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
}
public void testTaskCancelled() {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
ShardFollowNodeTask task =
createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE);
startTask(task, 64, -1);
task.coordinateReads();
@ -131,7 +158,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
}
public void testTaskCancelledAfterReadLimitHasBeenReached() {
ShardFollowNodeTask task = createShardFollowTask(16, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
ShardFollowNodeTask task =
createShardFollowTask(16, 1, 1, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE);
startTask(task, 31, -1);
task.coordinateReads();
@ -155,7 +183,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
}
public void testTaskCancelledAfterWriteBufferLimitHasBeenReached() {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, 32, Long.MAX_VALUE);
ShardFollowNodeTask task =
createShardFollowTask(64, 1, 1, 32, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE);
startTask(task, 64, -1);
task.coordinateReads();
@ -179,7 +208,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
}
public void testReceiveRetryableError() {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
ShardFollowNodeTask task =
createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE);
startTask(task, 63, -1);
int max = randomIntBetween(1, 30);
@ -229,7 +259,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
}
public void testEmptyShardChangesResponseShouldClearFetchException() {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
ShardFollowNodeTask task =
createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE);
startTask(task, -1, -1);
readFailures.add(new ShardNotFoundException(new ShardId("leader_index", "", 0)));
@ -258,7 +289,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
}
public void testReceiveTimeout() {
final ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
final ShardFollowNodeTask task =
createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE);
startTask(task, 63, -1);
final int numberOfTimeouts = randomIntBetween(1, 32);
@ -322,7 +354,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
}
public void testReceiveNonRetryableError() {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
ShardFollowNodeTask task =
createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE);
startTask(task, 63, -1);
Exception failure = new RuntimeException("replication failed");
@ -362,7 +395,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
}
public void testHandleReadResponse() {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
ShardFollowNodeTask task =
createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE);
startTask(task, 63, -1);
task.coordinateReads();
@ -383,7 +417,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
}
public void testReceiveLessThanRequested() {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
ShardFollowNodeTask task =
createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE);
startTask(task, 63, -1);
task.coordinateReads();
@ -407,7 +442,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
}
public void testCancelAndReceiveLessThanRequested() {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
ShardFollowNodeTask task =
createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE);
startTask(task, 63, -1);
task.coordinateReads();
@ -430,7 +466,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
}
public void testReceiveNothingExpectedSomething() {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
ShardFollowNodeTask task =
createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE);
startTask(task, 63, -1);
task.coordinateReads();
@ -453,7 +490,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
}
public void testMappingUpdate() {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
ShardFollowNodeTask task =
createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE);
startTask(task, 63, -1);
mappingVersions.add(1L);
@ -474,7 +512,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
}
public void testMappingUpdateRetryableError() {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
ShardFollowNodeTask task =
createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE);
startTask(task, 63, -1);
int max = randomIntBetween(1, 30);
@ -499,7 +538,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
}
public void testMappingUpdateNonRetryableError() {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
ShardFollowNodeTask task =
createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE);
startTask(task, 63, -1);
mappingUpdateFailures.add(new RuntimeException());
@ -518,7 +558,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
}
public void testCoordinateWrites() {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
ShardFollowNodeTask task =
createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE);
startTask(task, 63, -1);
task.coordinateReads();
@ -542,7 +583,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
}
public void testMaxConcurrentWrites() {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 2, Integer.MAX_VALUE, Long.MAX_VALUE);
ShardFollowNodeTask task =
createShardFollowTask(64, 1, 2, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE);
ShardChangesAction.Response response = generateShardChangesResponse(0, 256, 0L, 256L);
// Also invokes coordinatesWrites()
task.innerHandleReadResponse(0L, 64L, response);
@ -554,7 +596,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
ShardFollowNodeTaskStatus status = task.getStatus();
assertThat(status.numberOfConcurrentWrites(), equalTo(2));
task = createShardFollowTask(64, 1, 4, Integer.MAX_VALUE, Long.MAX_VALUE);
task = createShardFollowTask(64, 1, 4, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE);
response = generateShardChangesResponse(0, 256, 0L, 256L);
// Also invokes coordinatesWrites()
task.innerHandleReadResponse(0L, 64L, response);
@ -570,7 +612,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
}
public void testMaxBatchOperationCount() {
ShardFollowNodeTask task = createShardFollowTask(8, 1, 32, Integer.MAX_VALUE, Long.MAX_VALUE);
ShardFollowNodeTask task =
createShardFollowTask(8, 1, 32, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE);
ShardChangesAction.Response response = generateShardChangesResponse(0, 256, 0L, 256L);
// Also invokes coordinatesWrites()
task.innerHandleReadResponse(0L, 64L, response);
@ -586,7 +629,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
}
public void testRetryableError() {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
ShardFollowNodeTask task =
createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE);
startTask(task, 63, -1);
task.coordinateReads();
@ -614,7 +658,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
}
public void testNonRetryableError() {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
ShardFollowNodeTask task =
createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE);
startTask(task, 63, -1);
task.coordinateReads();
@ -636,7 +681,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
}
public void testMaxBatchBytesLimit() {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 128, Integer.MAX_VALUE, 1L);
ShardFollowNodeTask task =
createShardFollowTask(64, 1, 128, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), 1L);
startTask(task, 64, -1);
task.coordinateReads();
@ -652,7 +698,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
}
public void testHandleWriteResponse() {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
ShardFollowNodeTask task =
createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE);
startTask(task, 63, -1);
task.coordinateReads();
@ -698,7 +745,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
private ShardFollowNodeTask createShardFollowTask(int maxBatchOperationCount,
int maxConcurrentReadBatches,
int maxConcurrentWriteBatches,
int bufferWriteLimit,
int maxWriteBufferCount,
ByteSizeValue maxWriteBufferSize,
long maxBatchSizeInBytes) {
AtomicBoolean stopped = new AtomicBoolean(false);
ShardFollowTask params = new ShardFollowTask(
@ -709,7 +757,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
maxConcurrentReadBatches,
new ByteSizeValue(maxBatchSizeInBytes, ByteSizeUnit.BYTES),
maxConcurrentWriteBatches,
bufferWriteLimit,
maxWriteBufferCount,
maxWriteBufferSize,
TimeValue.ZERO,
TimeValue.ZERO,
Collections.emptyMap()

@ -370,7 +370,9 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
between(1, 64),
between(1, 8),
new ByteSizeValue(Long.MAX_VALUE, ByteSizeUnit.BYTES),
between(1, 4), 10240,
between(1, 4),
10240,
new ByteSizeValue(512, ByteSizeUnit.MB),
TimeValue.timeValueMillis(10),
TimeValue.timeValueMillis(10),
Collections.emptyMap()

@ -34,6 +34,7 @@ public class ShardFollowTaskTests extends AbstractSerializingTestCase<ShardFollo
new ByteSizeValue(randomNonNegativeLong(), ByteSizeUnit.BYTES),
randomIntBetween(1, Integer.MAX_VALUE),
randomIntBetween(1, Integer.MAX_VALUE),
new ByteSizeValue(randomNonNegativeLong(), ByteSizeUnit.BYTES),
TimeValue.parseTimeValue(randomTimeValue(), ""),
TimeValue.parseTimeValue(randomTimeValue(), ""),
randomBoolean() ? null : Collections.singletonMap("key", "value")

@ -50,6 +50,7 @@ public class StatsResponsesTests extends AbstractStreamableTestCase<FollowStatsA
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
Collections.emptyNavigableMap(),
randomLong(),
randomBoolean() ? new ElasticsearchException("fatal error") : null);

@ -33,7 +33,7 @@ public class TransportDeleteAutoFollowPatternActionTests extends ESTestCase {
List<String> existingPatterns = new ArrayList<>();
existingPatterns.add("transactions-*");
existingAutoFollowPatterns.put("name1",
new AutoFollowPattern("eu_cluster", existingPatterns, null, null, null, null, null, null, null, null));
new AutoFollowPattern("eu_cluster", existingPatterns, null, null, null, null, null, null, null, null, null));
List<String> existingUUIDS = new ArrayList<>();
existingUUIDS.add("_val");
@ -44,7 +44,7 @@ public class TransportDeleteAutoFollowPatternActionTests extends ESTestCase {
List<String> existingPatterns = new ArrayList<>();
existingPatterns.add("logs-*");
existingAutoFollowPatterns.put("name2",
new AutoFollowPattern("asia_cluster", existingPatterns, null, null, null, null, null, null, null, null));
new AutoFollowPattern("asia_cluster", existingPatterns, null, null, null, null, null, null, null, null, null));
List<String> existingUUIDS = new ArrayList<>();
existingUUIDS.add("_val");
@ -78,7 +78,7 @@ public class TransportDeleteAutoFollowPatternActionTests extends ESTestCase {
List<String> existingPatterns = new ArrayList<>();
existingPatterns.add("transactions-*");
existingAutoFollowPatterns.put("name1",
new AutoFollowPattern("eu_cluster", existingPatterns, null, null, null, null, null, null, null, null));
new AutoFollowPattern("eu_cluster", existingPatterns, null, null, null, null, null, null, null, null, null));
existingHeaders.put("key", Collections.singletonMap("key", "val"));
}
ClusterState clusterState = ClusterState.builder(new ClusterName("us_cluster"))

@ -23,10 +23,10 @@ public class TransportGetAutoFollowPatternActionTests extends ESTestCase {
public void testGetAutoFollowPattern() {
Map<String, AutoFollowPattern> patterns = new HashMap<>();
patterns.put("name1",
new AutoFollowPattern("test_alias1", Collections.singletonList("index-*"), null, null, null, null, null, null, null, null));
patterns.put("name2",
new AutoFollowPattern("test_alias1", Collections.singletonList("index-*"), null, null, null, null, null, null, null, null));
patterns.put("name1", new AutoFollowPattern(
"test_alias1", Collections.singletonList("index-*"), null, null, null, null, null, null, null, null, null));
patterns.put("name2", new AutoFollowPattern(
"test_alias1", Collections.singletonList("index-*"), null, null, null, null, null, null, null, null, null));
MetaData metaData = MetaData.builder()
.putCustom(AutoFollowMetadata.TYPE, new AutoFollowMetadata(patterns, Collections.emptyMap(), Collections.emptyMap()))
.build();

@ -103,7 +103,7 @@ public class TransportPutAutoFollowPatternActionTests extends ESTestCase {
List<String> existingPatterns = new ArrayList<>();
existingPatterns.add("transactions-*");
existingAutoFollowPatterns.put("name1",
new AutoFollowPattern("eu_cluster", existingPatterns, null, null, null, null, null, null, null, null));
new AutoFollowPattern("eu_cluster", existingPatterns, null, null, null, null, null, null, null, null, null));
Map<String, List<String>> existingAlreadyFollowedIndexUUIDS = new HashMap<>();
List<String> existingUUIDS = new ArrayList<>();
existingUUIDS.add("_val");

@ -11,6 +11,8 @@ import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
@ -81,6 +83,7 @@ public class TransportUnfollowActionTests extends ESTestCase {
TransportResumeFollowAction.DEFAULT_MAX_BATCH_SIZE,
1,
10240,
new ByteSizeValue(512, ByteSizeUnit.MB),
TimeValue.timeValueMillis(10),
TimeValue.timeValueMillis(10),
Collections.emptyMap()

@ -92,6 +92,7 @@ public class FollowStatsMonitoringDocTests extends BaseMonitoringDocTestCase<Fol
final int numberOfConcurrentReads = randomIntBetween(1, Integer.MAX_VALUE);
final int numberOfConcurrentWrites = randomIntBetween(1, Integer.MAX_VALUE);
final int numberOfQueuedWrites = randomIntBetween(0, Integer.MAX_VALUE);
final long bufferSize = randomNonNegativeLong();
final long mappingVersion = randomIntBetween(0, Integer.MAX_VALUE);
final long totalFetchTimeMillis = randomLongBetween(0, 4096);
final long totalFetchTookTimeMillis = randomLongBetween(0, 4096);
@ -121,6 +122,7 @@ public class FollowStatsMonitoringDocTests extends BaseMonitoringDocTestCase<Fol
numberOfConcurrentReads,
numberOfConcurrentWrites,
numberOfQueuedWrites,
bufferSize,
mappingVersion,
totalFetchTimeMillis,
totalFetchTookTimeMillis,
@ -166,6 +168,7 @@ public class FollowStatsMonitoringDocTests extends BaseMonitoringDocTestCase<Fol
+ "\"number_of_concurrent_reads\":" + numberOfConcurrentReads + ","
+ "\"number_of_concurrent_writes\":" + numberOfConcurrentWrites + ","
+ "\"number_of_queued_writes\":" + numberOfQueuedWrites + ","
+ "\"buffer_size_in_bytes\":" + bufferSize + ","
+ "\"mapping_version\":" + mappingVersion + ","
+ "\"total_fetch_time_millis\":" + totalFetchTimeMillis + ","
+ "\"total_fetch_remote_time_millis\":" + totalFetchTookTimeMillis + ","
@ -210,6 +213,7 @@ public class FollowStatsMonitoringDocTests extends BaseMonitoringDocTestCase<Fol
1,
1,
1,
1,
100,
50,
10,
@ -234,7 +238,7 @@ public class FollowStatsMonitoringDocTests extends BaseMonitoringDocTestCase<Fol
for (Map.Entry<String, Object> entry : serializedStatus.entrySet()) {
String fieldName = entry.getKey();
Map<?, ?> fieldMapping = (Map<?, ?>) followStatsMapping.get(fieldName);
assertThat(fieldMapping, notNullValue());
assertThat("no field mapping for field [" + fieldName + "]", fieldMapping, notNullValue());
Object fieldValue = entry.getValue();
String fieldType = (String) fieldMapping.get("type");

@ -182,6 +182,7 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<MetaData.Custom> i
public static final ParseField MAX_CONCURRENT_READ_BATCHES = new ParseField("max_concurrent_read_batches");
public static final ParseField MAX_BATCH_SIZE = new ParseField("max_batch_size");
public static final ParseField MAX_CONCURRENT_WRITE_BATCHES = new ParseField("max_concurrent_write_batches");
public static final ParseField MAX_WRITE_BUFFER_COUNT = new ParseField("max_write_buffer_count");
public static final ParseField MAX_WRITE_BUFFER_SIZE = new ParseField("max_write_buffer_size");
public static final ParseField MAX_RETRY_DELAY = new ParseField("max_retry_delay");
public static final ParseField POLL_TIMEOUT = new ParseField("poll_timeout");
@ -190,8 +191,8 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<MetaData.Custom> i
private static final ConstructingObjectParser<AutoFollowPattern, Void> PARSER =
new ConstructingObjectParser<>("auto_follow_pattern",
args -> new AutoFollowPattern((String) args[0], (List<String>) args[1], (String) args[2], (Integer) args[3],
(Integer) args[4], (ByteSizeValue) args[5], (Integer) args[6], (Integer) args[7], (TimeValue) args[8],
(TimeValue) args[9]));
(Integer) args[4], (ByteSizeValue) args[5], (Integer) args[6], (Integer) args[7], (ByteSizeValue) args[8],
(TimeValue) args[9], (TimeValue) args[10]));
static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), REMOTE_CLUSTER_FIELD);
@ -205,7 +206,12 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<MetaData.Custom> i
MAX_BATCH_SIZE,
ObjectParser.ValueType.STRING);
PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), MAX_CONCURRENT_WRITE_BATCHES);
PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), MAX_WRITE_BUFFER_SIZE);
PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), MAX_WRITE_BUFFER_COUNT);
PARSER.declareField(
ConstructingObjectParser.optionalConstructorArg(),
(p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), MAX_WRITE_BUFFER_SIZE.getPreferredName()),
MAX_WRITE_BUFFER_SIZE,
ObjectParser.ValueType.STRING);
PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(),
(p, c) -> TimeValue.parseTimeValue(p.text(), MAX_RETRY_DELAY.getPreferredName()),
MAX_RETRY_DELAY, ObjectParser.ValueType.STRING);
@ -221,7 +227,8 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<MetaData.Custom> i
private final Integer maxConcurrentReadBatches;
private final ByteSizeValue maxBatchSize;
private final Integer maxConcurrentWriteBatches;
private final Integer maxWriteBufferSize;
private final Integer maxWriteBufferCount;
private final ByteSizeValue maxWriteBufferSize;
private final TimeValue maxRetryDelay;
private final TimeValue pollTimeout;
@ -232,8 +239,8 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<MetaData.Custom> i
Integer maxConcurrentReadBatches,
ByteSizeValue maxBatchSize,
Integer maxConcurrentWriteBatches,
Integer maxWriteBufferSize,
TimeValue maxRetryDelay,
Integer maxWriteBufferCount,
ByteSizeValue maxWriteBufferSize, TimeValue maxRetryDelay,
TimeValue pollTimeout) {
this.remoteCluster = remoteCluster;
this.leaderIndexPatterns = leaderIndexPatterns;
@ -242,6 +249,7 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<MetaData.Custom> i
this.maxConcurrentReadBatches = maxConcurrentReadBatches;
this.maxBatchSize = maxBatchSize;
this.maxConcurrentWriteBatches = maxConcurrentWriteBatches;
this.maxWriteBufferCount = maxWriteBufferCount;
this.maxWriteBufferSize = maxWriteBufferSize;
this.maxRetryDelay = maxRetryDelay;
this.pollTimeout = pollTimeout;
@ -255,7 +263,8 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<MetaData.Custom> i
maxConcurrentReadBatches = in.readOptionalVInt();
maxBatchSize = in.readOptionalWriteable(ByteSizeValue::new);
maxConcurrentWriteBatches = in.readOptionalVInt();
maxWriteBufferSize = in.readOptionalVInt();
maxWriteBufferCount = in.readOptionalVInt();
maxWriteBufferSize = in.readOptionalWriteable(ByteSizeValue::new);
maxRetryDelay = in.readOptionalTimeValue();
pollTimeout = in.readOptionalTimeValue();
}
@ -296,7 +305,11 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<MetaData.Custom> i
return maxConcurrentWriteBatches;
}
public Integer getMaxWriteBufferSize() {
public Integer getMaxWriteBufferCount() {
return maxWriteBufferCount;
}
public ByteSizeValue getMaxWriteBufferSize() {
return maxWriteBufferSize;
}
@ -317,7 +330,8 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<MetaData.Custom> i
out.writeOptionalVInt(maxConcurrentReadBatches);
out.writeOptionalWriteable(maxBatchSize);
out.writeOptionalVInt(maxConcurrentWriteBatches);
out.writeOptionalVInt(maxWriteBufferSize);
out.writeOptionalVInt(maxWriteBufferCount);
out.writeOptionalWriteable(maxWriteBufferSize);
out.writeOptionalTimeValue(maxRetryDelay);
out.writeOptionalTimeValue(pollTimeout);
}
@ -341,8 +355,11 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<MetaData.Custom> i
if (maxConcurrentWriteBatches != null) {
builder.field(MAX_CONCURRENT_WRITE_BATCHES.getPreferredName(), maxConcurrentWriteBatches);
}
if (maxWriteBufferCount != null){
builder.field(MAX_WRITE_BUFFER_COUNT.getPreferredName(), maxWriteBufferCount);
}
if (maxWriteBufferSize != null) {
builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize);
builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize.getStringRep());
}
if (maxRetryDelay != null) {
builder.field(MAX_RETRY_DELAY.getPreferredName(), maxRetryDelay);
@ -370,6 +387,7 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<MetaData.Custom> i
Objects.equals(maxConcurrentReadBatches, that.maxConcurrentReadBatches) &&
Objects.equals(maxBatchSize, that.maxBatchSize) &&
Objects.equals(maxConcurrentWriteBatches, that.maxConcurrentWriteBatches) &&
Objects.equals(maxWriteBufferCount, that.maxWriteBufferCount) &&
Objects.equals(maxWriteBufferSize, that.maxWriteBufferSize) &&
Objects.equals(maxRetryDelay, that.maxRetryDelay) &&
Objects.equals(pollTimeout, that.pollTimeout);
@ -385,6 +403,7 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<MetaData.Custom> i
maxConcurrentReadBatches,
maxBatchSize,
maxConcurrentWriteBatches,
maxWriteBufferCount,
maxWriteBufferSize,
maxRetryDelay,
pollTimeout);

@ -46,6 +46,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
private static final ParseField NUMBER_OF_CONCURRENT_READS_FIELD = new ParseField("number_of_concurrent_reads");
private static final ParseField NUMBER_OF_CONCURRENT_WRITES_FIELD = new ParseField("number_of_concurrent_writes");
private static final ParseField NUMBER_OF_QUEUED_WRITES_FIELD = new ParseField("number_of_queued_writes");
private static final ParseField BUFFER_SIZE_IN_BYTES_FIELD = new ParseField("buffer_size_in_bytes");
private static final ParseField MAPPING_VERSION_FIELD = new ParseField("mapping_version");
private static final ParseField TOTAL_FETCH_TIME_MILLIS_FIELD = new ParseField("total_fetch_time_millis");
private static final ParseField TOTAL_FETCH_REMOTE_TIME_MILLIS_FIELD = new ParseField("total_fetch_remote_time_millis");
@ -89,12 +90,13 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
(long) args[20],
(long) args[21],
(long) args[22],
(long) args[23],
new TreeMap<>(
((List<Map.Entry<Long, Tuple<Integer, ElasticsearchException>>>) args[23])
((List<Map.Entry<Long, Tuple<Integer, ElasticsearchException>>>) args[24])
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))),
(long) args[24],
(ElasticsearchException) args[25]));
(long) args[25],
(ElasticsearchException) args[26]));
public static final String FETCH_EXCEPTIONS_ENTRY_PARSER_NAME = "shard-follow-node-task-status-fetch-exceptions-entry";
@ -116,6 +118,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
STATUS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_CONCURRENT_READS_FIELD);
STATUS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_CONCURRENT_WRITES_FIELD);
STATUS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_QUEUED_WRITES_FIELD);
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), BUFFER_SIZE_IN_BYTES_FIELD);
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), MAPPING_VERSION_FIELD);
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_FETCH_TIME_MILLIS_FIELD);
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_FETCH_REMOTE_TIME_MILLIS_FIELD);
@ -219,6 +222,12 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
return numberOfQueuedWrites;
}
private final long bufferSize;
public long bufferSize() {
return bufferSize;
}
private final long mappingVersion;
public long mappingVersion() {
@ -316,6 +325,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
final int numberOfConcurrentReads,
final int numberOfConcurrentWrites,
final int numberOfQueuedWrites,
final long bufferSize,
final long mappingVersion,
final long totalFetchTimeMillis,
final long totalFetchRemoteTimeMillis,
@ -342,6 +352,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
this.numberOfConcurrentReads = numberOfConcurrentReads;
this.numberOfConcurrentWrites = numberOfConcurrentWrites;
this.numberOfQueuedWrites = numberOfQueuedWrites;
this.bufferSize = bufferSize;
this.mappingVersion = mappingVersion;
this.totalFetchTimeMillis = totalFetchTimeMillis;
this.totalFetchRemoteTimeMillis = totalFetchRemoteTimeMillis;
@ -371,6 +382,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
this.numberOfConcurrentReads = in.readVInt();
this.numberOfConcurrentWrites = in.readVInt();
this.numberOfQueuedWrites = in.readVInt();
this.bufferSize = in.readVLong();
this.mappingVersion = in.readVLong();
this.totalFetchTimeMillis = in.readVLong();
this.totalFetchRemoteTimeMillis = in.readVLong();
@ -407,6 +419,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
out.writeVInt(numberOfConcurrentReads);
out.writeVInt(numberOfConcurrentWrites);
out.writeVInt(numberOfQueuedWrites);
out.writeVLong(bufferSize);
out.writeVLong(mappingVersion);
out.writeVLong(totalFetchTimeMillis);
out.writeVLong(totalFetchRemoteTimeMillis);
@ -452,6 +465,10 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
builder.field(NUMBER_OF_CONCURRENT_READS_FIELD.getPreferredName(), numberOfConcurrentReads);
builder.field(NUMBER_OF_CONCURRENT_WRITES_FIELD.getPreferredName(), numberOfConcurrentWrites);
builder.field(NUMBER_OF_QUEUED_WRITES_FIELD.getPreferredName(), numberOfQueuedWrites);
builder.humanReadableField(
BUFFER_SIZE_IN_BYTES_FIELD.getPreferredName(),
"buffer_size",
new ByteSizeValue(bufferSize));
builder.field(MAPPING_VERSION_FIELD.getPreferredName(), mappingVersion);
builder.humanReadableField(
TOTAL_FETCH_TIME_MILLIS_FIELD.getPreferredName(),
@ -531,6 +548,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
numberOfConcurrentReads == that.numberOfConcurrentReads &&
numberOfConcurrentWrites == that.numberOfConcurrentWrites &&
numberOfQueuedWrites == that.numberOfQueuedWrites &&
bufferSize == that.bufferSize &&
mappingVersion == that.mappingVersion &&
totalFetchTimeMillis == that.totalFetchTimeMillis &&
totalFetchRemoteTimeMillis == that.totalFetchRemoteTimeMillis &&
@ -568,6 +586,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
numberOfConcurrentReads,
numberOfConcurrentWrites,
numberOfQueuedWrites,
bufferSize,
mappingVersion,
totalFetchTimeMillis,
totalFetchRemoteTimeMillis,
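The new bufferSize value is written as a VLong directly after numberOfQueuedWrites and before mappingVersion, so readers and writers of the task status must agree on that field order. Below is a minimal sketch of that ordering using Elasticsearch's stream classes and hypothetical sample values; it is an illustration, not the actual serialization code.

import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;

int numberOfQueuedWrites = 3;   // hypothetical sample values
long bufferSize = 262144;       // total size in bytes of the queued operations
long mappingVersion = 1;

BytesStreamOutput out = new BytesStreamOutput();
out.writeVInt(numberOfQueuedWrites);
out.writeVLong(bufferSize);     // new field slots in between the two existing ones
out.writeVLong(mappingVersion);

StreamInput in = out.bytes().streamInput();
assert in.readVInt() == numberOfQueuedWrites;
assert in.readVLong() == bufferSize;
assert in.readVLong() == mappingVersion;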

View File

@ -60,7 +60,12 @@ public class PutAutoFollowPatternAction extends Action<AcknowledgedResponse> {
AutoFollowPattern.MAX_BATCH_SIZE,
ObjectParser.ValueType.STRING);
PARSER.declareInt(Request::setMaxConcurrentWriteBatches, AutoFollowPattern.MAX_CONCURRENT_WRITE_BATCHES);
PARSER.declareInt(Request::setMaxWriteBufferSize, AutoFollowPattern.MAX_WRITE_BUFFER_SIZE);
PARSER.declareInt(Request::setMaxWriteBufferCount, AutoFollowPattern.MAX_WRITE_BUFFER_COUNT);
PARSER.declareField(
Request::setMaxWriteBufferSize,
(p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), AutoFollowPattern.MAX_WRITE_BUFFER_SIZE.getPreferredName()),
AutoFollowPattern.MAX_WRITE_BUFFER_SIZE,
ObjectParser.ValueType.STRING);
PARSER.declareField(Request::setMaxRetryDelay,
(p, c) -> TimeValue.parseTimeValue(p.text(), AutoFollowPattern.MAX_RETRY_DELAY.getPreferredName()),
AutoFollowPattern.MAX_RETRY_DELAY, ObjectParser.ValueType.STRING);
@ -92,7 +97,8 @@ public class PutAutoFollowPatternAction extends Action<AcknowledgedResponse> {
private Integer maxConcurrentReadBatches;
private ByteSizeValue maxBatchSize;
private Integer maxConcurrentWriteBatches;
private Integer maxWriteBufferSize;
private Integer maxWriteBufferCount;
private ByteSizeValue maxWriteBufferSize;
private TimeValue maxRetryDelay;
private TimeValue pollTimeout;
@ -190,11 +196,19 @@ public class PutAutoFollowPatternAction extends Action<AcknowledgedResponse> {
this.maxConcurrentWriteBatches = maxConcurrentWriteBatches;
}
public Integer getMaxWriteBufferSize() {
public Integer getMaxWriteBufferCount() {
return maxWriteBufferCount;
}
public void setMaxWriteBufferCount(Integer maxWriteBufferCount) {
this.maxWriteBufferCount = maxWriteBufferCount;
}
public ByteSizeValue getMaxWriteBufferSize() {
return maxWriteBufferSize;
}
public void setMaxWriteBufferSize(Integer maxWriteBufferSize) {
public void setMaxWriteBufferSize(ByteSizeValue maxWriteBufferSize) {
this.maxWriteBufferSize = maxWriteBufferSize;
}
@ -225,7 +239,8 @@ public class PutAutoFollowPatternAction extends Action<AcknowledgedResponse> {
maxConcurrentReadBatches = in.readOptionalVInt();
maxBatchSize = in.readOptionalWriteable(ByteSizeValue::new);
maxConcurrentWriteBatches = in.readOptionalVInt();
maxWriteBufferSize = in.readOptionalVInt();
maxWriteBufferCount = in.readOptionalVInt();
maxWriteBufferSize = in.readOptionalWriteable(ByteSizeValue::new);
maxRetryDelay = in.readOptionalTimeValue();
pollTimeout = in.readOptionalTimeValue();
}
@ -241,7 +256,8 @@ public class PutAutoFollowPatternAction extends Action<AcknowledgedResponse> {
out.writeOptionalVInt(maxConcurrentReadBatches);
out.writeOptionalWriteable(maxBatchSize);
out.writeOptionalVInt(maxConcurrentWriteBatches);
out.writeOptionalVInt(maxWriteBufferSize);
out.writeOptionalVInt(maxWriteBufferCount);
out.writeOptionalWriteable(maxWriteBufferSize);
out.writeOptionalTimeValue(maxRetryDelay);
out.writeOptionalTimeValue(pollTimeout);
}
@ -262,8 +278,11 @@ public class PutAutoFollowPatternAction extends Action<AcknowledgedResponse> {
if (maxBatchSize != null) {
builder.field(AutoFollowPattern.MAX_BATCH_SIZE.getPreferredName(), maxBatchSize.getStringRep());
}
if (maxWriteBufferCount != null) {
builder.field(AutoFollowPattern.MAX_WRITE_BUFFER_COUNT.getPreferredName(), maxWriteBufferCount);
}
if (maxWriteBufferSize != null) {
builder.field(AutoFollowPattern.MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize);
builder.field(AutoFollowPattern.MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize.getStringRep());
}
if (maxConcurrentReadBatches != null) {
builder.field(AutoFollowPattern.MAX_CONCURRENT_READ_BATCHES.getPreferredName(), maxConcurrentReadBatches);
@ -295,6 +314,7 @@ public class PutAutoFollowPatternAction extends Action<AcknowledgedResponse> {
Objects.equals(maxConcurrentReadBatches, request.maxConcurrentReadBatches) &&
Objects.equals(maxBatchSize, request.maxBatchSize) &&
Objects.equals(maxConcurrentWriteBatches, request.maxConcurrentWriteBatches) &&
Objects.equals(maxWriteBufferCount, request.maxWriteBufferCount) &&
Objects.equals(maxWriteBufferSize, request.maxWriteBufferSize) &&
Objects.equals(maxRetryDelay, request.maxRetryDelay) &&
Objects.equals(pollTimeout, request.pollTimeout);
@ -311,6 +331,7 @@ public class PutAutoFollowPatternAction extends Action<AcknowledgedResponse> {
maxConcurrentReadBatches,
maxBatchSize,
maxConcurrentWriteBatches,
maxWriteBufferCount,
maxWriteBufferSize,
maxRetryDelay,
pollTimeout);
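With the limit split into a count-based and a size-based setting, an auto-follow pattern request can now carry both. A minimal sketch of configuring them through the setters shown above, assuming the no-argument request constructor; the values are illustrative and the other required pattern fields (leader cluster, index patterns) are omitted for brevity.

import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction;

PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request();
request.setMaxWriteBufferCount(2048);                                   // cap on the number of queued operations
request.setMaxWriteBufferSize(new ByteSizeValue(512, ByteSizeUnit.MB)); // cap on the total size of the queued operations
// equivalent JSON body fields: "max_write_buffer_count": 2048, "max_write_buffer_size": "512mb"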

View File

@ -32,6 +32,7 @@ import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request
import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_CONCURRENT_READ_BATCHES;
import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_CONCURRENT_WRITE_BATCHES;
import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_RETRY_DELAY_FIELD;
import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_WRITE_BUFFER_COUNT;
import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_WRITE_BUFFER_SIZE;
import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.POLL_TIMEOUT;
@ -72,7 +73,12 @@ public final class PutFollowAction extends Action<PutFollowAction.Response> {
MAX_BATCH_SIZE,
ObjectParser.ValueType.STRING);
PARSER.declareInt((request, value) -> request.followRequest.setMaxConcurrentWriteBatches(value), MAX_CONCURRENT_WRITE_BATCHES);
PARSER.declareInt((request, value) -> request.followRequest.setMaxWriteBufferSize(value), MAX_WRITE_BUFFER_SIZE);
PARSER.declareInt((request, value) -> request.followRequest.setMaxWriteBufferCount(value), MAX_WRITE_BUFFER_COUNT);
PARSER.declareField(
(request, value) -> request.followRequest.setMaxWriteBufferSize(value),
(p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), MAX_WRITE_BUFFER_SIZE.getPreferredName()),
MAX_WRITE_BUFFER_SIZE,
ObjectParser.ValueType.STRING);
PARSER.declareField(
(request, value) -> request.followRequest.setMaxRetryDelay(value),
(p, c) -> TimeValue.parseTimeValue(p.text(), MAX_RETRY_DELAY_FIELD.getPreferredName()),
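The follow parsers now read max_write_buffer_size as a byte-size string rather than an integer. A small sketch of what the registered value parser does with a hypothetical input:

import org.elasticsearch.common.unit.ByteSizeValue;

ByteSizeValue parsed = ByteSizeValue.parseBytesSizeValue("512mb", "max_write_buffer_size");
assert parsed.getBytes() == 512L * 1024 * 1024; // the parsed byte count is what gets compared against the buffered operations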

View File

@ -48,6 +48,7 @@ public final class ResumeFollowAction extends Action<AcknowledgedResponse> {
static final ParseField MAX_CONCURRENT_READ_BATCHES = new ParseField("max_concurrent_read_batches");
static final ParseField MAX_BATCH_SIZE = new ParseField("max_batch_size");
static final ParseField MAX_CONCURRENT_WRITE_BATCHES = new ParseField("max_concurrent_write_batches");
static final ParseField MAX_WRITE_BUFFER_COUNT = new ParseField("max_write_buffer_count");
static final ParseField MAX_WRITE_BUFFER_SIZE = new ParseField("max_write_buffer_size");
static final ParseField MAX_RETRY_DELAY_FIELD = new ParseField("max_retry_delay");
static final ParseField POLL_TIMEOUT = new ParseField("poll_timeout");
@ -63,7 +64,12 @@ public final class ResumeFollowAction extends Action<AcknowledgedResponse> {
MAX_BATCH_SIZE,
ObjectParser.ValueType.STRING);
PARSER.declareInt(Request::setMaxConcurrentWriteBatches, MAX_CONCURRENT_WRITE_BATCHES);
PARSER.declareInt(Request::setMaxWriteBufferSize, MAX_WRITE_BUFFER_SIZE);
PARSER.declareInt(Request::setMaxWriteBufferCount, MAX_WRITE_BUFFER_COUNT);
PARSER.declareField(
Request::setMaxWriteBufferSize,
(p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), MAX_WRITE_BUFFER_SIZE.getPreferredName()),
MAX_WRITE_BUFFER_SIZE,
ObjectParser.ValueType.STRING);
PARSER.declareField(
Request::setMaxRetryDelay,
(p, c) -> TimeValue.parseTimeValue(p.text(), MAX_RETRY_DELAY_FIELD.getPreferredName()),
@ -140,13 +146,23 @@ public final class ResumeFollowAction extends Action<AcknowledgedResponse> {
this.maxConcurrentWriteBatches = maxConcurrentWriteBatches;
}
private Integer maxWriteBufferSize;
private Integer maxWriteBufferCount;
public Integer getMaxWriteBufferSize() {
public Integer getMaxWriteBufferCount() {
return maxWriteBufferCount;
}
public void setMaxWriteBufferCount(Integer maxWriteBufferCount) {
this.maxWriteBufferCount = maxWriteBufferCount;
}
private ByteSizeValue maxWriteBufferSize;
public ByteSizeValue getMaxWriteBufferSize() {
return maxWriteBufferSize;
}
public void setMaxWriteBufferSize(Integer maxWriteBufferSize) {
public void setMaxWriteBufferSize(ByteSizeValue maxWriteBufferSize) {
this.maxWriteBufferSize = maxWriteBufferSize;
}
@ -192,7 +208,10 @@ public final class ResumeFollowAction extends Action<AcknowledgedResponse> {
if (maxConcurrentWriteBatches != null && maxConcurrentWriteBatches < 1) {
e = addValidationError(MAX_CONCURRENT_WRITE_BATCHES.getPreferredName() + " must be larger than 0", e);
}
if (maxWriteBufferSize != null && maxWriteBufferSize < 1) {
if (maxWriteBufferCount != null && maxWriteBufferCount < 1) {
e = addValidationError(MAX_WRITE_BUFFER_COUNT.getPreferredName() + " must be larger than 0", e);
}
if (maxWriteBufferSize != null && maxWriteBufferSize.compareTo(ByteSizeValue.ZERO) <= 0) {
e = addValidationError(MAX_WRITE_BUFFER_SIZE.getPreferredName() + " must be larger than 0", e);
}
if (maxRetryDelay != null && maxRetryDelay.millis() <= 0) {
@ -217,7 +236,8 @@ public final class ResumeFollowAction extends Action<AcknowledgedResponse> {
maxConcurrentReadBatches = in.readOptionalVInt();
maxBatchSize = in.readOptionalWriteable(ByteSizeValue::new);
maxConcurrentWriteBatches = in.readOptionalVInt();
maxWriteBufferSize = in.readOptionalVInt();
maxWriteBufferCount = in.readOptionalVInt();
maxWriteBufferSize = in.readOptionalWriteable(ByteSizeValue::new);
maxRetryDelay = in.readOptionalTimeValue();
pollTimeout = in.readOptionalTimeValue();
}
@ -230,7 +250,8 @@ public final class ResumeFollowAction extends Action<AcknowledgedResponse> {
out.writeOptionalVInt(maxConcurrentReadBatches);
out.writeOptionalWriteable(maxBatchSize);
out.writeOptionalVInt(maxConcurrentWriteBatches);
out.writeOptionalVInt(maxWriteBufferSize);
out.writeOptionalVInt(maxWriteBufferCount);
out.writeOptionalWriteable(maxWriteBufferSize);
out.writeOptionalTimeValue(maxRetryDelay);
out.writeOptionalTimeValue(pollTimeout);
}
@ -253,8 +274,11 @@ public final class ResumeFollowAction extends Action<AcknowledgedResponse> {
if (maxBatchSize != null) {
builder.field(MAX_BATCH_SIZE.getPreferredName(), maxBatchSize.getStringRep());
}
if (maxWriteBufferCount != null) {
builder.field(MAX_WRITE_BUFFER_COUNT.getPreferredName(), maxWriteBufferCount);
}
if (maxWriteBufferSize != null) {
builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize);
builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize.getStringRep());
}
if (maxConcurrentReadBatches != null) {
builder.field(MAX_CONCURRENT_READ_BATCHES.getPreferredName(), maxConcurrentReadBatches);
@ -279,6 +303,7 @@ public final class ResumeFollowAction extends Action<AcknowledgedResponse> {
Objects.equals(maxConcurrentReadBatches, request.maxConcurrentReadBatches) &&
Objects.equals(maxBatchSize, request.maxBatchSize) &&
Objects.equals(maxConcurrentWriteBatches, request.maxConcurrentWriteBatches) &&
Objects.equals(maxWriteBufferCount, request.maxWriteBufferCount) &&
Objects.equals(maxWriteBufferSize, request.maxWriteBufferSize) &&
Objects.equals(maxRetryDelay, request.maxRetryDelay) &&
Objects.equals(pollTimeout, request.pollTimeout) &&
@ -293,6 +318,7 @@ public final class ResumeFollowAction extends Action<AcknowledgedResponse> {
maxConcurrentReadBatches,
maxBatchSize,
maxConcurrentWriteBatches,
maxWriteBufferCount,
maxWriteBufferSize,
maxRetryDelay,
pollTimeout);
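Validation now treats the two limits separately: the count must be larger than 0 and the size must be larger than 0 bytes. A minimal sketch of building and validating a resume-follow request; the index name and limit values are hypothetical and the follower-index setter is assumed to exist on the request.

import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;

ResumeFollowAction.Request request = new ResumeFollowAction.Request();
request.setFollowerIndex("follower-index");                             // assumed setter, hypothetical index name
request.setMaxWriteBufferCount(2048);                                   // must be larger than 0
request.setMaxWriteBufferSize(new ByteSizeValue(512, ByteSizeUnit.MB)); // must be larger than 0 bytes
ActionRequestValidationException validation = request.validate();      // null here: both limits pass the checks above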

View File

@ -965,6 +965,9 @@
"number_of_queued_writes": {
"type": "long"
},
"buffer_size_in_bytes": {
"type": "long"
},
"mapping_version": {
"type": "long"
},
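The mapping entry matches the raw field name emitted by the status; when a client asks for human-readable output, the same value is additionally rendered under buffer_size. A small sketch of that dual rendering with a hypothetical value (it relies on the ByteSizeValue transformer that Elasticsearch registers for XContentBuilder):

import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;

XContentBuilder builder = XContentFactory.jsonBuilder();
builder.humanReadable(true);
builder.startObject();
builder.humanReadableField("buffer_size_in_bytes", "buffer_size", new ByteSizeValue(262144));
builder.endObject();
// yields: {"buffer_size":"256kb","buffer_size_in_bytes":262144}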