From 96c09b032dc637b7042ed5a8d91da91e3979094c Mon Sep 17 00:00:00 2001
From: David Turner
Date: Mon, 25 Feb 2019 08:08:02 +0000
Subject: [PATCH] Ignore waitForActiveShards when syncing leases (#39224)

Adjust the retention lease sync actions so that they do not respect the
`index.write.wait_for_active_shards` setting on an index, allowing them
to sync retention leases even if insufficiently many shards are
currently active to accept writes.

Relates #39089
---
 .../RetentionLeaseBackgroundSyncAction.java   |   3 +
 .../index/seqno/RetentionLeaseSyncAction.java |   3 +
 .../index/seqno/RetentionLeaseIT.java         | 119 +++++++++++++++++-
 3 files changed, 120 insertions(+), 5 deletions(-)

diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java
index 4033dcf0c4b..d454c2de75b 100644
--- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java
+++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java
@@ -26,6 +26,7 @@ import org.apache.lucene.store.AlreadyClosedException;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.action.support.replication.ReplicationRequest;
 import org.elasticsearch.action.support.replication.ReplicationResponse;
 import org.elasticsearch.action.support.replication.TransportReplicationAction;
@@ -123,6 +124,7 @@ public class RetentionLeaseBackgroundSyncAction extends TransportReplicationActi
     protected PrimaryResult<Request, ReplicationResponse> shardOperationOnPrimary(
             final Request request,
             final IndexShard primary) throws WriteStateException {
+        assert request.waitForActiveShards().equals(ActiveShardCount.NONE) : request.waitForActiveShards();
         Objects.requireNonNull(request);
         Objects.requireNonNull(primary);
         primary.persistRetentionLeases();
@@ -153,6 +155,7 @@ public class RetentionLeaseBackgroundSyncAction extends TransportReplicationActi
         public Request(final ShardId shardId, final RetentionLeases retentionLeases) {
             super(Objects.requireNonNull(shardId));
             this.retentionLeases = Objects.requireNonNull(retentionLeases);
+            waitForActiveShards(ActiveShardCount.NONE);
         }
 
         @Override
diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java
index 4cd11de4574..d4845d92a3a 100644
--- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java
+++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java
@@ -26,6 +26,7 @@ import org.apache.lucene.store.AlreadyClosedException;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.action.support.WriteResponse;
 import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
 import org.elasticsearch.action.support.replication.ReplicationResponse;
@@ -125,6 +126,7 @@ public class RetentionLeaseSyncAction extends
     protected WritePrimaryResult<Request, Response> shardOperationOnPrimary(
             final Request request,
             final IndexShard primary) throws WriteStateException {
+        assert request.waitForActiveShards().equals(ActiveShardCount.NONE) : request.waitForActiveShards();
         Objects.requireNonNull(request);
         Objects.requireNonNull(primary);
         primary.persistRetentionLeases();
@@ -162,6 +164,7 @@ public class RetentionLeaseSyncAction extends
         public Request(final ShardId shardId, final RetentionLeases retentionLeases) {
             super(Objects.requireNonNull(shardId));
             this.retentionLeases = Objects.requireNonNull(retentionLeases);
+            waitForActiveShards(ActiveShardCount.NONE);
         }
 
         @Override
diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java
index ee6cab9a687..43e5847735d 100644
--- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java
+++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java
@@ -428,7 +428,7 @@ public class RetentionLeaseIT extends ESIntegTestCase {
     private void runUnderBlockTest(
             final String idForInitialRetentionLease,
             final long initialRetainingSequenceNumber,
-            final BiConsumer<IndexShard, ActionListener<ReplicationResponse>> indexShard,
+            final BiConsumer<IndexShard, ActionListener<ReplicationResponse>> primaryConsumer,
             final Consumer<IndexShard> afterSync) throws InterruptedException {
         final Settings settings = Settings.builder()
                 .put("index.number_of_shards", 1)
@@ -444,12 +444,10 @@
                 .getInstance(IndicesService.class, primaryShardNodeName)
                 .getShardOrNull(new ShardId(resolveIndex("index"), 0));
 
-        final String id = idForInitialRetentionLease;
-        final long retainingSequenceNumber = initialRetainingSequenceNumber;
         final String source = randomAlphaOfLength(8);
         final CountDownLatch latch = new CountDownLatch(1);
         final ActionListener<ReplicationResponse> listener = ActionListener.wrap(r -> latch.countDown(), e -> fail(e.toString()));
-        primary.addRetentionLease(id, retainingSequenceNumber, source, listener);
+        primary.addRetentionLease(idForInitialRetentionLease, initialRetainingSequenceNumber, source, listener);
         latch.await();
 
         final String block = randomFrom("read_only", "read_only_allow_delete", "read", "write", "metadata");
@@ -465,7 +463,7 @@
 
         final CountDownLatch actionLatch = new CountDownLatch(1);
         final AtomicBoolean success = new AtomicBoolean();
-        indexShard.accept(
+        primaryConsumer.accept(
                 primary,
                 new ActionListener<ReplicationResponse>() {
 
@@ -494,4 +492,115 @@
         }
     }
 
+    public void testCanAddRetentionLeaseWithoutWaitingForShards() throws InterruptedException {
+        final String idForInitialRetentionLease = randomAlphaOfLength(8);
+        runWaitForShardsTest(
+                idForInitialRetentionLease,
+                randomLongBetween(0, Long.MAX_VALUE),
+                (primary, listener) -> {
+                    final String nextId = randomValueOtherThan(idForInitialRetentionLease, () -> randomAlphaOfLength(8));
+                    final long nextRetainingSequenceNumber = randomLongBetween(0, Long.MAX_VALUE);
+                    final String nextSource = randomAlphaOfLength(8);
+                    primary.addRetentionLease(nextId, nextRetainingSequenceNumber, nextSource, listener);
+                },
+                primary -> {});
+    }
+
+    public void testCanRenewRetentionLeaseWithoutWaitingForShards() throws InterruptedException {
+        final String idForInitialRetentionLease = randomAlphaOfLength(8);
+        final long initialRetainingSequenceNumber = randomLongBetween(0, Long.MAX_VALUE);
+        final AtomicReference<RetentionLease> retentionLease = new AtomicReference<>();
+        runWaitForShardsTest(
+                idForInitialRetentionLease,
+                initialRetainingSequenceNumber,
+                (primary, listener) -> {
+                    final long nextRetainingSequenceNumber = randomLongBetween(initialRetainingSequenceNumber, Long.MAX_VALUE);
+                    final String nextSource = randomAlphaOfLength(8);
+                    retentionLease.set(primary.renewRetentionLease(idForInitialRetentionLease, nextRetainingSequenceNumber, nextSource));
+                    listener.onResponse(new ReplicationResponse());
+                },
+                primary -> {
+                    try {
+                        /*
+                         * If the background renew was able to execute, then the retention leases were persisted to disk. There is no other
+                         * way for the current retention leases to end up written to disk so we assume that if they are written to disk, it
+                         * implies that the background sync was able to execute despite wait for shards being set on the index.
+                         */
+                        assertBusy(() -> assertThat(primary.loadRetentionLeases().leases(), contains(retentionLease.get())));
+                    } catch (final Exception e) {
+                        fail(e.toString());
+                    }
+                });
+
+    }
+
+    public void testCanRemoveRetentionLeasesWithoutWaitingForShards() throws InterruptedException {
+        final String idForInitialRetentionLease = randomAlphaOfLength(8);
+        runWaitForShardsTest(
+                idForInitialRetentionLease,
+                randomLongBetween(0, Long.MAX_VALUE),
+                (primary, listener) -> primary.removeRetentionLease(idForInitialRetentionLease, listener),
+                primary -> {});
+    }
+
+    private void runWaitForShardsTest(
+            final String idForInitialRetentionLease,
+            final long initialRetainingSequenceNumber,
+            final BiConsumer<IndexShard, ActionListener<ReplicationResponse>> primaryConsumer,
+            final Consumer<IndexShard> afterSync) throws InterruptedException {
+        final int numDataNodes = internalCluster().numDataNodes();
+        final Settings settings = Settings.builder()
+                .put("index.number_of_shards", 1)
+                .put("index.number_of_replicas", numDataNodes == 1 ? 0 : numDataNodes - 1)
+                .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), TimeValue.timeValueSeconds(1))
+                .build();
+        assertAcked(prepareCreate("index").setSettings(settings));
+        ensureYellowAndNoInitializingShards("index");
+        assertFalse(client().admin().cluster().prepareHealth("index").setWaitForActiveShards(numDataNodes).get().isTimedOut());
+
+        final String primaryShardNodeId = clusterService().state().routingTable().index("index").shard(0).primaryShard().currentNodeId();
+        final String primaryShardNodeName = clusterService().state().nodes().get(primaryShardNodeId).getName();
+        final IndexShard primary = internalCluster()
+                .getInstance(IndicesService.class, primaryShardNodeName)
+                .getShardOrNull(new ShardId(resolveIndex("index"), 0));
+
+        final String source = randomAlphaOfLength(8);
+        final CountDownLatch latch = new CountDownLatch(1);
+        final ActionListener<ReplicationResponse> listener = ActionListener.wrap(r -> latch.countDown(), e -> fail(e.toString()));
+        primary.addRetentionLease(idForInitialRetentionLease, initialRetainingSequenceNumber, source, listener);
+        latch.await();
+
+        final String waitForActiveValue = randomBoolean() ? "all" : Integer.toString(numDataNodes);
+
+        client()
+                .admin()
+                .indices()
+                .prepareUpdateSettings("index")
+                .setSettings(Settings.builder().put("index.write.wait_for_active_shards", waitForActiveValue).build())
+                .get();
+
+        final CountDownLatch actionLatch = new CountDownLatch(1);
+        final AtomicBoolean success = new AtomicBoolean();
+
+        primaryConsumer.accept(
+                primary,
+                new ActionListener<ReplicationResponse>() {
+
+                    @Override
+                    public void onResponse(final ReplicationResponse replicationResponse) {
+                        success.set(true);
+                        actionLatch.countDown();
+                    }
+
+                    @Override
+                    public void onFailure(final Exception e) {
+                        fail(e.toString());
+                    }
+
+                });
+        actionLatch.await();
+        assertTrue(success.get());
+        afterSync.accept(primary);
+    }
 }
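Note for reviewers, not part of the commit: the pattern above generalizes. Any
ReplicationRequest subclass can opt itself out of
index.write.wait_for_active_shards the same way the two Request constructors
do. The sketch below is illustrative only, assuming the ReplicationRequest API
as of this commit; ExampleSyncRequest is a hypothetical class name, and
toString() is implemented because ReplicationRequest declares it abstract.

    import org.elasticsearch.action.support.ActiveShardCount;
    import org.elasticsearch.action.support.replication.ReplicationRequest;
    import org.elasticsearch.index.shard.ShardId;

    // Hypothetical request type, not part of this patch: it shows the same
    // opt-out that RetentionLeaseBackgroundSyncAction.Request and
    // RetentionLeaseSyncAction.Request now perform in their constructors.
    public class ExampleSyncRequest extends ReplicationRequest<ExampleSyncRequest> {

        public ExampleSyncRequest(final ShardId shardId) {
            super(shardId);
            // Request-level override of index.write.wait_for_active_shards:
            // ActiveShardCount.NONE means the primary does not wait for any
            // number of active shard copies before executing the operation.
            waitForActiveShards(ActiveShardCount.NONE);
        }

        @Override
        public String toString() {
            return "example_sync_request{shardId=" + shardId + "}";
        }
    }

Baking the opt-out into the request constructor, rather than setting it at each
call site, means every instance of the request is exempt, which is what lets the
new asserts in shardOperationOnPrimary insist that each request arrives with
ActiveShardCount.NONE.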