From 6fe0e62b7a29242cd19ec54bac0364e0e04e4be5 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Wed, 24 Oct 2018 23:48:49 +0200 Subject: [PATCH] [CCR] Added write buffer size limit (#34797) This limit is based on the size in bytes of the operations in the write buffer. If this limit is exceeded then no more read operations will be coordinated until the size in bytes of the write buffer has dropped below the configured write buffer size limit. Renamed existing `max_write_buffer_size` to ``max_write_buffer_count` to indicate that limit is count based. Closes #34705 --- .../ccr/action/AutoFollowCoordinator.java | 1 + .../xpack/ccr/action/ShardFollowNodeTask.java | 18 ++- .../xpack/ccr/action/ShardFollowTask.java | 36 ++++-- .../TransportPutAutoFollowPatternAction.java | 1 + .../action/TransportResumeFollowAction.java | 13 ++- .../elasticsearch/xpack/CcrIntegTestCase.java | 16 +++ .../xpack/CcrSingleNodeTestCase.java | 15 +++ .../elasticsearch/xpack/ccr/AutoFollowIT.java | 8 +- .../xpack/ccr/AutoFollowMetadataTests.java | 1 + .../elasticsearch/xpack/ccr/CcrLicenseIT.java | 2 +- .../xpack/ccr/IndexFollowingIT.java | 2 +- .../xpack/ccr/LocalIndexFollowingIT.java | 1 + .../action/AutoFollowCoordinatorTests.java | 16 +-- .../GetAutoFollowPatternResponseTests.java | 1 + .../PutAutoFollowPatternRequestTests.java | 5 +- .../ResumeFollowActionRequestTests.java | 5 +- .../ShardFollowNodeTaskRandomTests.java | 3 + .../ShardFollowNodeTaskStatusTests.java | 1 + .../ccr/action/ShardFollowNodeTaskTests.java | 107 +++++++++++++----- .../ShardFollowTaskReplicationTests.java | 4 +- .../ccr/action/ShardFollowTaskTests.java | 1 + .../xpack/ccr/action/StatsResponsesTests.java | 1 + ...ortDeleteAutoFollowPatternActionTests.java | 6 +- ...nsportGetAutoFollowPatternActionTests.java | 8 +- ...nsportPutAutoFollowPatternActionTests.java | 2 +- .../action/TransportUnfollowActionTests.java | 3 + .../ccr/FollowStatsMonitoringDocTests.java | 6 +- .../xpack/core/ccr/AutoFollowMetadata.java | 41 +++++-- .../core/ccr/ShardFollowNodeTaskStatus.java | 25 +++- .../action/PutAutoFollowPatternAction.java | 35 ++++-- .../core/ccr/action/PutFollowAction.java | 8 +- .../core/ccr/action/ResumeFollowAction.java | 44 +++++-- .../src/main/resources/monitoring-es.json | 3 + 33 files changed, 342 insertions(+), 97 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java index a05dc0914e5..b5ba39ae7e2 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java @@ -328,6 +328,7 @@ public class AutoFollowCoordinator implements ClusterStateApplier { followRequest.setMaxConcurrentReadBatches(pattern.getMaxConcurrentReadBatches()); followRequest.setMaxBatchSize(pattern.getMaxBatchSize()); followRequest.setMaxConcurrentWriteBatches(pattern.getMaxConcurrentWriteBatches()); + followRequest.setMaxWriteBufferCount(pattern.getMaxWriteBufferCount()); followRequest.setMaxWriteBufferSize(pattern.getMaxWriteBufferSize()); followRequest.setMaxRetryDelay(pattern.getMaxRetryDelay()); followRequest.setPollTimeout(pattern.getPollTimeout()); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index 19843ac4efb..9788195c7e5 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -85,6 +85,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { private long numberOfOperationsIndexed = 0; private long lastFetchTime = -1; private final Queue buffer = new PriorityQueue<>(Comparator.comparing(Translog.Operation::seqNo)); + private long bufferSizeInBytes = 0; private final LinkedHashMap> fetchExceptions; private volatile ElasticsearchException fatalException; @@ -183,8 +184,12 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { params.getFollowShardId(), numConcurrentReads); return false; } - if (buffer.size() > params.getMaxWriteBufferSize()) { - LOGGER.trace("{} no new reads, buffer limit has been reached [{}]", params.getFollowShardId(), buffer.size()); + if (bufferSizeInBytes >= params.getMaxWriteBufferSize().getBytes()) { + LOGGER.trace("{} no new reads, buffer size limit has been reached [{}]", params.getFollowShardId(), bufferSizeInBytes); + return false; + } + if (buffer.size() > params.getMaxWriteBufferCount()) { + LOGGER.trace("{} no new reads, buffer count limit has been reached [{}]", params.getFollowShardId(), buffer.size()); return false; } return true; @@ -208,6 +213,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { break; } } + bufferSizeInBytes -= sumEstimatedSize; numConcurrentWrites++; LOGGER.trace("{}[{}] write [{}/{}] [{}]", params.getFollowShardId(), numConcurrentWrites, ops.get(0).seqNo(), ops.get(ops.size() - 1).seqNo(), ops.size()); @@ -281,7 +287,12 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { } else { assert response.getOperations()[0].seqNo() == from : "first operation is not what we asked for. From is [" + from + "], got " + response.getOperations()[0]; - buffer.addAll(Arrays.asList(response.getOperations())); + List operations = Arrays.asList(response.getOperations()); + long operationsSize = operations.stream() + .mapToLong(Translog.Operation::estimateSize) + .sum(); + buffer.addAll(operations); + bufferSizeInBytes += operationsSize; final long maxSeqNo = response.getOperations()[response.getOperations().length - 1].seqNo(); assert maxSeqNo == Arrays.stream(response.getOperations()).mapToLong(Translog.Operation::seqNo).max().getAsLong(); @@ -455,6 +466,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { numConcurrentReads, numConcurrentWrites, buffer.size(), + bufferSizeInBytes, currentMappingVersion, totalFetchTimeMillis, totalFetchTookTimeMillis, diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java index ea75ee2d9e1..13e3da77491 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java @@ -48,6 +48,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { public static final ParseField MAX_CONCURRENT_READ_BATCHES = new ParseField("max_concurrent_read_batches"); public static final ParseField MAX_BATCH_SIZE = new ParseField("max_batch_size"); public static final ParseField MAX_CONCURRENT_WRITE_BATCHES = new ParseField("max_concurrent_write_batches"); + public static final ParseField MAX_WRITE_BUFFER_COUNT = new ParseField("max_write_buffer_count"); public static final ParseField MAX_WRITE_BUFFER_SIZE = new ParseField("max_write_buffer_size"); public static final ParseField MAX_RETRY_DELAY = new ParseField("max_retry_delay"); public static final ParseField POLL_TIMEOUT = new ParseField("poll_timeout"); @@ -56,7 +57,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { private static ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, (a) -> new ShardFollowTask((String) a[0], new ShardId((String) a[1], (String) a[2], (int) a[3]), new ShardId((String) a[4], (String) a[5], (int) a[6]), (int) a[7], (int) a[8], (ByteSizeValue) a[9], - (int) a[10], (int) a[11], (TimeValue) a[12], (TimeValue) a[13], (Map) a[14])); + (int) a[10], (int) a[11], (ByteSizeValue) a[12], (TimeValue) a[13], (TimeValue) a[14], (Map) a[15])); static { PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), REMOTE_CLUSTER_FIELD); @@ -74,7 +75,12 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { MAX_BATCH_SIZE, ObjectParser.ValueType.STRING); PARSER.declareInt(ConstructingObjectParser.constructorArg(), MAX_CONCURRENT_WRITE_BATCHES); - PARSER.declareInt(ConstructingObjectParser.constructorArg(), MAX_WRITE_BUFFER_SIZE); + PARSER.declareInt(ConstructingObjectParser.constructorArg(), MAX_WRITE_BUFFER_COUNT); + PARSER.declareField( + ConstructingObjectParser.constructorArg(), + (p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), MAX_WRITE_BUFFER_SIZE.getPreferredName()), + MAX_WRITE_BUFFER_SIZE, + ObjectParser.ValueType.STRING); PARSER.declareField(ConstructingObjectParser.constructorArg(), (p, c) -> TimeValue.parseTimeValue(p.text(), MAX_RETRY_DELAY.getPreferredName()), MAX_RETRY_DELAY, ObjectParser.ValueType.STRING); @@ -91,7 +97,8 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { private final int maxConcurrentReadBatches; private final ByteSizeValue maxBatchSize; private final int maxConcurrentWriteBatches; - private final int maxWriteBufferSize; + private final int maxWriteBufferCount; + private final ByteSizeValue maxWriteBufferSize; private final TimeValue maxRetryDelay; private final TimeValue pollTimeout; private final Map headers; @@ -104,7 +111,8 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { final int maxConcurrentReadBatches, final ByteSizeValue maxBatchSize, final int maxConcurrentWriteBatches, - final int maxWriteBufferSize, + final int maxWriteBufferCount, + final ByteSizeValue maxWriteBufferSize, final TimeValue maxRetryDelay, final TimeValue pollTimeout, final Map headers) { @@ -115,6 +123,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { this.maxConcurrentReadBatches = maxConcurrentReadBatches; this.maxBatchSize = maxBatchSize; this.maxConcurrentWriteBatches = maxConcurrentWriteBatches; + this.maxWriteBufferCount = maxWriteBufferCount; this.maxWriteBufferSize = maxWriteBufferSize; this.maxRetryDelay = maxRetryDelay; this.pollTimeout = pollTimeout; @@ -129,7 +138,8 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { this.maxConcurrentReadBatches = in.readVInt(); this.maxBatchSize = new ByteSizeValue(in); this.maxConcurrentWriteBatches = in.readVInt(); - this.maxWriteBufferSize = in.readVInt(); + this.maxWriteBufferCount = in.readVInt(); + this.maxWriteBufferSize = new ByteSizeValue(in); this.maxRetryDelay = in.readTimeValue(); this.pollTimeout = in.readTimeValue(); this.headers = Collections.unmodifiableMap(in.readMap(StreamInput::readString, StreamInput::readString)); @@ -159,7 +169,11 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { return maxConcurrentWriteBatches; } - public int getMaxWriteBufferSize() { + public int getMaxWriteBufferCount() { + return maxWriteBufferCount; + } + + public ByteSizeValue getMaxWriteBufferSize() { return maxWriteBufferSize; } @@ -197,7 +211,8 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { out.writeVInt(maxConcurrentReadBatches); maxBatchSize.writeTo(out); out.writeVInt(maxConcurrentWriteBatches); - out.writeVInt(maxWriteBufferSize); + out.writeVInt(maxWriteBufferCount); + maxWriteBufferSize.writeTo(out); out.writeTimeValue(maxRetryDelay); out.writeTimeValue(pollTimeout); out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString); @@ -221,7 +236,8 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { builder.field(MAX_CONCURRENT_READ_BATCHES.getPreferredName(), maxConcurrentReadBatches); builder.field(MAX_BATCH_SIZE.getPreferredName(), maxBatchSize.getStringRep()); builder.field(MAX_CONCURRENT_WRITE_BATCHES.getPreferredName(), maxConcurrentWriteBatches); - builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize); + builder.field(MAX_WRITE_BUFFER_COUNT.getPreferredName(), maxWriteBufferCount); + builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize.getStringRep()); builder.field(MAX_RETRY_DELAY.getPreferredName(), maxRetryDelay.getStringRep()); builder.field(POLL_TIMEOUT.getPreferredName(), pollTimeout.getStringRep()); builder.field(HEADERS.getPreferredName(), headers); @@ -240,7 +256,8 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { maxConcurrentReadBatches == that.maxConcurrentReadBatches && maxConcurrentWriteBatches == that.maxConcurrentWriteBatches && maxBatchSize.equals(that.maxBatchSize) && - maxWriteBufferSize == that.maxWriteBufferSize && + maxWriteBufferCount == that.maxWriteBufferCount && + maxWriteBufferSize.equals(that.maxWriteBufferSize) && Objects.equals(maxRetryDelay, that.maxRetryDelay) && Objects.equals(pollTimeout, that.pollTimeout) && Objects.equals(headers, that.headers); @@ -256,6 +273,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { maxConcurrentReadBatches, maxConcurrentWriteBatches, maxBatchSize, + maxWriteBufferCount, maxWriteBufferSize, maxRetryDelay, pollTimeout, diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java index 8832275f9a9..79f1ed7a2ee 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java @@ -164,6 +164,7 @@ public class TransportPutAutoFollowPatternAction extends request.getMaxConcurrentReadBatches(), request.getMaxBatchSize(), request.getMaxConcurrentWriteBatches(), + request.getMaxWriteBufferCount(), request.getMaxWriteBufferSize(), request.getMaxRetryDelay(), request.getPollTimeout()); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java index 53ac116d38e..97905f92721 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java @@ -55,7 +55,8 @@ public class TransportResumeFollowAction extends HandledTransportAction { + FollowStatsAction.StatsResponses statsResponses = + leaderClient().execute(FollowStatsAction.INSTANCE, new FollowStatsAction.StatsRequest()).actionGet(); + for (FollowStatsAction.StatsResponse statsResponse : statsResponses.getStatsResponses()) { + ShardFollowNodeTaskStatus status = statsResponse.status(); + assertThat(status.numberOfQueuedWrites(), equalTo(0)); + assertThat(status.bufferSize(), equalTo(0L)); + } + }); + } + static void removeCCRRelatedMetadataFromClusterState(ClusterService clusterService) throws Exception { CountDownLatch latch = new CountDownLatch(1); clusterService.submitStateUpdateTask("remove-ccr-related-metadata", new ClusterStateUpdateTask() { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java index 169c009207c..611fb0c27fa 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java @@ -16,6 +16,8 @@ import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.ccr.LocalStateCcr; import org.elasticsearch.xpack.core.XPackSettings; +import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus; +import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction; import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction; import org.junit.After; @@ -26,6 +28,7 @@ import java.util.Collections; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.xpack.CcrIntegTestCase.removeCCRRelatedMetadataFromClusterState; +import static org.hamcrest.Matchers.equalTo; public abstract class CcrSingleNodeTestCase extends ESSingleNodeTestCase { @@ -80,4 +83,16 @@ public abstract class CcrSingleNodeTestCase extends ESSingleNodeTestCase { return request; } + protected void ensureEmptyWriteBuffers() throws Exception { + assertBusy(() -> { + FollowStatsAction.StatsResponses statsResponses = + client().execute(FollowStatsAction.INSTANCE, new FollowStatsAction.StatsRequest()).actionGet(); + for (FollowStatsAction.StatsResponse statsResponse : statsResponses.getStatsResponses()) { + ShardFollowNodeTaskStatus status = statsResponse.status(); + assertThat(status.numberOfQueuedWrites(), equalTo(0)); + assertThat(status.bufferSize(), equalTo(0L)); + } + }); + } + } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java index 8b3de8f5f8d..50e6008d91d 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java @@ -126,7 +126,7 @@ public class AutoFollowIT extends CcrIntegTestCase { // Need to set this, because following an index in the same cluster request.setFollowIndexNamePattern("copy-{{leader_index}}"); if (randomBoolean()) { - request.setMaxWriteBufferSize(randomIntBetween(0, Integer.MAX_VALUE)); + request.setMaxWriteBufferCount(randomIntBetween(0, Integer.MAX_VALUE)); } if (randomBoolean()) { request.setMaxConcurrentReadBatches(randomIntBetween(0, Integer.MAX_VALUE)); @@ -137,6 +137,9 @@ public class AutoFollowIT extends CcrIntegTestCase { if (randomBoolean()) { request.setMaxBatchOperationCount(randomIntBetween(0, Integer.MAX_VALUE)); } + if (randomBoolean()) { + request.setMaxWriteBufferSize(new ByteSizeValue(randomNonNegativeLong())); + } if (randomBoolean()) { request.setMaxBatchSize(new ByteSizeValue(randomNonNegativeLong(), ByteSizeUnit.BYTES)); } @@ -157,6 +160,9 @@ public class AutoFollowIT extends CcrIntegTestCase { ShardFollowTask shardFollowTask = (ShardFollowTask) persistentTasksMetaData.tasks().iterator().next().getParams(); assertThat(shardFollowTask.getLeaderShardId().getIndexName(), equalTo("logs-201901")); assertThat(shardFollowTask.getFollowShardId().getIndexName(), equalTo("copy-logs-201901")); + if (request.getMaxWriteBufferCount() != null) { + assertThat(shardFollowTask.getMaxWriteBufferCount(), equalTo(request.getMaxWriteBufferCount())); + } if (request.getMaxWriteBufferSize() != null) { assertThat(shardFollowTask.getMaxWriteBufferSize(), equalTo(request.getMaxWriteBufferSize())); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowMetadataTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowMetadataTests.java index 67071bd1be5..0ca175cef82 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowMetadataTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowMetadataTests.java @@ -49,6 +49,7 @@ public class AutoFollowMetadataTests extends AbstractSerializingTestCase patterns = new HashMap<>(); patterns.put("remote", autoFollowPattern); Map> followedLeaderIndexUUIDS = new HashMap<>(); @@ -121,7 +121,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { when(client.getRemoteClusterClient(anyString())).thenReturn(client); AutoFollowPattern autoFollowPattern = - new AutoFollowPattern("remote", Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null); + new AutoFollowPattern("remote", Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null, null); Map patterns = new HashMap<>(); patterns.put("remote", autoFollowPattern); Map> followedLeaderIndexUUIDS = new HashMap<>(); @@ -179,7 +179,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { .build(); AutoFollowPattern autoFollowPattern = - new AutoFollowPattern("remote", Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null); + new AutoFollowPattern("remote", Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null, null); Map patterns = new HashMap<>(); patterns.put("remote", autoFollowPattern); Map> followedLeaderIndexUUIDS = new HashMap<>(); @@ -242,7 +242,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { .build(); AutoFollowPattern autoFollowPattern = - new AutoFollowPattern("remote", Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null); + new AutoFollowPattern("remote", Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null, null); Map patterns = new HashMap<>(); patterns.put("remote", autoFollowPattern); Map> followedLeaderIndexUUIDS = new HashMap<>(); @@ -296,7 +296,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { public void testGetLeaderIndicesToFollow() { AutoFollowPattern autoFollowPattern = - new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), null, null, null, null, null, null, null, null); + new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), null, null, null, null, null, null, null, null, null); Map> headers = new HashMap<>(); ClusterState followerState = ClusterState.builder(new ClusterName("remote")) .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, @@ -341,15 +341,15 @@ public class AutoFollowCoordinatorTests extends ESTestCase { public void testGetFollowerIndexName() { AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), null, null, - null, null, null, null, null, null); + null, null, null, null, null, null, null); assertThat(AutoFollower.getFollowerIndexName(autoFollowPattern, "metrics-0"), equalTo("metrics-0")); autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), "eu-metrics-0", null, null, - null, null, null, null, null); + null, null, null, null, null, null); assertThat(AutoFollower.getFollowerIndexName(autoFollowPattern, "metrics-0"), equalTo("eu-metrics-0")); autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), "eu-{{leader_index}}", null, - null, null, null, null, null, null); + null, null, null, null, null, null, null); assertThat(AutoFollower.getFollowerIndexName(autoFollowPattern, "metrics-0"), equalTo("eu-metrics-0")); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/GetAutoFollowPatternResponseTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/GetAutoFollowPatternResponseTests.java index e67509f7ee8..301dabeef89 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/GetAutoFollowPatternResponseTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/GetAutoFollowPatternResponseTests.java @@ -37,6 +37,7 @@ public class GetAutoFollowPatternResponseTests extends AbstractStreamableTestCas new ByteSizeValue(randomNonNegativeLong(), ByteSizeUnit.BYTES), randomIntBetween(0, Integer.MAX_VALUE), randomIntBetween(0, Integer.MAX_VALUE), + new ByteSizeValue(randomNonNegativeLong()), TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(500)); patterns.put(randomAlphaOfLength(4), autoFollowPattern); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/PutAutoFollowPatternRequestTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/PutAutoFollowPatternRequestTests.java index 67957d1e366..e4e365312ad 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/PutAutoFollowPatternRequestTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/PutAutoFollowPatternRequestTests.java @@ -66,7 +66,10 @@ public class PutAutoFollowPatternRequestTests extends AbstractStreamableXContent request.setMaxBatchSize(new ByteSizeValue(randomNonNegativeLong(), ByteSizeUnit.BYTES)); } if (randomBoolean()) { - request.setMaxWriteBufferSize(randomIntBetween(0, Integer.MAX_VALUE)); + request.setMaxWriteBufferCount(randomIntBetween(0, Integer.MAX_VALUE)); + } + if (randomBoolean()) { + request.setMaxWriteBufferSize(new ByteSizeValue(randomNonNegativeLong())); } return request; } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ResumeFollowActionRequestTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ResumeFollowActionRequestTests.java index 55c0c79e5b2..122082537fd 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ResumeFollowActionRequestTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ResumeFollowActionRequestTests.java @@ -57,7 +57,10 @@ public class ResumeFollowActionRequestTests extends AbstractStreamableXContentTe request.setMaxBatchSize(new ByteSizeValue(randomNonNegativeLong(), ByteSizeUnit.BYTES)); } if (randomBoolean()) { - request.setMaxWriteBufferSize(randomIntBetween(1, Integer.MAX_VALUE)); + request.setMaxWriteBufferCount(randomIntBetween(1, Integer.MAX_VALUE)); + } + if (randomBoolean()) { + request.setMaxWriteBufferSize(new ByteSizeValue(randomNonNegativeLong(), ByteSizeUnit.BYTES)); } if (randomBoolean()) { request.setMaxRetryDelay(TimeValue.timeValueMillis(500)); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java index 50c0dd9ca49..8576bc28905 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java @@ -7,6 +7,8 @@ package org.elasticsearch.xpack.ccr.action; import org.elasticsearch.action.UnavailableShardsException; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.seqno.LocalCheckpointTracker; import org.elasticsearch.index.shard.ShardId; @@ -83,6 +85,7 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase { TransportResumeFollowAction.DEFAULT_MAX_BATCH_SIZE, concurrency, 10240, + new ByteSizeValue(512, ByteSizeUnit.MB), TimeValue.timeValueMillis(10), TimeValue.timeValueMillis(10), Collections.emptyMap() diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java index a3881a6728f..93d9556d0e4 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java @@ -57,6 +57,7 @@ public class ShardFollowNodeTaskStatusTests extends AbstractSerializingTestCase< randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), + randomNonNegativeLong(), randomReadExceptions(), randomLong(), randomBoolean() ? new ElasticsearchException("fatal error") : null); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java index 1988513c95d..a1582d4c2f1 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java @@ -63,7 +63,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase { private Queue responseSizes; public void testCoordinateReads() { - ShardFollowNodeTask task = createShardFollowTask(8, between(8, 20), between(1, 20), Integer.MAX_VALUE, Long.MAX_VALUE); + ShardFollowNodeTask task = createShardFollowTask(8, between(8, 20), between(1, 20), Integer.MAX_VALUE, + new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE); startTask(task, 3, -1); task.coordinateReads(); assertThat(shardChangesRequests, contains(new long[]{0L, 8L})); // treat this a peak request @@ -77,9 +78,10 @@ public class ShardFollowNodeTaskTests extends ESTestCase { assertThat(status.lastRequestedSeqNo(), equalTo(60L)); } - public void testWriteBuffer() { + public void testMaxWriteBufferCount() { // Need to set concurrentWrites to 0, other the write buffer gets flushed immediately: - ShardFollowNodeTask task = createShardFollowTask(64, 1, 0, 32, Long.MAX_VALUE); + ShardFollowNodeTask task = + createShardFollowTask(64, 1, 0, 32, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE); startTask(task, 63, -1); task.coordinateReads(); @@ -90,7 +92,30 @@ public class ShardFollowNodeTaskTests extends ESTestCase { shardChangesRequests.clear(); // Also invokes the coordinatesReads() method: task.innerHandleReadResponse(0L, 63L, generateShardChangesResponse(0, 63, 0L, 128L)); - assertThat(shardChangesRequests.size(), equalTo(0)); // no more reads, because write buffer is full + assertThat(shardChangesRequests.size(), equalTo(0)); // no more reads, because write buffer count limit has been reached + + ShardFollowNodeTaskStatus status = task.getStatus(); + assertThat(status.numberOfConcurrentReads(), equalTo(0)); + assertThat(status.numberOfConcurrentWrites(), equalTo(0)); + assertThat(status.lastRequestedSeqNo(), equalTo(63L)); + assertThat(status.leaderGlobalCheckpoint(), equalTo(128L)); + } + + public void testMaxWriteBufferSize() { + // Need to set concurrentWrites to 0, other the write buffer gets flushed immediately: + ShardFollowNodeTask task = + createShardFollowTask(64, 1, 0, Integer.MAX_VALUE, new ByteSizeValue(1, ByteSizeUnit.KB), Long.MAX_VALUE); + startTask(task, 63, -1); + + task.coordinateReads(); + assertThat(shardChangesRequests.size(), equalTo(1)); + assertThat(shardChangesRequests.get(0)[0], equalTo(0L)); + assertThat(shardChangesRequests.get(0)[1], equalTo(64L)); + + shardChangesRequests.clear(); + // Also invokes the coordinatesReads() method: + task.innerHandleReadResponse(0L, 63L, generateShardChangesResponse(0, 63, 0L, 128L)); + assertThat(shardChangesRequests.size(), equalTo(0)); // no more reads, because write buffer size limit has been reached ShardFollowNodeTaskStatus status = task.getStatus(); assertThat(status.numberOfConcurrentReads(), equalTo(0)); @@ -100,7 +125,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase { } public void testMaxConcurrentReads() { - ShardFollowNodeTask task = createShardFollowTask(8, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); + ShardFollowNodeTask task = + createShardFollowTask(8, 1, 1, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE); startTask(task, 64, -1); task.coordinateReads(); @@ -114,7 +140,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase { } public void testTaskCancelled() { - ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); + ShardFollowNodeTask task = + createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE); startTask(task, 64, -1); task.coordinateReads(); @@ -131,7 +158,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase { } public void testTaskCancelledAfterReadLimitHasBeenReached() { - ShardFollowNodeTask task = createShardFollowTask(16, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); + ShardFollowNodeTask task = + createShardFollowTask(16, 1, 1, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE); startTask(task, 31, -1); task.coordinateReads(); @@ -155,7 +183,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase { } public void testTaskCancelledAfterWriteBufferLimitHasBeenReached() { - ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, 32, Long.MAX_VALUE); + ShardFollowNodeTask task = + createShardFollowTask(64, 1, 1, 32, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE); startTask(task, 64, -1); task.coordinateReads(); @@ -179,7 +208,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase { } public void testReceiveRetryableError() { - ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); + ShardFollowNodeTask task = + createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE); startTask(task, 63, -1); int max = randomIntBetween(1, 30); @@ -229,7 +259,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase { } public void testEmptyShardChangesResponseShouldClearFetchException() { - ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); + ShardFollowNodeTask task = + createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE); startTask(task, -1, -1); readFailures.add(new ShardNotFoundException(new ShardId("leader_index", "", 0))); @@ -258,7 +289,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase { } public void testReceiveTimeout() { - final ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); + final ShardFollowNodeTask task = + createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE); startTask(task, 63, -1); final int numberOfTimeouts = randomIntBetween(1, 32); @@ -322,7 +354,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase { } public void testReceiveNonRetryableError() { - ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); + ShardFollowNodeTask task = + createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE); startTask(task, 63, -1); Exception failure = new RuntimeException("replication failed"); @@ -362,7 +395,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase { } public void testHandleReadResponse() { - ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); + ShardFollowNodeTask task = + createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE); startTask(task, 63, -1); task.coordinateReads(); @@ -383,7 +417,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase { } public void testReceiveLessThanRequested() { - ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); + ShardFollowNodeTask task = + createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE); startTask(task, 63, -1); task.coordinateReads(); @@ -407,7 +442,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase { } public void testCancelAndReceiveLessThanRequested() { - ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); + ShardFollowNodeTask task = + createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE); startTask(task, 63, -1); task.coordinateReads(); @@ -430,7 +466,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase { } public void testReceiveNothingExpectedSomething() { - ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); + ShardFollowNodeTask task = + createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE); startTask(task, 63, -1); task.coordinateReads(); @@ -453,7 +490,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase { } public void testMappingUpdate() { - ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); + ShardFollowNodeTask task = + createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE); startTask(task, 63, -1); mappingVersions.add(1L); @@ -474,7 +512,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase { } public void testMappingUpdateRetryableError() { - ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); + ShardFollowNodeTask task = + createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE); startTask(task, 63, -1); int max = randomIntBetween(1, 30); @@ -499,7 +538,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase { } public void testMappingUpdateNonRetryableError() { - ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); + ShardFollowNodeTask task = + createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE); startTask(task, 63, -1); mappingUpdateFailures.add(new RuntimeException()); @@ -518,7 +558,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase { } public void testCoordinateWrites() { - ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); + ShardFollowNodeTask task = + createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE); startTask(task, 63, -1); task.coordinateReads(); @@ -542,7 +583,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase { } public void testMaxConcurrentWrites() { - ShardFollowNodeTask task = createShardFollowTask(64, 1, 2, Integer.MAX_VALUE, Long.MAX_VALUE); + ShardFollowNodeTask task = + createShardFollowTask(64, 1, 2, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE); ShardChangesAction.Response response = generateShardChangesResponse(0, 256, 0L, 256L); // Also invokes coordinatesWrites() task.innerHandleReadResponse(0L, 64L, response); @@ -554,7 +596,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase { ShardFollowNodeTaskStatus status = task.getStatus(); assertThat(status.numberOfConcurrentWrites(), equalTo(2)); - task = createShardFollowTask(64, 1, 4, Integer.MAX_VALUE, Long.MAX_VALUE); + task = createShardFollowTask(64, 1, 4, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE); response = generateShardChangesResponse(0, 256, 0L, 256L); // Also invokes coordinatesWrites() task.innerHandleReadResponse(0L, 64L, response); @@ -570,7 +612,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase { } public void testMaxBatchOperationCount() { - ShardFollowNodeTask task = createShardFollowTask(8, 1, 32, Integer.MAX_VALUE, Long.MAX_VALUE); + ShardFollowNodeTask task = + createShardFollowTask(8, 1, 32, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE); ShardChangesAction.Response response = generateShardChangesResponse(0, 256, 0L, 256L); // Also invokes coordinatesWrites() task.innerHandleReadResponse(0L, 64L, response); @@ -586,7 +629,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase { } public void testRetryableError() { - ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); + ShardFollowNodeTask task = + createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE); startTask(task, 63, -1); task.coordinateReads(); @@ -614,7 +658,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase { } public void testNonRetryableError() { - ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); + ShardFollowNodeTask task = + createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE); startTask(task, 63, -1); task.coordinateReads(); @@ -636,7 +681,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase { } public void testMaxBatchBytesLimit() { - ShardFollowNodeTask task = createShardFollowTask(64, 1, 128, Integer.MAX_VALUE, 1L); + ShardFollowNodeTask task = + createShardFollowTask(64, 1, 128, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), 1L); startTask(task, 64, -1); task.coordinateReads(); @@ -652,7 +698,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase { } public void testHandleWriteResponse() { - ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); + ShardFollowNodeTask task = + createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE); startTask(task, 63, -1); task.coordinateReads(); @@ -698,7 +745,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase { private ShardFollowNodeTask createShardFollowTask(int maxBatchOperationCount, int maxConcurrentReadBatches, int maxConcurrentWriteBatches, - int bufferWriteLimit, + int maxWriteBufferCount, + ByteSizeValue maxWriteBufferSize, long maxBatchSizeInBytes) { AtomicBoolean stopped = new AtomicBoolean(false); ShardFollowTask params = new ShardFollowTask( @@ -709,7 +757,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase { maxConcurrentReadBatches, new ByteSizeValue(maxBatchSizeInBytes, ByteSizeUnit.BYTES), maxConcurrentWriteBatches, - bufferWriteLimit, + maxWriteBufferCount, + maxWriteBufferSize, TimeValue.ZERO, TimeValue.ZERO, Collections.emptyMap() diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java index 96bc2f04f59..07c3121eba4 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java @@ -370,7 +370,9 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest between(1, 64), between(1, 8), new ByteSizeValue(Long.MAX_VALUE, ByteSizeUnit.BYTES), - between(1, 4), 10240, + between(1, 4), + 10240, + new ByteSizeValue(512, ByteSizeUnit.MB), TimeValue.timeValueMillis(10), TimeValue.timeValueMillis(10), Collections.emptyMap() diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskTests.java index 865d18e6067..e955f77d733 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskTests.java @@ -34,6 +34,7 @@ public class ShardFollowTaskTests extends AbstractSerializingTestCase existingPatterns = new ArrayList<>(); existingPatterns.add("transactions-*"); existingAutoFollowPatterns.put("name1", - new AutoFollowPattern("eu_cluster", existingPatterns, null, null, null, null, null, null, null, null)); + new AutoFollowPattern("eu_cluster", existingPatterns, null, null, null, null, null, null, null, null, null)); List existingUUIDS = new ArrayList<>(); existingUUIDS.add("_val"); @@ -44,7 +44,7 @@ public class TransportDeleteAutoFollowPatternActionTests extends ESTestCase { List existingPatterns = new ArrayList<>(); existingPatterns.add("logs-*"); existingAutoFollowPatterns.put("name2", - new AutoFollowPattern("asia_cluster", existingPatterns, null, null, null, null, null, null, null, null)); + new AutoFollowPattern("asia_cluster", existingPatterns, null, null, null, null, null, null, null, null, null)); List existingUUIDS = new ArrayList<>(); existingUUIDS.add("_val"); @@ -78,7 +78,7 @@ public class TransportDeleteAutoFollowPatternActionTests extends ESTestCase { List existingPatterns = new ArrayList<>(); existingPatterns.add("transactions-*"); existingAutoFollowPatterns.put("name1", - new AutoFollowPattern("eu_cluster", existingPatterns, null, null, null, null, null, null, null, null)); + new AutoFollowPattern("eu_cluster", existingPatterns, null, null, null, null, null, null, null, null, null)); existingHeaders.put("key", Collections.singletonMap("key", "val")); } ClusterState clusterState = ClusterState.builder(new ClusterName("us_cluster")) diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportGetAutoFollowPatternActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportGetAutoFollowPatternActionTests.java index ffc2d115091..128474bbc30 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportGetAutoFollowPatternActionTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportGetAutoFollowPatternActionTests.java @@ -23,10 +23,10 @@ public class TransportGetAutoFollowPatternActionTests extends ESTestCase { public void testGetAutoFollowPattern() { Map patterns = new HashMap<>(); - patterns.put("name1", - new AutoFollowPattern("test_alias1", Collections.singletonList("index-*"), null, null, null, null, null, null, null, null)); - patterns.put("name2", - new AutoFollowPattern("test_alias1", Collections.singletonList("index-*"), null, null, null, null, null, null, null, null)); + patterns.put("name1", new AutoFollowPattern( + "test_alias1", Collections.singletonList("index-*"), null, null, null, null, null, null, null, null, null)); + patterns.put("name2", new AutoFollowPattern( + "test_alias1", Collections.singletonList("index-*"), null, null, null, null, null, null, null, null, null)); MetaData metaData = MetaData.builder() .putCustom(AutoFollowMetadata.TYPE, new AutoFollowMetadata(patterns, Collections.emptyMap(), Collections.emptyMap())) .build(); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternActionTests.java index 6d4ef138fb4..433ef402af8 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternActionTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternActionTests.java @@ -103,7 +103,7 @@ public class TransportPutAutoFollowPatternActionTests extends ESTestCase { List existingPatterns = new ArrayList<>(); existingPatterns.add("transactions-*"); existingAutoFollowPatterns.put("name1", - new AutoFollowPattern("eu_cluster", existingPatterns, null, null, null, null, null, null, null, null)); + new AutoFollowPattern("eu_cluster", existingPatterns, null, null, null, null, null, null, null, null, null)); Map> existingAlreadyFollowedIndexUUIDS = new HashMap<>(); List existingUUIDS = new ArrayList<>(); existingUUIDS.add("_val"); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowActionTests.java index 07b0fc078ac..82cbe2622b7 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowActionTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowActionTests.java @@ -11,6 +11,8 @@ import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; @@ -81,6 +83,7 @@ public class TransportUnfollowActionTests extends ESTestCase { TransportResumeFollowAction.DEFAULT_MAX_BATCH_SIZE, 1, 10240, + new ByteSizeValue(512, ByteSizeUnit.MB), TimeValue.timeValueMillis(10), TimeValue.timeValueMillis(10), Collections.emptyMap() diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/FollowStatsMonitoringDocTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/FollowStatsMonitoringDocTests.java index 4b36005de36..9b4ed7c8a97 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/FollowStatsMonitoringDocTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/FollowStatsMonitoringDocTests.java @@ -92,6 +92,7 @@ public class FollowStatsMonitoringDocTests extends BaseMonitoringDocTestCase entry : serializedStatus.entrySet()) { String fieldName = entry.getKey(); Map fieldMapping = (Map) followStatsMapping.get(fieldName); - assertThat(fieldMapping, notNullValue()); + assertThat("no field mapping for field [" + fieldName + "]", fieldMapping, notNullValue()); Object fieldValue = entry.getValue(); String fieldType = (String) fieldMapping.get("type"); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadata.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadata.java index 5234151010c..8172612b78f 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadata.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadata.java @@ -182,6 +182,7 @@ public class AutoFollowMetadata extends AbstractNamedDiffable i public static final ParseField MAX_CONCURRENT_READ_BATCHES = new ParseField("max_concurrent_read_batches"); public static final ParseField MAX_BATCH_SIZE = new ParseField("max_batch_size"); public static final ParseField MAX_CONCURRENT_WRITE_BATCHES = new ParseField("max_concurrent_write_batches"); + public static final ParseField MAX_WRITE_BUFFER_COUNT = new ParseField("max_write_buffer_count"); public static final ParseField MAX_WRITE_BUFFER_SIZE = new ParseField("max_write_buffer_size"); public static final ParseField MAX_RETRY_DELAY = new ParseField("max_retry_delay"); public static final ParseField POLL_TIMEOUT = new ParseField("poll_timeout"); @@ -190,8 +191,8 @@ public class AutoFollowMetadata extends AbstractNamedDiffable i private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>("auto_follow_pattern", args -> new AutoFollowPattern((String) args[0], (List) args[1], (String) args[2], (Integer) args[3], - (Integer) args[4], (ByteSizeValue) args[5], (Integer) args[6], (Integer) args[7], (TimeValue) args[8], - (TimeValue) args[9])); + (Integer) args[4], (ByteSizeValue) args[5], (Integer) args[6], (Integer) args[7], (ByteSizeValue) args[8], + (TimeValue) args[9], (TimeValue) args[10])); static { PARSER.declareString(ConstructingObjectParser.constructorArg(), REMOTE_CLUSTER_FIELD); @@ -205,7 +206,12 @@ public class AutoFollowMetadata extends AbstractNamedDiffable i MAX_BATCH_SIZE, ObjectParser.ValueType.STRING); PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), MAX_CONCURRENT_WRITE_BATCHES); - PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), MAX_WRITE_BUFFER_SIZE); + PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), MAX_WRITE_BUFFER_COUNT); + PARSER.declareField( + ConstructingObjectParser.optionalConstructorArg(), + (p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), MAX_WRITE_BUFFER_SIZE.getPreferredName()), + MAX_WRITE_BUFFER_SIZE, + ObjectParser.ValueType.STRING); PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> TimeValue.parseTimeValue(p.text(), MAX_RETRY_DELAY.getPreferredName()), MAX_RETRY_DELAY, ObjectParser.ValueType.STRING); @@ -221,7 +227,8 @@ public class AutoFollowMetadata extends AbstractNamedDiffable i private final Integer maxConcurrentReadBatches; private final ByteSizeValue maxBatchSize; private final Integer maxConcurrentWriteBatches; - private final Integer maxWriteBufferSize; + private final Integer maxWriteBufferCount; + private final ByteSizeValue maxWriteBufferSize; private final TimeValue maxRetryDelay; private final TimeValue pollTimeout; @@ -232,8 +239,8 @@ public class AutoFollowMetadata extends AbstractNamedDiffable i Integer maxConcurrentReadBatches, ByteSizeValue maxBatchSize, Integer maxConcurrentWriteBatches, - Integer maxWriteBufferSize, - TimeValue maxRetryDelay, + Integer maxWriteBufferCount, + ByteSizeValue maxWriteBufferSize, TimeValue maxRetryDelay, TimeValue pollTimeout) { this.remoteCluster = remoteCluster; this.leaderIndexPatterns = leaderIndexPatterns; @@ -242,6 +249,7 @@ public class AutoFollowMetadata extends AbstractNamedDiffable i this.maxConcurrentReadBatches = maxConcurrentReadBatches; this.maxBatchSize = maxBatchSize; this.maxConcurrentWriteBatches = maxConcurrentWriteBatches; + this.maxWriteBufferCount = maxWriteBufferCount; this.maxWriteBufferSize = maxWriteBufferSize; this.maxRetryDelay = maxRetryDelay; this.pollTimeout = pollTimeout; @@ -255,7 +263,8 @@ public class AutoFollowMetadata extends AbstractNamedDiffable i maxConcurrentReadBatches = in.readOptionalVInt(); maxBatchSize = in.readOptionalWriteable(ByteSizeValue::new); maxConcurrentWriteBatches = in.readOptionalVInt(); - maxWriteBufferSize = in.readOptionalVInt(); + maxWriteBufferCount = in.readOptionalVInt(); + maxWriteBufferSize = in.readOptionalWriteable(ByteSizeValue::new); maxRetryDelay = in.readOptionalTimeValue(); pollTimeout = in.readOptionalTimeValue(); } @@ -296,7 +305,11 @@ public class AutoFollowMetadata extends AbstractNamedDiffable i return maxConcurrentWriteBatches; } - public Integer getMaxWriteBufferSize() { + public Integer getMaxWriteBufferCount() { + return maxWriteBufferCount; + } + + public ByteSizeValue getMaxWriteBufferSize() { return maxWriteBufferSize; } @@ -317,7 +330,8 @@ public class AutoFollowMetadata extends AbstractNamedDiffable i out.writeOptionalVInt(maxConcurrentReadBatches); out.writeOptionalWriteable(maxBatchSize); out.writeOptionalVInt(maxConcurrentWriteBatches); - out.writeOptionalVInt(maxWriteBufferSize); + out.writeOptionalVInt(maxWriteBufferCount); + out.writeOptionalWriteable(maxWriteBufferSize); out.writeOptionalTimeValue(maxRetryDelay); out.writeOptionalTimeValue(pollTimeout); } @@ -341,8 +355,11 @@ public class AutoFollowMetadata extends AbstractNamedDiffable i if (maxConcurrentWriteBatches != null) { builder.field(MAX_CONCURRENT_WRITE_BATCHES.getPreferredName(), maxConcurrentWriteBatches); } - if (maxWriteBufferSize != null){ - builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize); + if (maxWriteBufferCount != null){ + builder.field(MAX_WRITE_BUFFER_COUNT.getPreferredName(), maxWriteBufferCount); + } + if (maxWriteBufferSize != null) { + builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize.getStringRep()); } if (maxRetryDelay != null) { builder.field(MAX_RETRY_DELAY.getPreferredName(), maxRetryDelay); @@ -370,6 +387,7 @@ public class AutoFollowMetadata extends AbstractNamedDiffable i Objects.equals(maxConcurrentReadBatches, that.maxConcurrentReadBatches) && Objects.equals(maxBatchSize, that.maxBatchSize) && Objects.equals(maxConcurrentWriteBatches, that.maxConcurrentWriteBatches) && + Objects.equals(maxWriteBufferCount, that.maxWriteBufferCount) && Objects.equals(maxWriteBufferSize, that.maxWriteBufferSize) && Objects.equals(maxRetryDelay, that.maxRetryDelay) && Objects.equals(pollTimeout, that.pollTimeout); @@ -385,6 +403,7 @@ public class AutoFollowMetadata extends AbstractNamedDiffable i maxConcurrentReadBatches, maxBatchSize, maxConcurrentWriteBatches, + maxWriteBufferCount, maxWriteBufferSize, maxRetryDelay, pollTimeout); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/ShardFollowNodeTaskStatus.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/ShardFollowNodeTaskStatus.java index 5869c78bc73..e2e907f80d1 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/ShardFollowNodeTaskStatus.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/ShardFollowNodeTaskStatus.java @@ -46,6 +46,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status { private static final ParseField NUMBER_OF_CONCURRENT_READS_FIELD = new ParseField("number_of_concurrent_reads"); private static final ParseField NUMBER_OF_CONCURRENT_WRITES_FIELD = new ParseField("number_of_concurrent_writes"); private static final ParseField NUMBER_OF_QUEUED_WRITES_FIELD = new ParseField("number_of_queued_writes"); + private static final ParseField BUFFER_SIZE_IN_BYTES_FIELD = new ParseField("buffer_size_in_bytes"); private static final ParseField MAPPING_VERSION_FIELD = new ParseField("mapping_version"); private static final ParseField TOTAL_FETCH_TIME_MILLIS_FIELD = new ParseField("total_fetch_time_millis"); private static final ParseField TOTAL_FETCH_REMOTE_TIME_MILLIS_FIELD = new ParseField("total_fetch_remote_time_millis"); @@ -89,12 +90,13 @@ public class ShardFollowNodeTaskStatus implements Task.Status { (long) args[20], (long) args[21], (long) args[22], + (long) args[23], new TreeMap<>( - ((List>>) args[23]) + ((List>>) args[24]) .stream() .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))), - (long) args[24], - (ElasticsearchException) args[25])); + (long) args[25], + (ElasticsearchException) args[26])); public static final String FETCH_EXCEPTIONS_ENTRY_PARSER_NAME = "shard-follow-node-task-status-fetch-exceptions-entry"; @@ -116,6 +118,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status { STATUS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_CONCURRENT_READS_FIELD); STATUS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_CONCURRENT_WRITES_FIELD); STATUS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_QUEUED_WRITES_FIELD); + STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), BUFFER_SIZE_IN_BYTES_FIELD); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), MAPPING_VERSION_FIELD); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_FETCH_TIME_MILLIS_FIELD); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_FETCH_REMOTE_TIME_MILLIS_FIELD); @@ -219,6 +222,12 @@ public class ShardFollowNodeTaskStatus implements Task.Status { return numberOfQueuedWrites; } + private final long bufferSize; + + public long bufferSize() { + return bufferSize; + } + private final long mappingVersion; public long mappingVersion() { @@ -316,6 +325,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status { final int numberOfConcurrentReads, final int numberOfConcurrentWrites, final int numberOfQueuedWrites, + final long bufferSize, final long mappingVersion, final long totalFetchTimeMillis, final long totalFetchRemoteTimeMillis, @@ -342,6 +352,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status { this.numberOfConcurrentReads = numberOfConcurrentReads; this.numberOfConcurrentWrites = numberOfConcurrentWrites; this.numberOfQueuedWrites = numberOfQueuedWrites; + this.bufferSize = bufferSize; this.mappingVersion = mappingVersion; this.totalFetchTimeMillis = totalFetchTimeMillis; this.totalFetchRemoteTimeMillis = totalFetchRemoteTimeMillis; @@ -371,6 +382,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status { this.numberOfConcurrentReads = in.readVInt(); this.numberOfConcurrentWrites = in.readVInt(); this.numberOfQueuedWrites = in.readVInt(); + this.bufferSize = in.readVLong(); this.mappingVersion = in.readVLong(); this.totalFetchTimeMillis = in.readVLong(); this.totalFetchRemoteTimeMillis = in.readVLong(); @@ -407,6 +419,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status { out.writeVInt(numberOfConcurrentReads); out.writeVInt(numberOfConcurrentWrites); out.writeVInt(numberOfQueuedWrites); + out.writeVLong(bufferSize); out.writeVLong(mappingVersion); out.writeVLong(totalFetchTimeMillis); out.writeVLong(totalFetchRemoteTimeMillis); @@ -452,6 +465,10 @@ public class ShardFollowNodeTaskStatus implements Task.Status { builder.field(NUMBER_OF_CONCURRENT_READS_FIELD.getPreferredName(), numberOfConcurrentReads); builder.field(NUMBER_OF_CONCURRENT_WRITES_FIELD.getPreferredName(), numberOfConcurrentWrites); builder.field(NUMBER_OF_QUEUED_WRITES_FIELD.getPreferredName(), numberOfQueuedWrites); + builder.humanReadableField( + BUFFER_SIZE_IN_BYTES_FIELD.getPreferredName(), + "buffer_size", + new ByteSizeValue(bufferSize)); builder.field(MAPPING_VERSION_FIELD.getPreferredName(), mappingVersion); builder.humanReadableField( TOTAL_FETCH_TIME_MILLIS_FIELD.getPreferredName(), @@ -531,6 +548,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status { numberOfConcurrentReads == that.numberOfConcurrentReads && numberOfConcurrentWrites == that.numberOfConcurrentWrites && numberOfQueuedWrites == that.numberOfQueuedWrites && + bufferSize == that.bufferSize && mappingVersion == that.mappingVersion && totalFetchTimeMillis == that.totalFetchTimeMillis && totalFetchRemoteTimeMillis == that.totalFetchRemoteTimeMillis && @@ -568,6 +586,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status { numberOfConcurrentReads, numberOfConcurrentWrites, numberOfQueuedWrites, + bufferSize, mappingVersion, totalFetchTimeMillis, totalFetchRemoteTimeMillis, diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/PutAutoFollowPatternAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/PutAutoFollowPatternAction.java index 5a87666d050..8010c9bf344 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/PutAutoFollowPatternAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/PutAutoFollowPatternAction.java @@ -60,7 +60,12 @@ public class PutAutoFollowPatternAction extends Action { AutoFollowPattern.MAX_BATCH_SIZE, ObjectParser.ValueType.STRING); PARSER.declareInt(Request::setMaxConcurrentWriteBatches, AutoFollowPattern.MAX_CONCURRENT_WRITE_BATCHES); - PARSER.declareInt(Request::setMaxWriteBufferSize, AutoFollowPattern.MAX_WRITE_BUFFER_SIZE); + PARSER.declareInt(Request::setMaxWriteBufferCount, AutoFollowPattern.MAX_WRITE_BUFFER_COUNT); + PARSER.declareField( + Request::setMaxWriteBufferSize, + (p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), AutoFollowPattern.MAX_WRITE_BUFFER_SIZE.getPreferredName()), + AutoFollowPattern.MAX_WRITE_BUFFER_SIZE, + ObjectParser.ValueType.STRING); PARSER.declareField(Request::setMaxRetryDelay, (p, c) -> TimeValue.parseTimeValue(p.text(), AutoFollowPattern.MAX_RETRY_DELAY.getPreferredName()), AutoFollowPattern.MAX_RETRY_DELAY, ObjectParser.ValueType.STRING); @@ -92,7 +97,8 @@ public class PutAutoFollowPatternAction extends Action { private Integer maxConcurrentReadBatches; private ByteSizeValue maxBatchSize; private Integer maxConcurrentWriteBatches; - private Integer maxWriteBufferSize; + private Integer maxWriteBufferCount; + private ByteSizeValue maxWriteBufferSize; private TimeValue maxRetryDelay; private TimeValue pollTimeout; @@ -190,11 +196,19 @@ public class PutAutoFollowPatternAction extends Action { this.maxConcurrentWriteBatches = maxConcurrentWriteBatches; } - public Integer getMaxWriteBufferSize() { + public Integer getMaxWriteBufferCount() { + return maxWriteBufferCount; + } + + public void setMaxWriteBufferCount(Integer maxWriteBufferCount) { + this.maxWriteBufferCount = maxWriteBufferCount; + } + + public ByteSizeValue getMaxWriteBufferSize() { return maxWriteBufferSize; } - public void setMaxWriteBufferSize(Integer maxWriteBufferSize) { + public void setMaxWriteBufferSize(ByteSizeValue maxWriteBufferSize) { this.maxWriteBufferSize = maxWriteBufferSize; } @@ -225,7 +239,8 @@ public class PutAutoFollowPatternAction extends Action { maxConcurrentReadBatches = in.readOptionalVInt(); maxBatchSize = in.readOptionalWriteable(ByteSizeValue::new); maxConcurrentWriteBatches = in.readOptionalVInt(); - maxWriteBufferSize = in.readOptionalVInt(); + maxWriteBufferCount = in.readOptionalVInt(); + maxWriteBufferSize = in.readOptionalWriteable(ByteSizeValue::new); maxRetryDelay = in.readOptionalTimeValue(); pollTimeout = in.readOptionalTimeValue(); } @@ -241,7 +256,8 @@ public class PutAutoFollowPatternAction extends Action { out.writeOptionalVInt(maxConcurrentReadBatches); out.writeOptionalWriteable(maxBatchSize); out.writeOptionalVInt(maxConcurrentWriteBatches); - out.writeOptionalVInt(maxWriteBufferSize); + out.writeOptionalVInt(maxWriteBufferCount); + out.writeOptionalWriteable(maxWriteBufferSize); out.writeOptionalTimeValue(maxRetryDelay); out.writeOptionalTimeValue(pollTimeout); } @@ -262,8 +278,11 @@ public class PutAutoFollowPatternAction extends Action { if (maxBatchSize != null) { builder.field(AutoFollowPattern.MAX_BATCH_SIZE.getPreferredName(), maxBatchSize.getStringRep()); } + if (maxWriteBufferCount != null) { + builder.field(AutoFollowPattern.MAX_WRITE_BUFFER_COUNT.getPreferredName(), maxWriteBufferCount); + } if (maxWriteBufferSize != null) { - builder.field(AutoFollowPattern.MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize); + builder.field(AutoFollowPattern.MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize.getStringRep()); } if (maxConcurrentReadBatches != null) { builder.field(AutoFollowPattern.MAX_CONCURRENT_READ_BATCHES.getPreferredName(), maxConcurrentReadBatches); @@ -295,6 +314,7 @@ public class PutAutoFollowPatternAction extends Action { Objects.equals(maxConcurrentReadBatches, request.maxConcurrentReadBatches) && Objects.equals(maxBatchSize, request.maxBatchSize) && Objects.equals(maxConcurrentWriteBatches, request.maxConcurrentWriteBatches) && + Objects.equals(maxWriteBufferCount, request.maxWriteBufferCount) && Objects.equals(maxWriteBufferSize, request.maxWriteBufferSize) && Objects.equals(maxRetryDelay, request.maxRetryDelay) && Objects.equals(pollTimeout, request.pollTimeout); @@ -311,6 +331,7 @@ public class PutAutoFollowPatternAction extends Action { maxConcurrentReadBatches, maxBatchSize, maxConcurrentWriteBatches, + maxWriteBufferCount, maxWriteBufferSize, maxRetryDelay, pollTimeout); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/PutFollowAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/PutFollowAction.java index 28895a59073..0f36af4db10 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/PutFollowAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/PutFollowAction.java @@ -32,6 +32,7 @@ import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_CONCURRENT_READ_BATCHES; import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_CONCURRENT_WRITE_BATCHES; import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_RETRY_DELAY_FIELD; +import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_WRITE_BUFFER_COUNT; import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_WRITE_BUFFER_SIZE; import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.POLL_TIMEOUT; @@ -72,7 +73,12 @@ public final class PutFollowAction extends Action { MAX_BATCH_SIZE, ObjectParser.ValueType.STRING); PARSER.declareInt((request, value) -> request.followRequest.setMaxConcurrentWriteBatches(value), MAX_CONCURRENT_WRITE_BATCHES); - PARSER.declareInt((request, value) -> request.followRequest.setMaxWriteBufferSize(value), MAX_WRITE_BUFFER_SIZE); + PARSER.declareInt((request, value) -> request.followRequest.setMaxWriteBufferCount(value), MAX_WRITE_BUFFER_COUNT); + PARSER.declareField( + (request, value) -> request.followRequest.setMaxWriteBufferSize(value), + (p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), MAX_WRITE_BUFFER_SIZE.getPreferredName()), + MAX_WRITE_BUFFER_SIZE, + ObjectParser.ValueType.STRING); PARSER.declareField( (request, value) -> request.followRequest.setMaxRetryDelay(value), (p, c) -> TimeValue.parseTimeValue(p.text(), MAX_RETRY_DELAY_FIELD.getPreferredName()), diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/ResumeFollowAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/ResumeFollowAction.java index 127ccf7610f..587223e3fbc 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/ResumeFollowAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/ResumeFollowAction.java @@ -48,6 +48,7 @@ public final class ResumeFollowAction extends Action { static final ParseField MAX_CONCURRENT_READ_BATCHES = new ParseField("max_concurrent_read_batches"); static final ParseField MAX_BATCH_SIZE = new ParseField("max_batch_size"); static final ParseField MAX_CONCURRENT_WRITE_BATCHES = new ParseField("max_concurrent_write_batches"); + static final ParseField MAX_WRITE_BUFFER_COUNT = new ParseField("max_write_buffer_count"); static final ParseField MAX_WRITE_BUFFER_SIZE = new ParseField("max_write_buffer_size"); static final ParseField MAX_RETRY_DELAY_FIELD = new ParseField("max_retry_delay"); static final ParseField POLL_TIMEOUT = new ParseField("poll_timeout"); @@ -63,11 +64,16 @@ public final class ResumeFollowAction extends Action { MAX_BATCH_SIZE, ObjectParser.ValueType.STRING); PARSER.declareInt(Request::setMaxConcurrentWriteBatches, MAX_CONCURRENT_WRITE_BATCHES); - PARSER.declareInt(Request::setMaxWriteBufferSize, MAX_WRITE_BUFFER_SIZE); + PARSER.declareInt(Request::setMaxWriteBufferCount, MAX_WRITE_BUFFER_COUNT); + PARSER.declareField( + Request::setMaxWriteBufferSize, + (p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), MAX_WRITE_BUFFER_SIZE.getPreferredName()), + MAX_WRITE_BUFFER_SIZE, + ObjectParser.ValueType.STRING); PARSER.declareField( Request::setMaxRetryDelay, (p, c) -> TimeValue.parseTimeValue(p.text(), MAX_RETRY_DELAY_FIELD.getPreferredName()), - MAX_RETRY_DELAY_FIELD, + MAX_RETRY_DELAY_FIELD, ObjectParser.ValueType.STRING); PARSER.declareField( Request::setPollTimeout, @@ -140,13 +146,23 @@ public final class ResumeFollowAction extends Action { this.maxConcurrentWriteBatches = maxConcurrentWriteBatches; } - private Integer maxWriteBufferSize; + private Integer maxWriteBufferCount; - public Integer getMaxWriteBufferSize() { + public Integer getMaxWriteBufferCount() { + return maxWriteBufferCount; + } + + public void setMaxWriteBufferCount(Integer maxWriteBufferCount) { + this.maxWriteBufferCount = maxWriteBufferCount; + } + + private ByteSizeValue maxWriteBufferSize; + + public ByteSizeValue getMaxWriteBufferSize() { return maxWriteBufferSize; } - public void setMaxWriteBufferSize(Integer maxWriteBufferSize) { + public void setMaxWriteBufferSize(ByteSizeValue maxWriteBufferSize) { this.maxWriteBufferSize = maxWriteBufferSize; } @@ -192,7 +208,10 @@ public final class ResumeFollowAction extends Action { if (maxConcurrentWriteBatches != null && maxConcurrentWriteBatches < 1) { e = addValidationError(MAX_CONCURRENT_WRITE_BATCHES.getPreferredName() + " must be larger than 0", e); } - if (maxWriteBufferSize != null && maxWriteBufferSize < 1) { + if (maxWriteBufferCount != null && maxWriteBufferCount < 1) { + e = addValidationError(MAX_WRITE_BUFFER_COUNT.getPreferredName() + " must be larger than 0", e); + } + if (maxWriteBufferSize != null && maxWriteBufferSize.compareTo(ByteSizeValue.ZERO) <= 0) { e = addValidationError(MAX_WRITE_BUFFER_SIZE.getPreferredName() + " must be larger than 0", e); } if (maxRetryDelay != null && maxRetryDelay.millis() <= 0) { @@ -217,7 +236,8 @@ public final class ResumeFollowAction extends Action { maxConcurrentReadBatches = in.readOptionalVInt(); maxBatchSize = in.readOptionalWriteable(ByteSizeValue::new); maxConcurrentWriteBatches = in.readOptionalVInt(); - maxWriteBufferSize = in.readOptionalVInt(); + maxWriteBufferCount = in.readOptionalVInt(); + maxWriteBufferSize = in.readOptionalWriteable(ByteSizeValue::new); maxRetryDelay = in.readOptionalTimeValue(); pollTimeout = in.readOptionalTimeValue(); } @@ -230,7 +250,8 @@ public final class ResumeFollowAction extends Action { out.writeOptionalVInt(maxConcurrentReadBatches); out.writeOptionalWriteable(maxBatchSize); out.writeOptionalVInt(maxConcurrentWriteBatches); - out.writeOptionalVInt(maxWriteBufferSize); + out.writeOptionalVInt(maxWriteBufferCount); + out.writeOptionalWriteable(maxWriteBufferSize); out.writeOptionalTimeValue(maxRetryDelay); out.writeOptionalTimeValue(pollTimeout); } @@ -253,8 +274,11 @@ public final class ResumeFollowAction extends Action { if (maxBatchSize != null) { builder.field(MAX_BATCH_SIZE.getPreferredName(), maxBatchSize.getStringRep()); } + if (maxWriteBufferCount != null) { + builder.field(MAX_WRITE_BUFFER_COUNT.getPreferredName(), maxWriteBufferCount); + } if (maxWriteBufferSize != null) { - builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize); + builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize.getStringRep()); } if (maxConcurrentReadBatches != null) { builder.field(MAX_CONCURRENT_READ_BATCHES.getPreferredName(), maxConcurrentReadBatches); @@ -279,6 +303,7 @@ public final class ResumeFollowAction extends Action { Objects.equals(maxConcurrentReadBatches, request.maxConcurrentReadBatches) && Objects.equals(maxBatchSize, request.maxBatchSize) && Objects.equals(maxConcurrentWriteBatches, request.maxConcurrentWriteBatches) && + Objects.equals(maxWriteBufferCount, request.maxWriteBufferCount) && Objects.equals(maxWriteBufferSize, request.maxWriteBufferSize) && Objects.equals(maxRetryDelay, request.maxRetryDelay) && Objects.equals(pollTimeout, request.pollTimeout) && @@ -293,6 +318,7 @@ public final class ResumeFollowAction extends Action { maxConcurrentReadBatches, maxBatchSize, maxConcurrentWriteBatches, + maxWriteBufferCount, maxWriteBufferSize, maxRetryDelay, pollTimeout); diff --git a/x-pack/plugin/core/src/main/resources/monitoring-es.json b/x-pack/plugin/core/src/main/resources/monitoring-es.json index bad47024f05..4c2a9792440 100644 --- a/x-pack/plugin/core/src/main/resources/monitoring-es.json +++ b/x-pack/plugin/core/src/main/resources/monitoring-es.json @@ -965,6 +965,9 @@ "number_of_queued_writes": { "type": "long" }, + "buffer_size_in_bytes": { + "type": "long" + }, "mapping_version": { "type": "long" },