Count coordinating and primary bytes as write bytes (#58984)
This is a follow-up to #57573. This commit combines coordinating and primary bytes under a single "write" bucket. Double accounting is prevented by accounting the bytes at either the reroute phase or the primary phase, never both (see the sketch below). TransportBulkAction calls execute directly, so the operations handler is skipped and the bytes are not double counted.
parent 2c43421208
commit dc9e364ff2
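In outline, each node now tracks one "write" counter that is incremented once per operation: at the coordinating/reroute phase, or at the primary phase only if the request arrived from another node. The standalone sketch below illustrates that rule under simplified, illustrative names (WriteBytesSketch, markCoordinating, markPrimary are not part of the commit; the real classes are WriteMemoryLimits, TransportWriteAction and TransportReplicationAction in the diff that follows, and AutoCloseable stands in for Elasticsearch's Releasable):

// Illustrative sketch only, not Elasticsearch code.
import java.util.concurrent.atomic.AtomicLong;

class WriteBytesSketch {
    private final AtomicLong writeBytes = new AtomicLong();

    // Coordinating/reroute phase: always account the bytes once.
    AutoCloseable markCoordinating(long bytes) {
        writeBytes.addAndGet(bytes);
        return () -> writeBytes.addAndGet(-bytes);
    }

    // Primary phase: skip accounting when the reroute already happened on this node,
    // otherwise the same bytes would be counted twice.
    AutoCloseable markPrimary(long bytes, boolean rerouteWasLocal) {
        if (rerouteWasLocal) {
            return () -> {};
        }
        writeBytes.addAndGet(bytes);
        return () -> writeBytes.addAndGet(-bytes);
    }

    long getWriteBytes() {
        return writeBytes.get();
    }
}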
WriteMemoryLimitsIT.java

@@ -23,6 +23,7 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
 import org.elasticsearch.action.admin.indices.stats.ShardStats;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.settings.Settings;
@@ -42,9 +43,11 @@ import java.util.stream.Stream;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.hamcrest.Matchers.greaterThan;

-@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 2)
+@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 2, numClientNodes = 1, transportClientRatio = 0.0D)
 public class WriteMemoryLimitsIT extends ESIntegTestCase {

+public static final String INDEX_NAME = "test";
+
 @Override
 protected Settings nodeSettings(int nodeOrdinal) {
 return Settings.builder()
@@ -69,15 +72,13 @@ public class WriteMemoryLimitsIT extends ESIntegTestCase {
 return 1;
 }

-@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/58983")
 public void testWriteBytesAreIncremented() throws Exception {
-final String index = "test";
-assertAcked(prepareCreate(index, Settings.builder()
+assertAcked(prepareCreate(INDEX_NAME, Settings.builder()
 .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
 .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)));
-ensureGreen(index);
+ensureGreen(INDEX_NAME);

-IndicesStatsResponse response = client().admin().indices().prepareStats(index).get();
+IndicesStatsResponse response = client().admin().indices().prepareStats(INDEX_NAME).get();
 String primaryId = Stream.of(response.getShards())
 .map(ShardStats::getShardRouting)
 .filter(ShardRouting::primary)
@@ -90,8 +91,10 @@ public class WriteMemoryLimitsIT extends ESIntegTestCase {
 .findAny()
 .get()
 .currentNodeId();
-String primaryName = client().admin().cluster().prepareState().get().getState().nodes().get(primaryId).getName();
-String replicaName = client().admin().cluster().prepareState().get().getState().nodes().get(replicaId).getName();
+DiscoveryNodes nodes = client().admin().cluster().prepareState().get().getState().nodes();
+String primaryName = nodes.get(primaryId).getName();
+String replicaName = nodes.get(replicaId).getName();
+String coordinatingOnlyNode = nodes.getCoordinatingOnlyNodes().iterator().next().value.getName();

 final CountDownLatch replicationSendPointReached = new CountDownLatch(1);
 final CountDownLatch latchBlockingReplicationSend = new CountDownLatch(1);
@@ -118,7 +121,7 @@ public class WriteMemoryLimitsIT extends ESIntegTestCase {
 final BulkRequest bulkRequest = new BulkRequest();
 int totalRequestSize = 0;
 for (int i = 0; i < 80; ++i) {
-IndexRequest request = new IndexRequest(index).id(UUIDs.base64UUID())
+IndexRequest request = new IndexRequest(INDEX_NAME).id(UUIDs.base64UUID())
 .source(Collections.singletonMap("key", randomAlphaOfLength(50)));
 totalRequestSize += request.ramBytesUsed();
 assertTrue(request.ramBytesUsed() > request.source().length());
@@ -129,18 +132,19 @@ public class WriteMemoryLimitsIT extends ESIntegTestCase {
 final long bulkShardRequestSize = totalRequestSize;

 try {
-final ActionFuture<BulkResponse> successFuture = client(replicaName).bulk(bulkRequest);
+final ActionFuture<BulkResponse> successFuture = client(coordinatingOnlyNode).bulk(bulkRequest);
 replicationSendPointReached.await();

 WriteMemoryLimits primaryWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, primaryName);
 WriteMemoryLimits replicaWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, replicaName);
+WriteMemoryLimits coordinatingWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, coordinatingOnlyNode);

-assertThat(primaryWriteLimits.getCoordinatingBytes(), greaterThan(bulkShardRequestSize));
-assertThat(primaryWriteLimits.getPrimaryBytes(), greaterThan(bulkShardRequestSize));
-assertEquals(0, primaryWriteLimits.getReplicaBytes());
-assertEquals(bulkRequestSize, replicaWriteLimits.getCoordinatingBytes());
-assertEquals(0, replicaWriteLimits.getPrimaryBytes());
-assertEquals(0, replicaWriteLimits.getReplicaBytes());
+assertThat(primaryWriteLimits.getWriteBytes(), greaterThan(bulkShardRequestSize));
+assertEquals(0, primaryWriteLimits.getReplicaWriteBytes());
+assertEquals(0, replicaWriteLimits.getWriteBytes());
+assertEquals(0, replicaWriteLimits.getReplicaWriteBytes());
+assertEquals(bulkRequestSize, coordinatingWriteLimits.getWriteBytes());
+assertEquals(0, coordinatingWriteLimits.getReplicaWriteBytes());

 ThreadPool replicaThreadPool = replicaTransportService.getThreadPool();
 // Block the replica Write thread pool
@@ -163,18 +167,32 @@ public class WriteMemoryLimitsIT extends ESIntegTestCase {
 newActionsSendPointReached.await();
 latchBlockingReplicationSend.countDown();

-IndexRequest request = new IndexRequest(index).id(UUIDs.base64UUID())
+IndexRequest request = new IndexRequest(INDEX_NAME).id(UUIDs.base64UUID())
 .source(Collections.singletonMap("key", randomAlphaOfLength(50)));
 final BulkRequest secondBulkRequest = new BulkRequest();
 secondBulkRequest.add(request);

-ActionFuture<BulkResponse> secondFuture = client(replicaName).bulk(secondBulkRequest);
+// Use the primary or the replica data node as the coordinating node this time
+boolean usePrimaryAsCoordinatingNode = randomBoolean();
+final ActionFuture<BulkResponse> secondFuture;
+if (usePrimaryAsCoordinatingNode) {
+secondFuture = client(primaryName).bulk(secondBulkRequest);
+} else {
+secondFuture = client(replicaName).bulk(secondBulkRequest);
+}

 final long secondBulkRequestSize = secondBulkRequest.ramBytesUsed();
 final long secondBulkShardRequestSize = request.ramBytesUsed();

-assertBusy(() -> assertEquals(bulkRequestSize + secondBulkRequestSize, replicaWriteLimits.getCoordinatingBytes()));
-assertBusy(() -> assertThat(replicaWriteLimits.getReplicaBytes(),
+if (usePrimaryAsCoordinatingNode) {
+assertThat(primaryWriteLimits.getWriteBytes(), greaterThan(bulkShardRequestSize + secondBulkRequestSize));
+assertEquals(0, replicaWriteLimits.getWriteBytes());
+} else {
+assertThat(primaryWriteLimits.getWriteBytes(), greaterThan(bulkShardRequestSize));
+assertEquals(secondBulkRequestSize, replicaWriteLimits.getWriteBytes());
+}
+assertEquals(bulkRequestSize, coordinatingWriteLimits.getWriteBytes());
+assertBusy(() -> assertThat(replicaWriteLimits.getReplicaWriteBytes(),
 greaterThan(bulkShardRequestSize + secondBulkShardRequestSize)));

 latchBlockingReplication.countDown();
@@ -182,12 +200,12 @@ public class WriteMemoryLimitsIT extends ESIntegTestCase {
 successFuture.actionGet();
 secondFuture.actionGet();

-assertEquals(0, primaryWriteLimits.getCoordinatingBytes());
-assertEquals(0, primaryWriteLimits.getPrimaryBytes());
-assertEquals(0, primaryWriteLimits.getReplicaBytes());
-assertEquals(0, replicaWriteLimits.getCoordinatingBytes());
-assertEquals(0, replicaWriteLimits.getPrimaryBytes());
-assertEquals(0, replicaWriteLimits.getReplicaBytes());
+assertEquals(0, primaryWriteLimits.getWriteBytes());
+assertEquals(0, primaryWriteLimits.getReplicaWriteBytes());
+assertEquals(0, replicaWriteLimits.getWriteBytes());
+assertEquals(0, replicaWriteLimits.getReplicaWriteBytes());
+assertEquals(0, coordinatingWriteLimits.getWriteBytes());
+assertEquals(0, coordinatingWriteLimits.getReplicaWriteBytes());
 } finally {
 if (replicationSendPointReached.getCount() > 0) {
 replicationSendPointReached.countDown();
TransportBulkAction.java

@@ -166,7 +166,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
 @Override
 protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
 long indexingBytes = bulkRequest.ramBytesUsed();
-final Releasable releasable = writeMemoryLimits.markCoordinatingOperationStarted(indexingBytes);
+final Releasable releasable = writeMemoryLimits.markWriteOperationStarted(indexingBytes);
 final ActionListener<BulkResponse> releasingListener = ActionListener.runBefore(listener, releasable::close);
 try {
 doInternalExecute(task, bulkRequest, releasingListener);
WriteMemoryLimits.java

@@ -25,34 +25,24 @@ import java.util.concurrent.atomic.AtomicLong;

 public class WriteMemoryLimits {

-private final AtomicLong coordinatingBytes = new AtomicLong(0);
-private final AtomicLong primaryBytes = new AtomicLong(0);
-private final AtomicLong replicaBytes = new AtomicLong(0);
+private final AtomicLong writeBytes = new AtomicLong(0);
+private final AtomicLong replicaWriteBytes = new AtomicLong(0);

-public Releasable markCoordinatingOperationStarted(long bytes) {
-coordinatingBytes.addAndGet(bytes);
-return () -> coordinatingBytes.getAndAdd(-bytes);
+public Releasable markWriteOperationStarted(long bytes) {
+writeBytes.addAndGet(bytes);
+return () -> writeBytes.getAndAdd(-bytes);
 }

-public long getCoordinatingBytes() {
-return coordinatingBytes.get();
+public long getWriteBytes() {
+return writeBytes.get();
 }

-public Releasable markPrimaryOperationStarted(long bytes) {
-primaryBytes.addAndGet(bytes);
-return () -> primaryBytes.getAndAdd(-bytes);
+public Releasable markReplicaWriteStarted(long bytes) {
+replicaWriteBytes.getAndAdd(bytes);
+return () -> replicaWriteBytes.getAndAdd(-bytes);
 }

-public long getPrimaryBytes() {
-return primaryBytes.get();
-}
-
-public Releasable markReplicaOperationStarted(long bytes) {
-replicaBytes.getAndAdd(bytes);
-return () -> replicaBytes.getAndAdd(-bytes);
-}
-
-public long getReplicaBytes() {
-return replicaBytes.get();
+public long getReplicaWriteBytes() {
+return replicaWriteBytes.get();
 }
 }
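For orientation, a usage sketch of the new WriteMemoryLimits above: the bytes stay accounted from mark...Started() until the returned Releasable is closed. This is a fragment with a hypothetical byte count, and it assumes Releasable#close can be called directly, consistent with the releasable::close method references used throughout the diff.

WriteMemoryLimits limits = new WriteMemoryLimits();
Releasable op = limits.markWriteOperationStarted(1024);   // writeBytes is now 1024
assert limits.getWriteBytes() == 1024;
op.close();                                               // subtracts the same bytes again
assert limits.getWriteBytes() == 0;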
TransportResyncReplicationAction.java

@@ -64,6 +64,11 @@ public class TransportResyncReplicationAction extends TransportWriteAction<Resyn
 writeMemoryLimits);
 }

+@Override
+protected void doExecute(Task parentTask, ResyncReplicationRequest request, ActionListener<ResyncReplicationResponse> listener) {
+assert false : "use TransportResyncReplicationAction#sync";
+}
+
 @Override
 protected ResyncReplicationResponse newResponseInstance(StreamInput in) throws IOException {
 return new ResyncReplicationResponse(in);
TransportReplicationAction.java

@@ -286,7 +286,7 @@ public abstract class TransportReplicationAction<
 }

 protected void handlePrimaryRequest(final ConcreteShardRequest<Request> request, final TransportChannel channel, final Task task) {
-Releasable releasable = checkPrimaryLimits(request.getRequest());
+Releasable releasable = checkPrimaryLimits(request.getRequest(), request.sentFromLocalReroute());
 ActionListener<Response> listener =
 ActionListener.runBefore(new ChannelActionListener<>(channel, transportPrimaryAction, request), releasable::close);

@@ -297,7 +297,7 @@ public abstract class TransportReplicationAction<
 }
 }

-protected Releasable checkPrimaryLimits(final Request request) {
+protected Releasable checkPrimaryLimits(final Request request, boolean rerouteWasLocal) {
 return () -> {};
 }

@@ -372,8 +372,7 @@ public abstract class TransportReplicationAction<
 DiscoveryNode relocatingNode = clusterState.nodes().get(primary.relocatingNodeId());
 transportService.sendRequest(relocatingNode, transportPrimaryAction,
 new ConcreteShardRequest<>(primaryRequest.getRequest(), primary.allocationId().getRelocationId(),
-primaryRequest.getPrimaryTerm()),
-transportOptions,
+primaryRequest.getPrimaryTerm()), transportOptions,
 new ActionListenerResponseHandler<Response>(onCompletionListener, reader) {
 @Override
 public void handleResponse(Response response) {
@@ -585,7 +584,7 @@ public abstract class TransportReplicationAction<
 Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller
 AsyncReplicaAction.this.onFailure(e);
 }));
-// TODO: Evaludate if we still need to catch this exception
+// TODO: Evaluate if we still need to catch this exception
 } catch (Exception e) {
 Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller
 AsyncReplicaAction.this.onFailure(e);
@@ -751,7 +750,7 @@ public abstract class TransportReplicationAction<
 transportPrimaryAction, request.shardId(), request, state.version(), primary.currentNodeId());
 }
 performAction(node, transportPrimaryAction, true,
-new ConcreteShardRequest<>(request, primary.allocationId().getId(), indexMetadata.primaryTerm(primary.id())));
+new ConcreteShardRequest<>(request, primary.allocationId().getId(), indexMetadata.primaryTerm(primary.id()), true));
 }

 private void performRemoteAction(ClusterState state, ShardRouting primary, DiscoveryNode node) {
@@ -1103,19 +1102,27 @@ public abstract class TransportReplicationAction<
 private final String targetAllocationID;
 private final long primaryTerm;
 private final R request;
+// Indicates if this primary shard request originated by a reroute on this local node.
+private final boolean sentFromLocalReroute;

 public ConcreteShardRequest(Writeable.Reader<R> requestReader, StreamInput in) throws IOException {
 targetAllocationID = in.readString();
 primaryTerm = in.readVLong();
+sentFromLocalReroute = false;
 request = requestReader.read(in);
 }

 public ConcreteShardRequest(R request, String targetAllocationID, long primaryTerm) {
+this(request, targetAllocationID, primaryTerm, false);
+}
+
+public ConcreteShardRequest(R request, String targetAllocationID, long primaryTerm, boolean sentFromLocalReroute) {
 Objects.requireNonNull(request);
 Objects.requireNonNull(targetAllocationID);
 this.request = request;
 this.targetAllocationID = targetAllocationID;
 this.primaryTerm = primaryTerm;
+this.sentFromLocalReroute = sentFromLocalReroute;
 }

 @Override
@@ -1144,11 +1151,19 @@ public abstract class TransportReplicationAction<

 @Override
 public void writeTo(StreamOutput out) throws IOException {
+// If sentFromLocalReroute is marked true, then this request should just be looped back through
+// the local transport. It should never be serialized to be sent over the wire. If it is sent over
+// the wire, then it was NOT sent from a local reroute.
+assert sentFromLocalReroute == false;
 out.writeString(targetAllocationID);
 out.writeVLong(primaryTerm);
 request.writeTo(out);
 }

+public boolean sentFromLocalReroute() {
+return sentFromLocalReroute;
+}
+
 public R getRequest() {
 return request;
 }
TransportWriteAction.java

@@ -80,12 +80,18 @@ public abstract class TransportWriteAction<

 @Override
 protected Releasable checkOperationLimits(Request request) {
-return writeMemoryLimits.markCoordinatingOperationStarted(primaryOperationSize(request));
+return writeMemoryLimits.markWriteOperationStarted(primaryOperationSize(request));
 }

 @Override
-protected Releasable checkPrimaryLimits(Request request) {
-return writeMemoryLimits.markPrimaryOperationStarted(primaryOperationSize(request));
+protected Releasable checkPrimaryLimits(Request request, boolean rerouteWasLocal) {
+// If this primary request was submitted by a reroute performed on this local node, we have already
+// accounted the bytes.
+if (rerouteWasLocal) {
+return () -> {};
+} else {
+return writeMemoryLimits.markWriteOperationStarted(primaryOperationSize(request));
+}
 }

 protected long primaryOperationSize(Request request) {
@@ -94,7 +100,7 @@ public abstract class TransportWriteAction<

 @Override
 protected Releasable checkReplicaLimits(ReplicaRequest request) {
-return writeMemoryLimits.markReplicaOperationStarted(replicaOperationSize(request));
+return writeMemoryLimits.markReplicaWriteStarted(replicaOperationSize(request));
 }

 protected long replicaOperationSize(ReplicaRequest request) {
InternalTestCluster.java

@@ -1349,19 +1349,14 @@ public final class InternalTestCluster extends TestCluster {
 assertBusy(() -> {
 for (NodeAndClient nodeAndClient : nodes.values()) {
 WriteMemoryLimits writeMemoryLimits = getInstance(WriteMemoryLimits.class, nodeAndClient.name);
-final long coordinatingBytes = writeMemoryLimits.getCoordinatingBytes();
-if (coordinatingBytes > 0) {
-throw new AssertionError("pending coordinating write bytes [" + coordinatingBytes + "] bytes on node ["
+final long writeBytes = writeMemoryLimits.getWriteBytes();
+if (writeBytes > 0) {
+throw new AssertionError("pending write bytes [" + writeBytes + "] bytes on node ["
 + nodeAndClient.name + "].");
 }
-final long primaryBytes = writeMemoryLimits.getPrimaryBytes();
-if (primaryBytes > 0) {
-throw new AssertionError("pending primary write bytes [" + coordinatingBytes + "] bytes on node ["
-+ nodeAndClient.name + "].");
-}
-final long replicaBytes = writeMemoryLimits.getReplicaBytes();
-if (replicaBytes > 0) {
-throw new AssertionError("pending replica write bytes [" + coordinatingBytes + "] bytes on node ["
+final long replicaWriteBytes = writeMemoryLimits.getReplicaWriteBytes();
+if (replicaWriteBytes > 0) {
+throw new AssertionError("pending replica write bytes [" + writeBytes + "] bytes on node ["
 + nodeAndClient.name + "].");
 }
 }
LocalIndexFollowingIT.java

@@ -6,13 +6,16 @@

 package org.elasticsearch.xpack.ccr;

+import org.elasticsearch.action.bulk.WriteMemoryLimits;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.CcrSingleNodeTestCase;
 import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction;
 import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
@@ -24,6 +27,8 @@ import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.stream.StreamSupport;

 import static java.util.Collections.singletonMap;
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@@ -92,6 +97,65 @@ public class LocalIndexFollowingIT extends CcrSingleNodeTestCase {
 assertThat(client().admin().indices().prepareExists("follower-index").get().isExists(), equalTo(false));
 }

+public void testWriteLimitsIncremented() throws Exception {
+final String leaderIndexSettings = getIndexSettings(1, 0,
+singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
+assertAcked(client().admin().indices().prepareCreate("leader").setSource(leaderIndexSettings, XContentType.JSON));
+ensureGreen("leader");
+
+// Use a sufficiently small number of docs to ensure that they are well below the number of docs that
+// can be sent in a single TransportBulkShardOperationsAction
+final long firstBatchNumDocs = randomIntBetween(10, 20);
+long sourceSize = 0;
+for (int i = 0; i < firstBatchNumDocs; i++) {
+BytesArray source = new BytesArray("{}");
+sourceSize += source.length();
+client().prepareIndex("leader", "doc").setSource(source, XContentType.JSON).get();
+}
+
+ThreadPool nodeThreadPool = getInstanceFromNode(ThreadPool.class);
+ThreadPool.Info writeInfo = StreamSupport.stream(nodeThreadPool.info().spliterator(), false)
+.filter(i -> i.getName().equals(ThreadPool.Names.WRITE)).findAny().get();
+int numberOfThreads = writeInfo.getMax();
+CountDownLatch threadBlockedLatch = new CountDownLatch(numberOfThreads);
+CountDownLatch blocker = new CountDownLatch(1);
+
+for (int i = 0; i < numberOfThreads; ++i) {
+nodeThreadPool.executor(ThreadPool.Names.WRITE).execute(() -> {
+try {
+threadBlockedLatch.countDown();
+blocker.await();
+} catch (InterruptedException e) {
+throw new IllegalStateException(e);
+}
+});
+}
+threadBlockedLatch.await();
+
+try {
+final PutFollowAction.Request followRequest = getPutFollowRequest("leader", "follower");
+client().execute(PutFollowAction.INSTANCE, followRequest).get();
+
+WriteMemoryLimits memoryLimits = getInstanceFromNode(WriteMemoryLimits.class);
+final long finalSourceSize = sourceSize;
+assertBusy(() -> {
+// The actual write bytes will be greater due to other request fields. However, this test is
+// just spot checking that the bytes are incremented at all.
+assertTrue(memoryLimits.getWriteBytes() > finalSourceSize);
+});
+blocker.countDown();
+assertBusy(() -> {
+assertThat(client().prepareSearch("follower").get().getHits().getTotalHits().value, equalTo(firstBatchNumDocs));
+});
+ensureEmptyWriteBuffers();
+} finally {
+if (blocker.getCount() > 0) {
+blocker.countDown();
+}
+}
+
+}
+
 public void testRemoveRemoteConnection() throws Exception {
 PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request();
 request.setName("my_pattern");
TransportBulkShardOperationsAction.java

@@ -17,6 +17,7 @@ import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.seqno.SeqNoStats;
@@ -25,6 +26,7 @@ import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.indices.IndicesService;
+import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.ccr.index.engine.AlreadyProcessedFollowingEngineException;
@@ -36,6 +38,8 @@ import java.util.List;
 public class TransportBulkShardOperationsAction
 extends TransportWriteAction<BulkShardOperationsRequest, BulkShardOperationsRequest, BulkShardOperationsResponse> {

+private final WriteMemoryLimits writeMemoryLimits;
+
 @Inject
 public TransportBulkShardOperationsAction(
 final Settings settings,
@@ -58,6 +62,19 @@ public class TransportBulkShardOperationsAction
 BulkShardOperationsRequest::new,
 BulkShardOperationsRequest::new,
 ThreadPool.Names.WRITE, false, writeMemoryLimits);
+this.writeMemoryLimits = writeMemoryLimits;
 }

+@Override
+protected void doExecute(Task task, BulkShardOperationsRequest request, ActionListener<BulkShardOperationsResponse> listener) {
+// This is executed on the follower coordinator node and we need to mark the bytes.
+Releasable releasable = writeMemoryLimits.markWriteOperationStarted(primaryOperationSize(request));
+ActionListener<BulkShardOperationsResponse> releasingListener = ActionListener.runBefore(listener, releasable::close);
+try {
+super.doExecute(task, request, releasingListener);
+} catch (Exception e) {
+releasingListener.onFailure(e);
+}
+}
+
 @Override