diff --git a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java index bd996377c39..e9a6e7b4815 100644 --- a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java @@ -110,7 +110,7 @@ public class TransportResyncReplicationAction extends TransportWriteAction(request, new Response(), null, null, primary, logger); + return new WritePrimaryResult<>(request, new Response(), null, null, primary, getLogger()); } @Override @@ -138,7 +139,12 @@ public class RetentionLeaseSyncAction extends Objects.requireNonNull(replica); replica.updateRetentionLeasesOnReplica(request.getRetentionLeases()); replica.persistRetentionLeases(); - return new WriteReplicaResult<>(request, null, null, replica, logger); + return new WriteReplicaResult<>(request, null, null, replica, getLogger()); + } + + @Override + public ClusterBlockLevel indexBlockLevel() { + return null; } public static final class Request extends ReplicatedWriteRequest { diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 41a300c28f3..110ab9bcb99 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -221,7 +221,7 @@ public class TransportReplicationActionTests extends ESTestCase { } @Override - protected ClusterBlockLevel indexBlockLevel() { + public ClusterBlockLevel indexBlockLevel() { return globalBlock == false ? ClusterBlockLevel.WRITE : null; } }; @@ -305,7 +305,7 @@ public class TransportReplicationActionTests extends ESTestCase { } @Override - protected ClusterBlockLevel indexBlockLevel() { + public ClusterBlockLevel indexBlockLevel() { return globalBlock == false ? ClusterBlockLevel.WRITE : null; } }; diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java index 8cad76bcdfe..1cb1bfde34e 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java @@ -459,7 +459,7 @@ public class TransportReplicationAllPermitsAcquisitionTests extends IndexShardTe } @Override - protected ClusterBlockLevel indexBlockLevel() { + public ClusterBlockLevel indexBlockLevel() { return globalBlock == false ? ClusterBlockLevel.WRITE : super.indexBlockLevel(); } diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java index 4567f3e3823..6ad7d5039ae 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java @@ -228,4 +228,31 @@ public class RetentionLeaseBackgroundSyncActionTests extends ESTestCase { assertTrue(invoked.get()); } + public void testBlocks() { + final IndicesService indicesService = mock(IndicesService.class); + + final Index index = new Index("index", "uuid"); + final IndexService indexService = mock(IndexService.class); + when(indicesService.indexServiceSafe(index)).thenReturn(indexService); + + final int id = randomIntBetween(0, 4); + final IndexShard indexShard = mock(IndexShard.class); + when(indexService.getShard(id)).thenReturn(indexShard); + + final ShardId shardId = new ShardId(index, id); + when(indexShard.shardId()).thenReturn(shardId); + + final RetentionLeaseBackgroundSyncAction action = new RetentionLeaseBackgroundSyncAction( + Settings.EMPTY, + transportService, + clusterService, + indicesService, + threadPool, + shardStateAction, + new ActionFilters(Collections.emptySet()), + new IndexNameExpressionResolver()); + + assertNull(action.indexBlockLevel()); + } + } diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java index a05d383eee0..ee6cab9a687 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java @@ -44,6 +44,10 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; +import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -266,7 +270,7 @@ public class RetentionLeaseIT extends ESIntegTestCase { final Settings settings = Settings.builder() .put("index.number_of_shards", 1) .put("index.number_of_replicas", numberOfReplicas) - .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "1s") + .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), TimeValue.timeValueSeconds(1)) .build(); createIndex("index", settings); ensureGreen("index"); @@ -370,4 +374,124 @@ public class RetentionLeaseIT extends ESIntegTestCase { } } + public void testCanAddRetentionLeaseUnderBlock() throws InterruptedException { + final String idForInitialRetentionLease = randomAlphaOfLength(8); + runUnderBlockTest( + idForInitialRetentionLease, + randomLongBetween(0, Long.MAX_VALUE), + (primary, listener) -> { + final String nextId = randomValueOtherThan(idForInitialRetentionLease, () -> randomAlphaOfLength(8)); + final long nextRetainingSequenceNumber = randomLongBetween(0, Long.MAX_VALUE); + final String nextSource = randomAlphaOfLength(8); + primary.addRetentionLease(nextId, nextRetainingSequenceNumber, nextSource, listener); + }, + primary -> {}); + } + + public void testCanRenewRetentionLeaseUnderBlock() throws InterruptedException { + final String idForInitialRetentionLease = randomAlphaOfLength(8); + final long initialRetainingSequenceNumber = randomLongBetween(0, Long.MAX_VALUE); + final AtomicReference retentionLease = new AtomicReference<>(); + runUnderBlockTest( + idForInitialRetentionLease, + initialRetainingSequenceNumber, + (primary, listener) -> { + final long nextRetainingSequenceNumber = randomLongBetween(initialRetainingSequenceNumber, Long.MAX_VALUE); + final String nextSource = randomAlphaOfLength(8); + retentionLease.set(primary.renewRetentionLease(idForInitialRetentionLease, nextRetainingSequenceNumber, nextSource)); + listener.onResponse(new ReplicationResponse()); + }, + primary -> { + try { + /* + * If the background renew was able to execute, then the retention leases were persisted to disk. There is no other + * way for the current retention leases to end up written to disk so we assume that if they are written to disk, it + * implies that the background sync was able to execute under a block. + */ + assertBusy(() -> assertThat(primary.loadRetentionLeases().leases(), contains(retentionLease.get()))); + } catch (final Exception e) { + fail(e.toString()); + } + }); + + } + + public void testCanRemoveRetentionLeasesUnderBlock() throws InterruptedException { + final String idForInitialRetentionLease = randomAlphaOfLength(8); + runUnderBlockTest( + idForInitialRetentionLease, + randomLongBetween(0, Long.MAX_VALUE), + (primary, listener) -> primary.removeRetentionLease(idForInitialRetentionLease, listener), + indexShard -> {}); + } + + private void runUnderBlockTest( + final String idForInitialRetentionLease, + final long initialRetainingSequenceNumber, + final BiConsumer> indexShard, + final Consumer afterSync) throws InterruptedException { + final Settings settings = Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), TimeValue.timeValueSeconds(1)) + .build(); + assertAcked(prepareCreate("index").setSettings(settings)); + ensureGreen("index"); + + final String primaryShardNodeId = clusterService().state().routingTable().index("index").shard(0).primaryShard().currentNodeId(); + final String primaryShardNodeName = clusterService().state().nodes().get(primaryShardNodeId).getName(); + final IndexShard primary = internalCluster() + .getInstance(IndicesService.class, primaryShardNodeName) + .getShardOrNull(new ShardId(resolveIndex("index"), 0)); + + final String id = idForInitialRetentionLease; + final long retainingSequenceNumber = initialRetainingSequenceNumber; + final String source = randomAlphaOfLength(8); + final CountDownLatch latch = new CountDownLatch(1); + final ActionListener listener = ActionListener.wrap(r -> latch.countDown(), e -> fail(e.toString())); + primary.addRetentionLease(id, retainingSequenceNumber, source, listener); + latch.await(); + + final String block = randomFrom("read_only", "read_only_allow_delete", "read", "write", "metadata"); + + client() + .admin() + .indices() + .prepareUpdateSettings("index") + .setSettings(Settings.builder().put("index.blocks." + block, true).build()) + .get(); + + try { + final CountDownLatch actionLatch = new CountDownLatch(1); + final AtomicBoolean success = new AtomicBoolean(); + + indexShard.accept( + primary, + new ActionListener() { + + @Override + public void onResponse(final ReplicationResponse replicationResponse) { + success.set(true); + actionLatch.countDown(); + } + + @Override + public void onFailure(final Exception e) { + fail(e.toString()); + } + + }); + actionLatch.await(); + assertTrue(success.get()); + afterSync.accept(primary); + } finally { + client() + .admin() + .indices() + .prepareUpdateSettings("index") + .setSettings(Settings.builder().putNull("index.blocks." + block).build()) + .get(); + } + } + } diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java index 80baa23a4d7..9b9ad6a0962 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java @@ -228,4 +228,31 @@ public class RetentionLeaseSyncActionTests extends ESTestCase { assertTrue(invoked.get()); } + public void testBlocks() { + final IndicesService indicesService = mock(IndicesService.class); + + final Index index = new Index("index", "uuid"); + final IndexService indexService = mock(IndexService.class); + when(indicesService.indexServiceSafe(index)).thenReturn(indexService); + + final int id = randomIntBetween(0, 4); + final IndexShard indexShard = mock(IndexShard.class); + when(indexService.getShard(id)).thenReturn(indexShard); + + final ShardId shardId = new ShardId(index, id); + when(indexShard.shardId()).thenReturn(shardId); + + final RetentionLeaseSyncAction action = new RetentionLeaseSyncAction( + Settings.EMPTY, + transportService, + clusterService, + indicesService, + threadPool, + shardStateAction, + new ActionFilters(Collections.emptySet()), + new IndexNameExpressionResolver()); + + assertNull(action.indexBlockLevel()); + } + }