diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java index a05dc0914e5..b5ba39ae7e2 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java @@ -328,6 +328,7 @@ public class AutoFollowCoordinator implements ClusterStateApplier { followRequest.setMaxConcurrentReadBatches(pattern.getMaxConcurrentReadBatches()); followRequest.setMaxBatchSize(pattern.getMaxBatchSize()); followRequest.setMaxConcurrentWriteBatches(pattern.getMaxConcurrentWriteBatches()); + followRequest.setMaxWriteBufferCount(pattern.getMaxWriteBufferCount()); followRequest.setMaxWriteBufferSize(pattern.getMaxWriteBufferSize()); followRequest.setMaxRetryDelay(pattern.getMaxRetryDelay()); followRequest.setPollTimeout(pattern.getPollTimeout()); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index 19843ac4efb..9788195c7e5 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -85,6 +85,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { private long numberOfOperationsIndexed = 0; private long lastFetchTime = -1; private final Queue buffer = new PriorityQueue<>(Comparator.comparing(Translog.Operation::seqNo)); + private long bufferSizeInBytes = 0; private final LinkedHashMap> fetchExceptions; private volatile ElasticsearchException fatalException; @@ -183,8 +184,12 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { params.getFollowShardId(), numConcurrentReads); return false; } - if (buffer.size() > params.getMaxWriteBufferSize()) { - LOGGER.trace("{} no new reads, buffer limit has been reached [{}]", params.getFollowShardId(), buffer.size()); + if (bufferSizeInBytes >= params.getMaxWriteBufferSize().getBytes()) { + LOGGER.trace("{} no new reads, buffer size limit has been reached [{}]", params.getFollowShardId(), bufferSizeInBytes); + return false; + } + if (buffer.size() > params.getMaxWriteBufferCount()) { + LOGGER.trace("{} no new reads, buffer count limit has been reached [{}]", params.getFollowShardId(), buffer.size()); return false; } return true; @@ -208,6 +213,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { break; } } + bufferSizeInBytes -= sumEstimatedSize; numConcurrentWrites++; LOGGER.trace("{}[{}] write [{}/{}] [{}]", params.getFollowShardId(), numConcurrentWrites, ops.get(0).seqNo(), ops.get(ops.size() - 1).seqNo(), ops.size()); @@ -281,7 +287,12 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { } else { assert response.getOperations()[0].seqNo() == from : "first operation is not what we asked for. From is [" + from + "], got " + response.getOperations()[0]; - buffer.addAll(Arrays.asList(response.getOperations())); + List operations = Arrays.asList(response.getOperations()); + long operationsSize = operations.stream() + .mapToLong(Translog.Operation::estimateSize) + .sum(); + buffer.addAll(operations); + bufferSizeInBytes += operationsSize; final long maxSeqNo = response.getOperations()[response.getOperations().length - 1].seqNo(); assert maxSeqNo == Arrays.stream(response.getOperations()).mapToLong(Translog.Operation::seqNo).max().getAsLong(); @@ -455,6 +466,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { numConcurrentReads, numConcurrentWrites, buffer.size(), + bufferSizeInBytes, currentMappingVersion, totalFetchTimeMillis, totalFetchTookTimeMillis, diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java index ea75ee2d9e1..13e3da77491 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java @@ -48,6 +48,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { public static final ParseField MAX_CONCURRENT_READ_BATCHES = new ParseField("max_concurrent_read_batches"); public static final ParseField MAX_BATCH_SIZE = new ParseField("max_batch_size"); public static final ParseField MAX_CONCURRENT_WRITE_BATCHES = new ParseField("max_concurrent_write_batches"); + public static final ParseField MAX_WRITE_BUFFER_COUNT = new ParseField("max_write_buffer_count"); public static final ParseField MAX_WRITE_BUFFER_SIZE = new ParseField("max_write_buffer_size"); public static final ParseField MAX_RETRY_DELAY = new ParseField("max_retry_delay"); public static final ParseField POLL_TIMEOUT = new ParseField("poll_timeout"); @@ -56,7 +57,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { private static ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, (a) -> new ShardFollowTask((String) a[0], new ShardId((String) a[1], (String) a[2], (int) a[3]), new ShardId((String) a[4], (String) a[5], (int) a[6]), (int) a[7], (int) a[8], (ByteSizeValue) a[9], - (int) a[10], (int) a[11], (TimeValue) a[12], (TimeValue) a[13], (Map) a[14])); + (int) a[10], (int) a[11], (ByteSizeValue) a[12], (TimeValue) a[13], (TimeValue) a[14], (Map) a[15])); static { PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), REMOTE_CLUSTER_FIELD); @@ -74,7 +75,12 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { MAX_BATCH_SIZE, ObjectParser.ValueType.STRING); PARSER.declareInt(ConstructingObjectParser.constructorArg(), MAX_CONCURRENT_WRITE_BATCHES); - PARSER.declareInt(ConstructingObjectParser.constructorArg(), MAX_WRITE_BUFFER_SIZE); + PARSER.declareInt(ConstructingObjectParser.constructorArg(), MAX_WRITE_BUFFER_COUNT); + PARSER.declareField( + ConstructingObjectParser.constructorArg(), + (p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), MAX_WRITE_BUFFER_SIZE.getPreferredName()), + MAX_WRITE_BUFFER_SIZE, + ObjectParser.ValueType.STRING); PARSER.declareField(ConstructingObjectParser.constructorArg(), (p, c) -> TimeValue.parseTimeValue(p.text(), MAX_RETRY_DELAY.getPreferredName()), MAX_RETRY_DELAY, ObjectParser.ValueType.STRING); @@ -91,7 +97,8 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { private final int maxConcurrentReadBatches; private final ByteSizeValue maxBatchSize; private final int maxConcurrentWriteBatches; - private final int maxWriteBufferSize; + private final int maxWriteBufferCount; + private final ByteSizeValue maxWriteBufferSize; private final TimeValue maxRetryDelay; private final TimeValue pollTimeout; private final Map headers; @@ -104,7 +111,8 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { final int maxConcurrentReadBatches, final ByteSizeValue maxBatchSize, final int maxConcurrentWriteBatches, - final int maxWriteBufferSize, + final int maxWriteBufferCount, + final ByteSizeValue maxWriteBufferSize, final TimeValue maxRetryDelay, final TimeValue pollTimeout, final Map headers) { @@ -115,6 +123,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { this.maxConcurrentReadBatches = maxConcurrentReadBatches; this.maxBatchSize = maxBatchSize; this.maxConcurrentWriteBatches = maxConcurrentWriteBatches; + this.maxWriteBufferCount = maxWriteBufferCount; this.maxWriteBufferSize = maxWriteBufferSize; this.maxRetryDelay = maxRetryDelay; this.pollTimeout = pollTimeout; @@ -129,7 +138,8 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { this.maxConcurrentReadBatches = in.readVInt(); this.maxBatchSize = new ByteSizeValue(in); this.maxConcurrentWriteBatches = in.readVInt(); - this.maxWriteBufferSize = in.readVInt(); + this.maxWriteBufferCount = in.readVInt(); + this.maxWriteBufferSize = new ByteSizeValue(in); this.maxRetryDelay = in.readTimeValue(); this.pollTimeout = in.readTimeValue(); this.headers = Collections.unmodifiableMap(in.readMap(StreamInput::readString, StreamInput::readString)); @@ -159,7 +169,11 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { return maxConcurrentWriteBatches; } - public int getMaxWriteBufferSize() { + public int getMaxWriteBufferCount() { + return maxWriteBufferCount; + } + + public ByteSizeValue getMaxWriteBufferSize() { return maxWriteBufferSize; } @@ -197,7 +211,8 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { out.writeVInt(maxConcurrentReadBatches); maxBatchSize.writeTo(out); out.writeVInt(maxConcurrentWriteBatches); - out.writeVInt(maxWriteBufferSize); + out.writeVInt(maxWriteBufferCount); + maxWriteBufferSize.writeTo(out); out.writeTimeValue(maxRetryDelay); out.writeTimeValue(pollTimeout); out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString); @@ -221,7 +236,8 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { builder.field(MAX_CONCURRENT_READ_BATCHES.getPreferredName(), maxConcurrentReadBatches); builder.field(MAX_BATCH_SIZE.getPreferredName(), maxBatchSize.getStringRep()); builder.field(MAX_CONCURRENT_WRITE_BATCHES.getPreferredName(), maxConcurrentWriteBatches); - builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize); + builder.field(MAX_WRITE_BUFFER_COUNT.getPreferredName(), maxWriteBufferCount); + builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize.getStringRep()); builder.field(MAX_RETRY_DELAY.getPreferredName(), maxRetryDelay.getStringRep()); builder.field(POLL_TIMEOUT.getPreferredName(), pollTimeout.getStringRep()); builder.field(HEADERS.getPreferredName(), headers); @@ -240,7 +256,8 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { maxConcurrentReadBatches == that.maxConcurrentReadBatches && maxConcurrentWriteBatches == that.maxConcurrentWriteBatches && maxBatchSize.equals(that.maxBatchSize) && - maxWriteBufferSize == that.maxWriteBufferSize && + maxWriteBufferCount == that.maxWriteBufferCount && + maxWriteBufferSize.equals(that.maxWriteBufferSize) && Objects.equals(maxRetryDelay, that.maxRetryDelay) && Objects.equals(pollTimeout, that.pollTimeout) && Objects.equals(headers, that.headers); @@ -256,6 +273,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { maxConcurrentReadBatches, maxConcurrentWriteBatches, maxBatchSize, + maxWriteBufferCount, maxWriteBufferSize, maxRetryDelay, pollTimeout, diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java index 8832275f9a9..79f1ed7a2ee 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java @@ -164,6 +164,7 @@ public class TransportPutAutoFollowPatternAction extends request.getMaxConcurrentReadBatches(), request.getMaxBatchSize(), request.getMaxConcurrentWriteBatches(), + request.getMaxWriteBufferCount(), request.getMaxWriteBufferSize(), request.getMaxRetryDelay(), request.getPollTimeout()); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java index 53ac116d38e..97905f92721 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java @@ -55,7 +55,8 @@ public class TransportResumeFollowAction extends HandledTransportAction { + FollowStatsAction.StatsResponses statsResponses = + leaderClient().execute(FollowStatsAction.INSTANCE, new FollowStatsAction.StatsRequest()).actionGet(); + for (FollowStatsAction.StatsResponse statsResponse : statsResponses.getStatsResponses()) { + ShardFollowNodeTaskStatus status = statsResponse.status(); + assertThat(status.numberOfQueuedWrites(), equalTo(0)); + assertThat(status.bufferSize(), equalTo(0L)); + } + }); + } + static void removeCCRRelatedMetadataFromClusterState(ClusterService clusterService) throws Exception { CountDownLatch latch = new CountDownLatch(1); clusterService.submitStateUpdateTask("remove-ccr-related-metadata", new ClusterStateUpdateTask() { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java index 169c009207c..611fb0c27fa 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java @@ -16,6 +16,8 @@ import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.ccr.LocalStateCcr; import org.elasticsearch.xpack.core.XPackSettings; +import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus; +import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction; import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction; import org.junit.After; @@ -26,6 +28,7 @@ import java.util.Collections; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.xpack.CcrIntegTestCase.removeCCRRelatedMetadataFromClusterState; +import static org.hamcrest.Matchers.equalTo; public abstract class CcrSingleNodeTestCase extends ESSingleNodeTestCase { @@ -80,4 +83,16 @@ public abstract class CcrSingleNodeTestCase extends ESSingleNodeTestCase { return request; } + protected void ensureEmptyWriteBuffers() throws Exception { + assertBusy(() -> { + FollowStatsAction.StatsResponses statsResponses = + client().execute(FollowStatsAction.INSTANCE, new FollowStatsAction.StatsRequest()).actionGet(); + for (FollowStatsAction.StatsResponse statsResponse : statsResponses.getStatsResponses()) { + ShardFollowNodeTaskStatus status = statsResponse.status(); + assertThat(status.numberOfQueuedWrites(), equalTo(0)); + assertThat(status.bufferSize(), equalTo(0L)); + } + }); + } + } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java index 8b3de8f5f8d..50e6008d91d 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java @@ -126,7 +126,7 @@ public class AutoFollowIT extends CcrIntegTestCase { // Need to set this, because following an index in the same cluster request.setFollowIndexNamePattern("copy-{{leader_index}}"); if (randomBoolean()) { - request.setMaxWriteBufferSize(randomIntBetween(0, Integer.MAX_VALUE)); + request.setMaxWriteBufferCount(randomIntBetween(0, Integer.MAX_VALUE)); } if (randomBoolean()) { request.setMaxConcurrentReadBatches(randomIntBetween(0, Integer.MAX_VALUE)); @@ -137,6 +137,9 @@ public class AutoFollowIT extends CcrIntegTestCase { if (randomBoolean()) { request.setMaxBatchOperationCount(randomIntBetween(0, Integer.MAX_VALUE)); } + if (randomBoolean()) { + request.setMaxWriteBufferSize(new ByteSizeValue(randomNonNegativeLong())); + } if (randomBoolean()) { request.setMaxBatchSize(new ByteSizeValue(randomNonNegativeLong(), ByteSizeUnit.BYTES)); } @@ -157,6 +160,9 @@ public class AutoFollowIT extends CcrIntegTestCase { ShardFollowTask shardFollowTask = (ShardFollowTask) persistentTasksMetaData.tasks().iterator().next().getParams(); assertThat(shardFollowTask.getLeaderShardId().getIndexName(), equalTo("logs-201901")); assertThat(shardFollowTask.getFollowShardId().getIndexName(), equalTo("copy-logs-201901")); + if (request.getMaxWriteBufferCount() != null) { + assertThat(shardFollowTask.getMaxWriteBufferCount(), equalTo(request.getMaxWriteBufferCount())); + } if (request.getMaxWriteBufferSize() != null) { assertThat(shardFollowTask.getMaxWriteBufferSize(), equalTo(request.getMaxWriteBufferSize())); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowMetadataTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowMetadataTests.java index 67071bd1be5..0ca175cef82 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowMetadataTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowMetadataTests.java @@ -49,6 +49,7 @@ public class AutoFollowMetadataTests extends AbstractSerializingTestCase patterns = new HashMap<>(); patterns.put("remote", autoFollowPattern); Map> followedLeaderIndexUUIDS = new HashMap<>(); @@ -121,7 +121,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { when(client.getRemoteClusterClient(anyString())).thenReturn(client); AutoFollowPattern autoFollowPattern = - new AutoFollowPattern("remote", Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null); + new AutoFollowPattern("remote", Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null, null); Map patterns = new HashMap<>(); patterns.put("remote", autoFollowPattern); Map> followedLeaderIndexUUIDS = new HashMap<>(); @@ -179,7 +179,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { .build(); AutoFollowPattern autoFollowPattern = - new AutoFollowPattern("remote", Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null); + new AutoFollowPattern("remote", Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null, null); Map patterns = new HashMap<>(); patterns.put("remote", autoFollowPattern); Map> followedLeaderIndexUUIDS = new HashMap<>(); @@ -242,7 +242,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { .build(); AutoFollowPattern autoFollowPattern = - new AutoFollowPattern("remote", Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null); + new AutoFollowPattern("remote", Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null, null); Map patterns = new HashMap<>(); patterns.put("remote", autoFollowPattern); Map> followedLeaderIndexUUIDS = new HashMap<>(); @@ -296,7 +296,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { public void testGetLeaderIndicesToFollow() { AutoFollowPattern autoFollowPattern = - new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), null, null, null, null, null, null, null, null); + new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), null, null, null, null, null, null, null, null, null); Map> headers = new HashMap<>(); ClusterState followerState = ClusterState.builder(new ClusterName("remote")) .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, @@ -341,15 +341,15 @@ public class AutoFollowCoordinatorTests extends ESTestCase { public void testGetFollowerIndexName() { AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), null, null, - null, null, null, null, null, null); + null, null, null, null, null, null, null); assertThat(AutoFollower.getFollowerIndexName(autoFollowPattern, "metrics-0"), equalTo("metrics-0")); autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), "eu-metrics-0", null, null, - null, null, null, null, null); + null, null, null, null, null, null); assertThat(AutoFollower.getFollowerIndexName(autoFollowPattern, "metrics-0"), equalTo("eu-metrics-0")); autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), "eu-{{leader_index}}", null, - null, null, null, null, null, null); + null, null, null, null, null, null, null); assertThat(AutoFollower.getFollowerIndexName(autoFollowPattern, "metrics-0"), equalTo("eu-metrics-0")); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/GetAutoFollowPatternResponseTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/GetAutoFollowPatternResponseTests.java index e67509f7ee8..301dabeef89 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/GetAutoFollowPatternResponseTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/GetAutoFollowPatternResponseTests.java @@ -37,6 +37,7 @@ public class GetAutoFollowPatternResponseTests extends AbstractStreamableTestCas new ByteSizeValue(randomNonNegativeLong(), ByteSizeUnit.BYTES), randomIntBetween(0, Integer.MAX_VALUE), randomIntBetween(0, Integer.MAX_VALUE), + new ByteSizeValue(randomNonNegativeLong()), TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(500)); patterns.put(randomAlphaOfLength(4), autoFollowPattern); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/PutAutoFollowPatternRequestTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/PutAutoFollowPatternRequestTests.java index 67957d1e366..e4e365312ad 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/PutAutoFollowPatternRequestTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/PutAutoFollowPatternRequestTests.java @@ -66,7 +66,10 @@ public class PutAutoFollowPatternRequestTests extends AbstractStreamableXContent request.setMaxBatchSize(new ByteSizeValue(randomNonNegativeLong(), ByteSizeUnit.BYTES)); } if (randomBoolean()) { - request.setMaxWriteBufferSize(randomIntBetween(0, Integer.MAX_VALUE)); + request.setMaxWriteBufferCount(randomIntBetween(0, Integer.MAX_VALUE)); + } + if (randomBoolean()) { + request.setMaxWriteBufferSize(new ByteSizeValue(randomNonNegativeLong())); } return request; } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ResumeFollowActionRequestTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ResumeFollowActionRequestTests.java index 55c0c79e5b2..122082537fd 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ResumeFollowActionRequestTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ResumeFollowActionRequestTests.java @@ -57,7 +57,10 @@ public class ResumeFollowActionRequestTests extends AbstractStreamableXContentTe request.setMaxBatchSize(new ByteSizeValue(randomNonNegativeLong(), ByteSizeUnit.BYTES)); } if (randomBoolean()) { - request.setMaxWriteBufferSize(randomIntBetween(1, Integer.MAX_VALUE)); + request.setMaxWriteBufferCount(randomIntBetween(1, Integer.MAX_VALUE)); + } + if (randomBoolean()) { + request.setMaxWriteBufferSize(new ByteSizeValue(randomNonNegativeLong(), ByteSizeUnit.BYTES)); } if (randomBoolean()) { request.setMaxRetryDelay(TimeValue.timeValueMillis(500)); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java index 50c0dd9ca49..8576bc28905 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java @@ -7,6 +7,8 @@ package org.elasticsearch.xpack.ccr.action; import org.elasticsearch.action.UnavailableShardsException; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.seqno.LocalCheckpointTracker; import org.elasticsearch.index.shard.ShardId; @@ -83,6 +85,7 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase { TransportResumeFollowAction.DEFAULT_MAX_BATCH_SIZE, concurrency, 10240, + new ByteSizeValue(512, ByteSizeUnit.MB), TimeValue.timeValueMillis(10), TimeValue.timeValueMillis(10), Collections.emptyMap() diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java index a3881a6728f..93d9556d0e4 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java @@ -57,6 +57,7 @@ public class ShardFollowNodeTaskStatusTests extends AbstractSerializingTestCase< randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), + randomNonNegativeLong(), randomReadExceptions(), randomLong(), randomBoolean() ? new ElasticsearchException("fatal error") : null); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java index 1988513c95d..a1582d4c2f1 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java @@ -63,7 +63,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase { private Queue responseSizes; public void testCoordinateReads() { - ShardFollowNodeTask task = createShardFollowTask(8, between(8, 20), between(1, 20), Integer.MAX_VALUE, Long.MAX_VALUE); + ShardFollowNodeTask task = createShardFollowTask(8, between(8, 20), between(1, 20), Integer.MAX_VALUE, + new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE); startTask(task, 3, -1); task.coordinateReads(); assertThat(shardChangesRequests, contains(new long[]{0L, 8L})); // treat this a peak request @@ -77,9 +78,10 @@ public class ShardFollowNodeTaskTests extends ESTestCase { assertThat(status.lastRequestedSeqNo(), equalTo(60L)); } - public void testWriteBuffer() { + public void testMaxWriteBufferCount() { // Need to set concurrentWrites to 0, other the write buffer gets flushed immediately: - ShardFollowNodeTask task = createShardFollowTask(64, 1, 0, 32, Long.MAX_VALUE); + ShardFollowNodeTask task = + createShardFollowTask(64, 1, 0, 32, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE); startTask(task, 63, -1); task.coordinateReads(); @@ -90,7 +92,30 @@ public class ShardFollowNodeTaskTests extends ESTestCase { shardChangesRequests.clear(); // Also invokes the coordinatesReads() method: task.innerHandleReadResponse(0L, 63L, generateShardChangesResponse(0, 63, 0L, 128L)); - assertThat(shardChangesRequests.size(), equalTo(0)); // no more reads, because write buffer is full + assertThat(shardChangesRequests.size(), equalTo(0)); // no more reads, because write buffer count limit has been reached + + ShardFollowNodeTaskStatus status = task.getStatus(); + assertThat(status.numberOfConcurrentReads(), equalTo(0)); + assertThat(status.numberOfConcurrentWrites(), equalTo(0)); + assertThat(status.lastRequestedSeqNo(), equalTo(63L)); + assertThat(status.leaderGlobalCheckpoint(), equalTo(128L)); + } + + public void testMaxWriteBufferSize() { + // Need to set concurrentWrites to 0, other the write buffer gets flushed immediately: + ShardFollowNodeTask task = + createShardFollowTask(64, 1, 0, Integer.MAX_VALUE, new ByteSizeValue(1, ByteSizeUnit.KB), Long.MAX_VALUE); + startTask(task, 63, -1); + + task.coordinateReads(); + assertThat(shardChangesRequests.size(), equalTo(1)); + assertThat(shardChangesRequests.get(0)[0], equalTo(0L)); + assertThat(shardChangesRequests.get(0)[1], equalTo(64L)); + + shardChangesRequests.clear(); + // Also invokes the coordinatesReads() method: + task.innerHandleReadResponse(0L, 63L, generateShardChangesResponse(0, 63, 0L, 128L)); + assertThat(shardChangesRequests.size(), equalTo(0)); // no more reads, because write buffer size limit has been reached ShardFollowNodeTaskStatus status = task.getStatus(); assertThat(status.numberOfConcurrentReads(), equalTo(0)); @@ -100,7 +125,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase { } public void testMaxConcurrentReads() { - ShardFollowNodeTask task = createShardFollowTask(8, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); + ShardFollowNodeTask task = + createShardFollowTask(8, 1, 1, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE); startTask(task, 64, -1); task.coordinateReads(); @@ -114,7 +140,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase { } public void testTaskCancelled() { - ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); + ShardFollowNodeTask task = + createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE); startTask(task, 64, -1); task.coordinateReads(); @@ -131,7 +158,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase { } public void testTaskCancelledAfterReadLimitHasBeenReached() { - ShardFollowNodeTask task = createShardFollowTask(16, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); + ShardFollowNodeTask task = + createShardFollowTask(16, 1, 1, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE); startTask(task, 31, -1); task.coordinateReads(); @@ -155,7 +183,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase { } public void testTaskCancelledAfterWriteBufferLimitHasBeenReached() { - ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, 32, Long.MAX_VALUE); + ShardFollowNodeTask task = + createShardFollowTask(64, 1, 1, 32, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE); startTask(task, 64, -1); task.coordinateReads(); @@ -179,7 +208,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase { } public void testReceiveRetryableError() { - ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); + ShardFollowNodeTask task = + createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE); startTask(task, 63, -1); int max = randomIntBetween(1, 30); @@ -229,7 +259,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase { } public void testEmptyShardChangesResponseShouldClearFetchException() { - ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); + ShardFollowNodeTask task = + createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE); startTask(task, -1, -1); readFailures.add(new ShardNotFoundException(new ShardId("leader_index", "", 0))); @@ -258,7 +289,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase { } public void testReceiveTimeout() { - final ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); + final ShardFollowNodeTask task = + createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE); startTask(task, 63, -1); final int numberOfTimeouts = randomIntBetween(1, 32); @@ -322,7 +354,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase { } public void testReceiveNonRetryableError() { - ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); + ShardFollowNodeTask task = + createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE); startTask(task, 63, -1); Exception failure = new RuntimeException("replication failed"); @@ -362,7 +395,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase { } public void testHandleReadResponse() { - ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); + ShardFollowNodeTask task = + createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE); startTask(task, 63, -1); task.coordinateReads(); @@ -383,7 +417,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase { } public void testReceiveLessThanRequested() { - ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); + ShardFollowNodeTask task = + createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE); startTask(task, 63, -1); task.coordinateReads(); @@ -407,7 +442,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase { } public void testCancelAndReceiveLessThanRequested() { - ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); + ShardFollowNodeTask task = + createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE); startTask(task, 63, -1); task.coordinateReads(); @@ -430,7 +466,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase { } public void testReceiveNothingExpectedSomething() { - ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); + ShardFollowNodeTask task = + createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE); startTask(task, 63, -1); task.coordinateReads(); @@ -453,7 +490,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase { } public void testMappingUpdate() { - ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); + ShardFollowNodeTask task = + createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE); startTask(task, 63, -1); mappingVersions.add(1L); @@ -474,7 +512,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase { } public void testMappingUpdateRetryableError() { - ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); + ShardFollowNodeTask task = + createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE); startTask(task, 63, -1); int max = randomIntBetween(1, 30); @@ -499,7 +538,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase { } public void testMappingUpdateNonRetryableError() { - ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); + ShardFollowNodeTask task = + createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE); startTask(task, 63, -1); mappingUpdateFailures.add(new RuntimeException()); @@ -518,7 +558,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase { } public void testCoordinateWrites() { - ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); + ShardFollowNodeTask task = + createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE); startTask(task, 63, -1); task.coordinateReads(); @@ -542,7 +583,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase { } public void testMaxConcurrentWrites() { - ShardFollowNodeTask task = createShardFollowTask(64, 1, 2, Integer.MAX_VALUE, Long.MAX_VALUE); + ShardFollowNodeTask task = + createShardFollowTask(64, 1, 2, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE); ShardChangesAction.Response response = generateShardChangesResponse(0, 256, 0L, 256L); // Also invokes coordinatesWrites() task.innerHandleReadResponse(0L, 64L, response); @@ -554,7 +596,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase { ShardFollowNodeTaskStatus status = task.getStatus(); assertThat(status.numberOfConcurrentWrites(), equalTo(2)); - task = createShardFollowTask(64, 1, 4, Integer.MAX_VALUE, Long.MAX_VALUE); + task = createShardFollowTask(64, 1, 4, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE); response = generateShardChangesResponse(0, 256, 0L, 256L); // Also invokes coordinatesWrites() task.innerHandleReadResponse(0L, 64L, response); @@ -570,7 +612,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase { } public void testMaxBatchOperationCount() { - ShardFollowNodeTask task = createShardFollowTask(8, 1, 32, Integer.MAX_VALUE, Long.MAX_VALUE); + ShardFollowNodeTask task = + createShardFollowTask(8, 1, 32, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE); ShardChangesAction.Response response = generateShardChangesResponse(0, 256, 0L, 256L); // Also invokes coordinatesWrites() task.innerHandleReadResponse(0L, 64L, response); @@ -586,7 +629,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase { } public void testRetryableError() { - ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); + ShardFollowNodeTask task = + createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE); startTask(task, 63, -1); task.coordinateReads(); @@ -614,7 +658,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase { } public void testNonRetryableError() { - ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); + ShardFollowNodeTask task = + createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE); startTask(task, 63, -1); task.coordinateReads(); @@ -636,7 +681,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase { } public void testMaxBatchBytesLimit() { - ShardFollowNodeTask task = createShardFollowTask(64, 1, 128, Integer.MAX_VALUE, 1L); + ShardFollowNodeTask task = + createShardFollowTask(64, 1, 128, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), 1L); startTask(task, 64, -1); task.coordinateReads(); @@ -652,7 +698,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase { } public void testHandleWriteResponse() { - ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); + ShardFollowNodeTask task = + createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, new ByteSizeValue(512, ByteSizeUnit.MB), Long.MAX_VALUE); startTask(task, 63, -1); task.coordinateReads(); @@ -698,7 +745,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase { private ShardFollowNodeTask createShardFollowTask(int maxBatchOperationCount, int maxConcurrentReadBatches, int maxConcurrentWriteBatches, - int bufferWriteLimit, + int maxWriteBufferCount, + ByteSizeValue maxWriteBufferSize, long maxBatchSizeInBytes) { AtomicBoolean stopped = new AtomicBoolean(false); ShardFollowTask params = new ShardFollowTask( @@ -709,7 +757,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase { maxConcurrentReadBatches, new ByteSizeValue(maxBatchSizeInBytes, ByteSizeUnit.BYTES), maxConcurrentWriteBatches, - bufferWriteLimit, + maxWriteBufferCount, + maxWriteBufferSize, TimeValue.ZERO, TimeValue.ZERO, Collections.emptyMap() diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java index 96bc2f04f59..07c3121eba4 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java @@ -370,7 +370,9 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest between(1, 64), between(1, 8), new ByteSizeValue(Long.MAX_VALUE, ByteSizeUnit.BYTES), - between(1, 4), 10240, + between(1, 4), + 10240, + new ByteSizeValue(512, ByteSizeUnit.MB), TimeValue.timeValueMillis(10), TimeValue.timeValueMillis(10), Collections.emptyMap() diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskTests.java index 865d18e6067..e955f77d733 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskTests.java @@ -34,6 +34,7 @@ public class ShardFollowTaskTests extends AbstractSerializingTestCase existingPatterns = new ArrayList<>(); existingPatterns.add("transactions-*"); existingAutoFollowPatterns.put("name1", - new AutoFollowPattern("eu_cluster", existingPatterns, null, null, null, null, null, null, null, null)); + new AutoFollowPattern("eu_cluster", existingPatterns, null, null, null, null, null, null, null, null, null)); List existingUUIDS = new ArrayList<>(); existingUUIDS.add("_val"); @@ -44,7 +44,7 @@ public class TransportDeleteAutoFollowPatternActionTests extends ESTestCase { List existingPatterns = new ArrayList<>(); existingPatterns.add("logs-*"); existingAutoFollowPatterns.put("name2", - new AutoFollowPattern("asia_cluster", existingPatterns, null, null, null, null, null, null, null, null)); + new AutoFollowPattern("asia_cluster", existingPatterns, null, null, null, null, null, null, null, null, null)); List existingUUIDS = new ArrayList<>(); existingUUIDS.add("_val"); @@ -78,7 +78,7 @@ public class TransportDeleteAutoFollowPatternActionTests extends ESTestCase { List existingPatterns = new ArrayList<>(); existingPatterns.add("transactions-*"); existingAutoFollowPatterns.put("name1", - new AutoFollowPattern("eu_cluster", existingPatterns, null, null, null, null, null, null, null, null)); + new AutoFollowPattern("eu_cluster", existingPatterns, null, null, null, null, null, null, null, null, null)); existingHeaders.put("key", Collections.singletonMap("key", "val")); } ClusterState clusterState = ClusterState.builder(new ClusterName("us_cluster")) diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportGetAutoFollowPatternActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportGetAutoFollowPatternActionTests.java index ffc2d115091..128474bbc30 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportGetAutoFollowPatternActionTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportGetAutoFollowPatternActionTests.java @@ -23,10 +23,10 @@ public class TransportGetAutoFollowPatternActionTests extends ESTestCase { public void testGetAutoFollowPattern() { Map patterns = new HashMap<>(); - patterns.put("name1", - new AutoFollowPattern("test_alias1", Collections.singletonList("index-*"), null, null, null, null, null, null, null, null)); - patterns.put("name2", - new AutoFollowPattern("test_alias1", Collections.singletonList("index-*"), null, null, null, null, null, null, null, null)); + patterns.put("name1", new AutoFollowPattern( + "test_alias1", Collections.singletonList("index-*"), null, null, null, null, null, null, null, null, null)); + patterns.put("name2", new AutoFollowPattern( + "test_alias1", Collections.singletonList("index-*"), null, null, null, null, null, null, null, null, null)); MetaData metaData = MetaData.builder() .putCustom(AutoFollowMetadata.TYPE, new AutoFollowMetadata(patterns, Collections.emptyMap(), Collections.emptyMap())) .build(); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternActionTests.java index 6d4ef138fb4..433ef402af8 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternActionTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternActionTests.java @@ -103,7 +103,7 @@ public class TransportPutAutoFollowPatternActionTests extends ESTestCase { List existingPatterns = new ArrayList<>(); existingPatterns.add("transactions-*"); existingAutoFollowPatterns.put("name1", - new AutoFollowPattern("eu_cluster", existingPatterns, null, null, null, null, null, null, null, null)); + new AutoFollowPattern("eu_cluster", existingPatterns, null, null, null, null, null, null, null, null, null)); Map> existingAlreadyFollowedIndexUUIDS = new HashMap<>(); List existingUUIDS = new ArrayList<>(); existingUUIDS.add("_val"); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowActionTests.java index 07b0fc078ac..82cbe2622b7 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowActionTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowActionTests.java @@ -11,6 +11,8 @@ import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; @@ -81,6 +83,7 @@ public class TransportUnfollowActionTests extends ESTestCase { TransportResumeFollowAction.DEFAULT_MAX_BATCH_SIZE, 1, 10240, + new ByteSizeValue(512, ByteSizeUnit.MB), TimeValue.timeValueMillis(10), TimeValue.timeValueMillis(10), Collections.emptyMap() diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/FollowStatsMonitoringDocTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/FollowStatsMonitoringDocTests.java index 4b36005de36..9b4ed7c8a97 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/FollowStatsMonitoringDocTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/FollowStatsMonitoringDocTests.java @@ -92,6 +92,7 @@ public class FollowStatsMonitoringDocTests extends BaseMonitoringDocTestCase entry : serializedStatus.entrySet()) { String fieldName = entry.getKey(); Map fieldMapping = (Map) followStatsMapping.get(fieldName); - assertThat(fieldMapping, notNullValue()); + assertThat("no field mapping for field [" + fieldName + "]", fieldMapping, notNullValue()); Object fieldValue = entry.getValue(); String fieldType = (String) fieldMapping.get("type"); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadata.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadata.java index 5234151010c..8172612b78f 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadata.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadata.java @@ -182,6 +182,7 @@ public class AutoFollowMetadata extends AbstractNamedDiffable i public static final ParseField MAX_CONCURRENT_READ_BATCHES = new ParseField("max_concurrent_read_batches"); public static final ParseField MAX_BATCH_SIZE = new ParseField("max_batch_size"); public static final ParseField MAX_CONCURRENT_WRITE_BATCHES = new ParseField("max_concurrent_write_batches"); + public static final ParseField MAX_WRITE_BUFFER_COUNT = new ParseField("max_write_buffer_count"); public static final ParseField MAX_WRITE_BUFFER_SIZE = new ParseField("max_write_buffer_size"); public static final ParseField MAX_RETRY_DELAY = new ParseField("max_retry_delay"); public static final ParseField POLL_TIMEOUT = new ParseField("poll_timeout"); @@ -190,8 +191,8 @@ public class AutoFollowMetadata extends AbstractNamedDiffable i private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>("auto_follow_pattern", args -> new AutoFollowPattern((String) args[0], (List) args[1], (String) args[2], (Integer) args[3], - (Integer) args[4], (ByteSizeValue) args[5], (Integer) args[6], (Integer) args[7], (TimeValue) args[8], - (TimeValue) args[9])); + (Integer) args[4], (ByteSizeValue) args[5], (Integer) args[6], (Integer) args[7], (ByteSizeValue) args[8], + (TimeValue) args[9], (TimeValue) args[10])); static { PARSER.declareString(ConstructingObjectParser.constructorArg(), REMOTE_CLUSTER_FIELD); @@ -205,7 +206,12 @@ public class AutoFollowMetadata extends AbstractNamedDiffable i MAX_BATCH_SIZE, ObjectParser.ValueType.STRING); PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), MAX_CONCURRENT_WRITE_BATCHES); - PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), MAX_WRITE_BUFFER_SIZE); + PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), MAX_WRITE_BUFFER_COUNT); + PARSER.declareField( + ConstructingObjectParser.optionalConstructorArg(), + (p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), MAX_WRITE_BUFFER_SIZE.getPreferredName()), + MAX_WRITE_BUFFER_SIZE, + ObjectParser.ValueType.STRING); PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> TimeValue.parseTimeValue(p.text(), MAX_RETRY_DELAY.getPreferredName()), MAX_RETRY_DELAY, ObjectParser.ValueType.STRING); @@ -221,7 +227,8 @@ public class AutoFollowMetadata extends AbstractNamedDiffable i private final Integer maxConcurrentReadBatches; private final ByteSizeValue maxBatchSize; private final Integer maxConcurrentWriteBatches; - private final Integer maxWriteBufferSize; + private final Integer maxWriteBufferCount; + private final ByteSizeValue maxWriteBufferSize; private final TimeValue maxRetryDelay; private final TimeValue pollTimeout; @@ -232,8 +239,8 @@ public class AutoFollowMetadata extends AbstractNamedDiffable i Integer maxConcurrentReadBatches, ByteSizeValue maxBatchSize, Integer maxConcurrentWriteBatches, - Integer maxWriteBufferSize, - TimeValue maxRetryDelay, + Integer maxWriteBufferCount, + ByteSizeValue maxWriteBufferSize, TimeValue maxRetryDelay, TimeValue pollTimeout) { this.remoteCluster = remoteCluster; this.leaderIndexPatterns = leaderIndexPatterns; @@ -242,6 +249,7 @@ public class AutoFollowMetadata extends AbstractNamedDiffable i this.maxConcurrentReadBatches = maxConcurrentReadBatches; this.maxBatchSize = maxBatchSize; this.maxConcurrentWriteBatches = maxConcurrentWriteBatches; + this.maxWriteBufferCount = maxWriteBufferCount; this.maxWriteBufferSize = maxWriteBufferSize; this.maxRetryDelay = maxRetryDelay; this.pollTimeout = pollTimeout; @@ -255,7 +263,8 @@ public class AutoFollowMetadata extends AbstractNamedDiffable i maxConcurrentReadBatches = in.readOptionalVInt(); maxBatchSize = in.readOptionalWriteable(ByteSizeValue::new); maxConcurrentWriteBatches = in.readOptionalVInt(); - maxWriteBufferSize = in.readOptionalVInt(); + maxWriteBufferCount = in.readOptionalVInt(); + maxWriteBufferSize = in.readOptionalWriteable(ByteSizeValue::new); maxRetryDelay = in.readOptionalTimeValue(); pollTimeout = in.readOptionalTimeValue(); } @@ -296,7 +305,11 @@ public class AutoFollowMetadata extends AbstractNamedDiffable i return maxConcurrentWriteBatches; } - public Integer getMaxWriteBufferSize() { + public Integer getMaxWriteBufferCount() { + return maxWriteBufferCount; + } + + public ByteSizeValue getMaxWriteBufferSize() { return maxWriteBufferSize; } @@ -317,7 +330,8 @@ public class AutoFollowMetadata extends AbstractNamedDiffable i out.writeOptionalVInt(maxConcurrentReadBatches); out.writeOptionalWriteable(maxBatchSize); out.writeOptionalVInt(maxConcurrentWriteBatches); - out.writeOptionalVInt(maxWriteBufferSize); + out.writeOptionalVInt(maxWriteBufferCount); + out.writeOptionalWriteable(maxWriteBufferSize); out.writeOptionalTimeValue(maxRetryDelay); out.writeOptionalTimeValue(pollTimeout); } @@ -341,8 +355,11 @@ public class AutoFollowMetadata extends AbstractNamedDiffable i if (maxConcurrentWriteBatches != null) { builder.field(MAX_CONCURRENT_WRITE_BATCHES.getPreferredName(), maxConcurrentWriteBatches); } - if (maxWriteBufferSize != null){ - builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize); + if (maxWriteBufferCount != null){ + builder.field(MAX_WRITE_BUFFER_COUNT.getPreferredName(), maxWriteBufferCount); + } + if (maxWriteBufferSize != null) { + builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize.getStringRep()); } if (maxRetryDelay != null) { builder.field(MAX_RETRY_DELAY.getPreferredName(), maxRetryDelay); @@ -370,6 +387,7 @@ public class AutoFollowMetadata extends AbstractNamedDiffable i Objects.equals(maxConcurrentReadBatches, that.maxConcurrentReadBatches) && Objects.equals(maxBatchSize, that.maxBatchSize) && Objects.equals(maxConcurrentWriteBatches, that.maxConcurrentWriteBatches) && + Objects.equals(maxWriteBufferCount, that.maxWriteBufferCount) && Objects.equals(maxWriteBufferSize, that.maxWriteBufferSize) && Objects.equals(maxRetryDelay, that.maxRetryDelay) && Objects.equals(pollTimeout, that.pollTimeout); @@ -385,6 +403,7 @@ public class AutoFollowMetadata extends AbstractNamedDiffable i maxConcurrentReadBatches, maxBatchSize, maxConcurrentWriteBatches, + maxWriteBufferCount, maxWriteBufferSize, maxRetryDelay, pollTimeout); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/ShardFollowNodeTaskStatus.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/ShardFollowNodeTaskStatus.java index 5869c78bc73..e2e907f80d1 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/ShardFollowNodeTaskStatus.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/ShardFollowNodeTaskStatus.java @@ -46,6 +46,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status { private static final ParseField NUMBER_OF_CONCURRENT_READS_FIELD = new ParseField("number_of_concurrent_reads"); private static final ParseField NUMBER_OF_CONCURRENT_WRITES_FIELD = new ParseField("number_of_concurrent_writes"); private static final ParseField NUMBER_OF_QUEUED_WRITES_FIELD = new ParseField("number_of_queued_writes"); + private static final ParseField BUFFER_SIZE_IN_BYTES_FIELD = new ParseField("buffer_size_in_bytes"); private static final ParseField MAPPING_VERSION_FIELD = new ParseField("mapping_version"); private static final ParseField TOTAL_FETCH_TIME_MILLIS_FIELD = new ParseField("total_fetch_time_millis"); private static final ParseField TOTAL_FETCH_REMOTE_TIME_MILLIS_FIELD = new ParseField("total_fetch_remote_time_millis"); @@ -89,12 +90,13 @@ public class ShardFollowNodeTaskStatus implements Task.Status { (long) args[20], (long) args[21], (long) args[22], + (long) args[23], new TreeMap<>( - ((List>>) args[23]) + ((List>>) args[24]) .stream() .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))), - (long) args[24], - (ElasticsearchException) args[25])); + (long) args[25], + (ElasticsearchException) args[26])); public static final String FETCH_EXCEPTIONS_ENTRY_PARSER_NAME = "shard-follow-node-task-status-fetch-exceptions-entry"; @@ -116,6 +118,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status { STATUS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_CONCURRENT_READS_FIELD); STATUS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_CONCURRENT_WRITES_FIELD); STATUS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_QUEUED_WRITES_FIELD); + STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), BUFFER_SIZE_IN_BYTES_FIELD); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), MAPPING_VERSION_FIELD); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_FETCH_TIME_MILLIS_FIELD); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_FETCH_REMOTE_TIME_MILLIS_FIELD); @@ -219,6 +222,12 @@ public class ShardFollowNodeTaskStatus implements Task.Status { return numberOfQueuedWrites; } + private final long bufferSize; + + public long bufferSize() { + return bufferSize; + } + private final long mappingVersion; public long mappingVersion() { @@ -316,6 +325,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status { final int numberOfConcurrentReads, final int numberOfConcurrentWrites, final int numberOfQueuedWrites, + final long bufferSize, final long mappingVersion, final long totalFetchTimeMillis, final long totalFetchRemoteTimeMillis, @@ -342,6 +352,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status { this.numberOfConcurrentReads = numberOfConcurrentReads; this.numberOfConcurrentWrites = numberOfConcurrentWrites; this.numberOfQueuedWrites = numberOfQueuedWrites; + this.bufferSize = bufferSize; this.mappingVersion = mappingVersion; this.totalFetchTimeMillis = totalFetchTimeMillis; this.totalFetchRemoteTimeMillis = totalFetchRemoteTimeMillis; @@ -371,6 +382,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status { this.numberOfConcurrentReads = in.readVInt(); this.numberOfConcurrentWrites = in.readVInt(); this.numberOfQueuedWrites = in.readVInt(); + this.bufferSize = in.readVLong(); this.mappingVersion = in.readVLong(); this.totalFetchTimeMillis = in.readVLong(); this.totalFetchRemoteTimeMillis = in.readVLong(); @@ -407,6 +419,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status { out.writeVInt(numberOfConcurrentReads); out.writeVInt(numberOfConcurrentWrites); out.writeVInt(numberOfQueuedWrites); + out.writeVLong(bufferSize); out.writeVLong(mappingVersion); out.writeVLong(totalFetchTimeMillis); out.writeVLong(totalFetchRemoteTimeMillis); @@ -452,6 +465,10 @@ public class ShardFollowNodeTaskStatus implements Task.Status { builder.field(NUMBER_OF_CONCURRENT_READS_FIELD.getPreferredName(), numberOfConcurrentReads); builder.field(NUMBER_OF_CONCURRENT_WRITES_FIELD.getPreferredName(), numberOfConcurrentWrites); builder.field(NUMBER_OF_QUEUED_WRITES_FIELD.getPreferredName(), numberOfQueuedWrites); + builder.humanReadableField( + BUFFER_SIZE_IN_BYTES_FIELD.getPreferredName(), + "buffer_size", + new ByteSizeValue(bufferSize)); builder.field(MAPPING_VERSION_FIELD.getPreferredName(), mappingVersion); builder.humanReadableField( TOTAL_FETCH_TIME_MILLIS_FIELD.getPreferredName(), @@ -531,6 +548,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status { numberOfConcurrentReads == that.numberOfConcurrentReads && numberOfConcurrentWrites == that.numberOfConcurrentWrites && numberOfQueuedWrites == that.numberOfQueuedWrites && + bufferSize == that.bufferSize && mappingVersion == that.mappingVersion && totalFetchTimeMillis == that.totalFetchTimeMillis && totalFetchRemoteTimeMillis == that.totalFetchRemoteTimeMillis && @@ -568,6 +586,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status { numberOfConcurrentReads, numberOfConcurrentWrites, numberOfQueuedWrites, + bufferSize, mappingVersion, totalFetchTimeMillis, totalFetchRemoteTimeMillis, diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/PutAutoFollowPatternAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/PutAutoFollowPatternAction.java index 5a87666d050..8010c9bf344 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/PutAutoFollowPatternAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/PutAutoFollowPatternAction.java @@ -60,7 +60,12 @@ public class PutAutoFollowPatternAction extends Action { AutoFollowPattern.MAX_BATCH_SIZE, ObjectParser.ValueType.STRING); PARSER.declareInt(Request::setMaxConcurrentWriteBatches, AutoFollowPattern.MAX_CONCURRENT_WRITE_BATCHES); - PARSER.declareInt(Request::setMaxWriteBufferSize, AutoFollowPattern.MAX_WRITE_BUFFER_SIZE); + PARSER.declareInt(Request::setMaxWriteBufferCount, AutoFollowPattern.MAX_WRITE_BUFFER_COUNT); + PARSER.declareField( + Request::setMaxWriteBufferSize, + (p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), AutoFollowPattern.MAX_WRITE_BUFFER_SIZE.getPreferredName()), + AutoFollowPattern.MAX_WRITE_BUFFER_SIZE, + ObjectParser.ValueType.STRING); PARSER.declareField(Request::setMaxRetryDelay, (p, c) -> TimeValue.parseTimeValue(p.text(), AutoFollowPattern.MAX_RETRY_DELAY.getPreferredName()), AutoFollowPattern.MAX_RETRY_DELAY, ObjectParser.ValueType.STRING); @@ -92,7 +97,8 @@ public class PutAutoFollowPatternAction extends Action { private Integer maxConcurrentReadBatches; private ByteSizeValue maxBatchSize; private Integer maxConcurrentWriteBatches; - private Integer maxWriteBufferSize; + private Integer maxWriteBufferCount; + private ByteSizeValue maxWriteBufferSize; private TimeValue maxRetryDelay; private TimeValue pollTimeout; @@ -190,11 +196,19 @@ public class PutAutoFollowPatternAction extends Action { this.maxConcurrentWriteBatches = maxConcurrentWriteBatches; } - public Integer getMaxWriteBufferSize() { + public Integer getMaxWriteBufferCount() { + return maxWriteBufferCount; + } + + public void setMaxWriteBufferCount(Integer maxWriteBufferCount) { + this.maxWriteBufferCount = maxWriteBufferCount; + } + + public ByteSizeValue getMaxWriteBufferSize() { return maxWriteBufferSize; } - public void setMaxWriteBufferSize(Integer maxWriteBufferSize) { + public void setMaxWriteBufferSize(ByteSizeValue maxWriteBufferSize) { this.maxWriteBufferSize = maxWriteBufferSize; } @@ -225,7 +239,8 @@ public class PutAutoFollowPatternAction extends Action { maxConcurrentReadBatches = in.readOptionalVInt(); maxBatchSize = in.readOptionalWriteable(ByteSizeValue::new); maxConcurrentWriteBatches = in.readOptionalVInt(); - maxWriteBufferSize = in.readOptionalVInt(); + maxWriteBufferCount = in.readOptionalVInt(); + maxWriteBufferSize = in.readOptionalWriteable(ByteSizeValue::new); maxRetryDelay = in.readOptionalTimeValue(); pollTimeout = in.readOptionalTimeValue(); } @@ -241,7 +256,8 @@ public class PutAutoFollowPatternAction extends Action { out.writeOptionalVInt(maxConcurrentReadBatches); out.writeOptionalWriteable(maxBatchSize); out.writeOptionalVInt(maxConcurrentWriteBatches); - out.writeOptionalVInt(maxWriteBufferSize); + out.writeOptionalVInt(maxWriteBufferCount); + out.writeOptionalWriteable(maxWriteBufferSize); out.writeOptionalTimeValue(maxRetryDelay); out.writeOptionalTimeValue(pollTimeout); } @@ -262,8 +278,11 @@ public class PutAutoFollowPatternAction extends Action { if (maxBatchSize != null) { builder.field(AutoFollowPattern.MAX_BATCH_SIZE.getPreferredName(), maxBatchSize.getStringRep()); } + if (maxWriteBufferCount != null) { + builder.field(AutoFollowPattern.MAX_WRITE_BUFFER_COUNT.getPreferredName(), maxWriteBufferCount); + } if (maxWriteBufferSize != null) { - builder.field(AutoFollowPattern.MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize); + builder.field(AutoFollowPattern.MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize.getStringRep()); } if (maxConcurrentReadBatches != null) { builder.field(AutoFollowPattern.MAX_CONCURRENT_READ_BATCHES.getPreferredName(), maxConcurrentReadBatches); @@ -295,6 +314,7 @@ public class PutAutoFollowPatternAction extends Action { Objects.equals(maxConcurrentReadBatches, request.maxConcurrentReadBatches) && Objects.equals(maxBatchSize, request.maxBatchSize) && Objects.equals(maxConcurrentWriteBatches, request.maxConcurrentWriteBatches) && + Objects.equals(maxWriteBufferCount, request.maxWriteBufferCount) && Objects.equals(maxWriteBufferSize, request.maxWriteBufferSize) && Objects.equals(maxRetryDelay, request.maxRetryDelay) && Objects.equals(pollTimeout, request.pollTimeout); @@ -311,6 +331,7 @@ public class PutAutoFollowPatternAction extends Action { maxConcurrentReadBatches, maxBatchSize, maxConcurrentWriteBatches, + maxWriteBufferCount, maxWriteBufferSize, maxRetryDelay, pollTimeout); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/PutFollowAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/PutFollowAction.java index 28895a59073..0f36af4db10 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/PutFollowAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/PutFollowAction.java @@ -32,6 +32,7 @@ import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_CONCURRENT_READ_BATCHES; import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_CONCURRENT_WRITE_BATCHES; import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_RETRY_DELAY_FIELD; +import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_WRITE_BUFFER_COUNT; import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_WRITE_BUFFER_SIZE; import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.POLL_TIMEOUT; @@ -72,7 +73,12 @@ public final class PutFollowAction extends Action { MAX_BATCH_SIZE, ObjectParser.ValueType.STRING); PARSER.declareInt((request, value) -> request.followRequest.setMaxConcurrentWriteBatches(value), MAX_CONCURRENT_WRITE_BATCHES); - PARSER.declareInt((request, value) -> request.followRequest.setMaxWriteBufferSize(value), MAX_WRITE_BUFFER_SIZE); + PARSER.declareInt((request, value) -> request.followRequest.setMaxWriteBufferCount(value), MAX_WRITE_BUFFER_COUNT); + PARSER.declareField( + (request, value) -> request.followRequest.setMaxWriteBufferSize(value), + (p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), MAX_WRITE_BUFFER_SIZE.getPreferredName()), + MAX_WRITE_BUFFER_SIZE, + ObjectParser.ValueType.STRING); PARSER.declareField( (request, value) -> request.followRequest.setMaxRetryDelay(value), (p, c) -> TimeValue.parseTimeValue(p.text(), MAX_RETRY_DELAY_FIELD.getPreferredName()), diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/ResumeFollowAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/ResumeFollowAction.java index 127ccf7610f..587223e3fbc 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/ResumeFollowAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/ResumeFollowAction.java @@ -48,6 +48,7 @@ public final class ResumeFollowAction extends Action { static final ParseField MAX_CONCURRENT_READ_BATCHES = new ParseField("max_concurrent_read_batches"); static final ParseField MAX_BATCH_SIZE = new ParseField("max_batch_size"); static final ParseField MAX_CONCURRENT_WRITE_BATCHES = new ParseField("max_concurrent_write_batches"); + static final ParseField MAX_WRITE_BUFFER_COUNT = new ParseField("max_write_buffer_count"); static final ParseField MAX_WRITE_BUFFER_SIZE = new ParseField("max_write_buffer_size"); static final ParseField MAX_RETRY_DELAY_FIELD = new ParseField("max_retry_delay"); static final ParseField POLL_TIMEOUT = new ParseField("poll_timeout"); @@ -63,11 +64,16 @@ public final class ResumeFollowAction extends Action { MAX_BATCH_SIZE, ObjectParser.ValueType.STRING); PARSER.declareInt(Request::setMaxConcurrentWriteBatches, MAX_CONCURRENT_WRITE_BATCHES); - PARSER.declareInt(Request::setMaxWriteBufferSize, MAX_WRITE_BUFFER_SIZE); + PARSER.declareInt(Request::setMaxWriteBufferCount, MAX_WRITE_BUFFER_COUNT); + PARSER.declareField( + Request::setMaxWriteBufferSize, + (p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), MAX_WRITE_BUFFER_SIZE.getPreferredName()), + MAX_WRITE_BUFFER_SIZE, + ObjectParser.ValueType.STRING); PARSER.declareField( Request::setMaxRetryDelay, (p, c) -> TimeValue.parseTimeValue(p.text(), MAX_RETRY_DELAY_FIELD.getPreferredName()), - MAX_RETRY_DELAY_FIELD, + MAX_RETRY_DELAY_FIELD, ObjectParser.ValueType.STRING); PARSER.declareField( Request::setPollTimeout, @@ -140,13 +146,23 @@ public final class ResumeFollowAction extends Action { this.maxConcurrentWriteBatches = maxConcurrentWriteBatches; } - private Integer maxWriteBufferSize; + private Integer maxWriteBufferCount; - public Integer getMaxWriteBufferSize() { + public Integer getMaxWriteBufferCount() { + return maxWriteBufferCount; + } + + public void setMaxWriteBufferCount(Integer maxWriteBufferCount) { + this.maxWriteBufferCount = maxWriteBufferCount; + } + + private ByteSizeValue maxWriteBufferSize; + + public ByteSizeValue getMaxWriteBufferSize() { return maxWriteBufferSize; } - public void setMaxWriteBufferSize(Integer maxWriteBufferSize) { + public void setMaxWriteBufferSize(ByteSizeValue maxWriteBufferSize) { this.maxWriteBufferSize = maxWriteBufferSize; } @@ -192,7 +208,10 @@ public final class ResumeFollowAction extends Action { if (maxConcurrentWriteBatches != null && maxConcurrentWriteBatches < 1) { e = addValidationError(MAX_CONCURRENT_WRITE_BATCHES.getPreferredName() + " must be larger than 0", e); } - if (maxWriteBufferSize != null && maxWriteBufferSize < 1) { + if (maxWriteBufferCount != null && maxWriteBufferCount < 1) { + e = addValidationError(MAX_WRITE_BUFFER_COUNT.getPreferredName() + " must be larger than 0", e); + } + if (maxWriteBufferSize != null && maxWriteBufferSize.compareTo(ByteSizeValue.ZERO) <= 0) { e = addValidationError(MAX_WRITE_BUFFER_SIZE.getPreferredName() + " must be larger than 0", e); } if (maxRetryDelay != null && maxRetryDelay.millis() <= 0) { @@ -217,7 +236,8 @@ public final class ResumeFollowAction extends Action { maxConcurrentReadBatches = in.readOptionalVInt(); maxBatchSize = in.readOptionalWriteable(ByteSizeValue::new); maxConcurrentWriteBatches = in.readOptionalVInt(); - maxWriteBufferSize = in.readOptionalVInt(); + maxWriteBufferCount = in.readOptionalVInt(); + maxWriteBufferSize = in.readOptionalWriteable(ByteSizeValue::new); maxRetryDelay = in.readOptionalTimeValue(); pollTimeout = in.readOptionalTimeValue(); } @@ -230,7 +250,8 @@ public final class ResumeFollowAction extends Action { out.writeOptionalVInt(maxConcurrentReadBatches); out.writeOptionalWriteable(maxBatchSize); out.writeOptionalVInt(maxConcurrentWriteBatches); - out.writeOptionalVInt(maxWriteBufferSize); + out.writeOptionalVInt(maxWriteBufferCount); + out.writeOptionalWriteable(maxWriteBufferSize); out.writeOptionalTimeValue(maxRetryDelay); out.writeOptionalTimeValue(pollTimeout); } @@ -253,8 +274,11 @@ public final class ResumeFollowAction extends Action { if (maxBatchSize != null) { builder.field(MAX_BATCH_SIZE.getPreferredName(), maxBatchSize.getStringRep()); } + if (maxWriteBufferCount != null) { + builder.field(MAX_WRITE_BUFFER_COUNT.getPreferredName(), maxWriteBufferCount); + } if (maxWriteBufferSize != null) { - builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize); + builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize.getStringRep()); } if (maxConcurrentReadBatches != null) { builder.field(MAX_CONCURRENT_READ_BATCHES.getPreferredName(), maxConcurrentReadBatches); @@ -279,6 +303,7 @@ public final class ResumeFollowAction extends Action { Objects.equals(maxConcurrentReadBatches, request.maxConcurrentReadBatches) && Objects.equals(maxBatchSize, request.maxBatchSize) && Objects.equals(maxConcurrentWriteBatches, request.maxConcurrentWriteBatches) && + Objects.equals(maxWriteBufferCount, request.maxWriteBufferCount) && Objects.equals(maxWriteBufferSize, request.maxWriteBufferSize) && Objects.equals(maxRetryDelay, request.maxRetryDelay) && Objects.equals(pollTimeout, request.pollTimeout) && @@ -293,6 +318,7 @@ public final class ResumeFollowAction extends Action { maxConcurrentReadBatches, maxBatchSize, maxConcurrentWriteBatches, + maxWriteBufferCount, maxWriteBufferSize, maxRetryDelay, pollTimeout); diff --git a/x-pack/plugin/core/src/main/resources/monitoring-es.json b/x-pack/plugin/core/src/main/resources/monitoring-es.json index bad47024f05..4c2a9792440 100644 --- a/x-pack/plugin/core/src/main/resources/monitoring-es.json +++ b/x-pack/plugin/core/src/main/resources/monitoring-es.json @@ -965,6 +965,9 @@ "number_of_queued_writes": { "type": "long" }, + "buffer_size_in_bytes": { + "type": "long" + }, "mapping_version": { "type": "long" },