Allow retention lease operations under blocks (#39089)
This commit allows manipulating retention leases under blocks.
This commit is contained in:
parent
35e30b34f9
commit
fef9bdb23f
|
@ -110,7 +110,7 @@ public class TransportResyncReplicationAction extends TransportWriteAction<Resyn
|
|||
}
|
||||
|
||||
@Override
|
||||
protected ClusterBlockLevel indexBlockLevel() {
|
||||
public ClusterBlockLevel indexBlockLevel() {
|
||||
// resync should never be blocked because it's an internal action
|
||||
return null;
|
||||
}
|
||||
|
|
|
@ -222,7 +222,7 @@ public abstract class TransportReplicationAction<
|
|||
* Index level block to check before request execution. Returning null means that no blocks need to be checked.
|
||||
*/
|
||||
@Nullable
|
||||
protected ClusterBlockLevel indexBlockLevel() {
|
||||
public ClusterBlockLevel indexBlockLevel() {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
|
|
@ -245,7 +245,7 @@ public abstract class TransportWriteAction<
|
|||
}
|
||||
|
||||
@Override
|
||||
protected ClusterBlockLevel indexBlockLevel() {
|
||||
public ClusterBlockLevel indexBlockLevel() {
|
||||
return ClusterBlockLevel.WRITE;
|
||||
}
|
||||
|
||||
|
|
|
@ -31,6 +31,7 @@ import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
|
|||
import org.elasticsearch.action.support.replication.ReplicationResponse;
|
||||
import org.elasticsearch.action.support.replication.TransportWriteAction;
|
||||
import org.elasticsearch.cluster.action.shard.ShardStateAction;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
|
@ -127,7 +128,7 @@ public class RetentionLeaseSyncAction extends
|
|||
Objects.requireNonNull(request);
|
||||
Objects.requireNonNull(primary);
|
||||
primary.persistRetentionLeases();
|
||||
return new WritePrimaryResult<>(request, new Response(), null, null, primary, logger);
|
||||
return new WritePrimaryResult<>(request, new Response(), null, null, primary, getLogger());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -138,7 +139,12 @@ public class RetentionLeaseSyncAction extends
|
|||
Objects.requireNonNull(replica);
|
||||
replica.updateRetentionLeasesOnReplica(request.getRetentionLeases());
|
||||
replica.persistRetentionLeases();
|
||||
return new WriteReplicaResult<>(request, null, null, replica, logger);
|
||||
return new WriteReplicaResult<>(request, null, null, replica, getLogger());
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClusterBlockLevel indexBlockLevel() {
|
||||
return null;
|
||||
}
|
||||
|
||||
public static final class Request extends ReplicatedWriteRequest<Request> {
|
||||
|
|
|
@ -221,7 +221,7 @@ public class TransportReplicationActionTests extends ESTestCase {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected ClusterBlockLevel indexBlockLevel() {
|
||||
public ClusterBlockLevel indexBlockLevel() {
|
||||
return globalBlock == false ? ClusterBlockLevel.WRITE : null;
|
||||
}
|
||||
};
|
||||
|
@ -305,7 +305,7 @@ public class TransportReplicationActionTests extends ESTestCase {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected ClusterBlockLevel indexBlockLevel() {
|
||||
public ClusterBlockLevel indexBlockLevel() {
|
||||
return globalBlock == false ? ClusterBlockLevel.WRITE : null;
|
||||
}
|
||||
};
|
||||
|
|
|
@ -459,7 +459,7 @@ public class TransportReplicationAllPermitsAcquisitionTests extends IndexShardTe
|
|||
}
|
||||
|
||||
@Override
|
||||
protected ClusterBlockLevel indexBlockLevel() {
|
||||
public ClusterBlockLevel indexBlockLevel() {
|
||||
return globalBlock == false ? ClusterBlockLevel.WRITE : super.indexBlockLevel();
|
||||
}
|
||||
|
||||
|
|
|
@ -228,4 +228,31 @@ public class RetentionLeaseBackgroundSyncActionTests extends ESTestCase {
|
|||
assertTrue(invoked.get());
|
||||
}
|
||||
|
||||
public void testBlocks() {
|
||||
final IndicesService indicesService = mock(IndicesService.class);
|
||||
|
||||
final Index index = new Index("index", "uuid");
|
||||
final IndexService indexService = mock(IndexService.class);
|
||||
when(indicesService.indexServiceSafe(index)).thenReturn(indexService);
|
||||
|
||||
final int id = randomIntBetween(0, 4);
|
||||
final IndexShard indexShard = mock(IndexShard.class);
|
||||
when(indexService.getShard(id)).thenReturn(indexShard);
|
||||
|
||||
final ShardId shardId = new ShardId(index, id);
|
||||
when(indexShard.shardId()).thenReturn(shardId);
|
||||
|
||||
final RetentionLeaseBackgroundSyncAction action = new RetentionLeaseBackgroundSyncAction(
|
||||
Settings.EMPTY,
|
||||
transportService,
|
||||
clusterService,
|
||||
indicesService,
|
||||
threadPool,
|
||||
shardStateAction,
|
||||
new ActionFilters(Collections.emptySet()),
|
||||
new IndexNameExpressionResolver());
|
||||
|
||||
assertNull(action.indexBlockLevel());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -44,6 +44,10 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
|
@ -266,7 +270,7 @@ public class RetentionLeaseIT extends ESIntegTestCase {
|
|||
final Settings settings = Settings.builder()
|
||||
.put("index.number_of_shards", 1)
|
||||
.put("index.number_of_replicas", numberOfReplicas)
|
||||
.put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "1s")
|
||||
.put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), TimeValue.timeValueSeconds(1))
|
||||
.build();
|
||||
createIndex("index", settings);
|
||||
ensureGreen("index");
|
||||
|
@ -370,4 +374,124 @@ public class RetentionLeaseIT extends ESIntegTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testCanAddRetentionLeaseUnderBlock() throws InterruptedException {
|
||||
final String idForInitialRetentionLease = randomAlphaOfLength(8);
|
||||
runUnderBlockTest(
|
||||
idForInitialRetentionLease,
|
||||
randomLongBetween(0, Long.MAX_VALUE),
|
||||
(primary, listener) -> {
|
||||
final String nextId = randomValueOtherThan(idForInitialRetentionLease, () -> randomAlphaOfLength(8));
|
||||
final long nextRetainingSequenceNumber = randomLongBetween(0, Long.MAX_VALUE);
|
||||
final String nextSource = randomAlphaOfLength(8);
|
||||
primary.addRetentionLease(nextId, nextRetainingSequenceNumber, nextSource, listener);
|
||||
},
|
||||
primary -> {});
|
||||
}
|
||||
|
||||
public void testCanRenewRetentionLeaseUnderBlock() throws InterruptedException {
|
||||
final String idForInitialRetentionLease = randomAlphaOfLength(8);
|
||||
final long initialRetainingSequenceNumber = randomLongBetween(0, Long.MAX_VALUE);
|
||||
final AtomicReference<RetentionLease> retentionLease = new AtomicReference<>();
|
||||
runUnderBlockTest(
|
||||
idForInitialRetentionLease,
|
||||
initialRetainingSequenceNumber,
|
||||
(primary, listener) -> {
|
||||
final long nextRetainingSequenceNumber = randomLongBetween(initialRetainingSequenceNumber, Long.MAX_VALUE);
|
||||
final String nextSource = randomAlphaOfLength(8);
|
||||
retentionLease.set(primary.renewRetentionLease(idForInitialRetentionLease, nextRetainingSequenceNumber, nextSource));
|
||||
listener.onResponse(new ReplicationResponse());
|
||||
},
|
||||
primary -> {
|
||||
try {
|
||||
/*
|
||||
* If the background renew was able to execute, then the retention leases were persisted to disk. There is no other
|
||||
* way for the current retention leases to end up written to disk so we assume that if they are written to disk, it
|
||||
* implies that the background sync was able to execute under a block.
|
||||
*/
|
||||
assertBusy(() -> assertThat(primary.loadRetentionLeases().leases(), contains(retentionLease.get())));
|
||||
} catch (final Exception e) {
|
||||
fail(e.toString());
|
||||
}
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
public void testCanRemoveRetentionLeasesUnderBlock() throws InterruptedException {
|
||||
final String idForInitialRetentionLease = randomAlphaOfLength(8);
|
||||
runUnderBlockTest(
|
||||
idForInitialRetentionLease,
|
||||
randomLongBetween(0, Long.MAX_VALUE),
|
||||
(primary, listener) -> primary.removeRetentionLease(idForInitialRetentionLease, listener),
|
||||
indexShard -> {});
|
||||
}
|
||||
|
||||
private void runUnderBlockTest(
|
||||
final String idForInitialRetentionLease,
|
||||
final long initialRetainingSequenceNumber,
|
||||
final BiConsumer<IndexShard, ActionListener<ReplicationResponse>> indexShard,
|
||||
final Consumer<IndexShard> afterSync) throws InterruptedException {
|
||||
final Settings settings = Settings.builder()
|
||||
.put("index.number_of_shards", 1)
|
||||
.put("index.number_of_replicas", 0)
|
||||
.put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), TimeValue.timeValueSeconds(1))
|
||||
.build();
|
||||
assertAcked(prepareCreate("index").setSettings(settings));
|
||||
ensureGreen("index");
|
||||
|
||||
final String primaryShardNodeId = clusterService().state().routingTable().index("index").shard(0).primaryShard().currentNodeId();
|
||||
final String primaryShardNodeName = clusterService().state().nodes().get(primaryShardNodeId).getName();
|
||||
final IndexShard primary = internalCluster()
|
||||
.getInstance(IndicesService.class, primaryShardNodeName)
|
||||
.getShardOrNull(new ShardId(resolveIndex("index"), 0));
|
||||
|
||||
final String id = idForInitialRetentionLease;
|
||||
final long retainingSequenceNumber = initialRetainingSequenceNumber;
|
||||
final String source = randomAlphaOfLength(8);
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
final ActionListener<ReplicationResponse> listener = ActionListener.wrap(r -> latch.countDown(), e -> fail(e.toString()));
|
||||
primary.addRetentionLease(id, retainingSequenceNumber, source, listener);
|
||||
latch.await();
|
||||
|
||||
final String block = randomFrom("read_only", "read_only_allow_delete", "read", "write", "metadata");
|
||||
|
||||
client()
|
||||
.admin()
|
||||
.indices()
|
||||
.prepareUpdateSettings("index")
|
||||
.setSettings(Settings.builder().put("index.blocks." + block, true).build())
|
||||
.get();
|
||||
|
||||
try {
|
||||
final CountDownLatch actionLatch = new CountDownLatch(1);
|
||||
final AtomicBoolean success = new AtomicBoolean();
|
||||
|
||||
indexShard.accept(
|
||||
primary,
|
||||
new ActionListener<ReplicationResponse>() {
|
||||
|
||||
@Override
|
||||
public void onResponse(final ReplicationResponse replicationResponse) {
|
||||
success.set(true);
|
||||
actionLatch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(final Exception e) {
|
||||
fail(e.toString());
|
||||
}
|
||||
|
||||
});
|
||||
actionLatch.await();
|
||||
assertTrue(success.get());
|
||||
afterSync.accept(primary);
|
||||
} finally {
|
||||
client()
|
||||
.admin()
|
||||
.indices()
|
||||
.prepareUpdateSettings("index")
|
||||
.setSettings(Settings.builder().putNull("index.blocks." + block).build())
|
||||
.get();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -228,4 +228,31 @@ public class RetentionLeaseSyncActionTests extends ESTestCase {
|
|||
assertTrue(invoked.get());
|
||||
}
|
||||
|
||||
public void testBlocks() {
|
||||
final IndicesService indicesService = mock(IndicesService.class);
|
||||
|
||||
final Index index = new Index("index", "uuid");
|
||||
final IndexService indexService = mock(IndexService.class);
|
||||
when(indicesService.indexServiceSafe(index)).thenReturn(indexService);
|
||||
|
||||
final int id = randomIntBetween(0, 4);
|
||||
final IndexShard indexShard = mock(IndexShard.class);
|
||||
when(indexService.getShard(id)).thenReturn(indexShard);
|
||||
|
||||
final ShardId shardId = new ShardId(index, id);
|
||||
when(indexShard.shardId()).thenReturn(shardId);
|
||||
|
||||
final RetentionLeaseSyncAction action = new RetentionLeaseSyncAction(
|
||||
Settings.EMPTY,
|
||||
transportService,
|
||||
clusterService,
|
||||
indicesService,
|
||||
threadPool,
|
||||
shardStateAction,
|
||||
new ActionFilters(Collections.emptySet()),
|
||||
new IndexNameExpressionResolver());
|
||||
|
||||
assertNull(action.indexBlockLevel());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue