Ignore waitForActiveShards when syncing leases (#39224)

Adjust the retention lease sync actions so that they do not respect the
`index.write.wait_for_active_shards` setting on an index, allowing them to sync
retention leases even if insufficiently many shards are currently active to
accept writes.

Relates #39089
This commit is contained in:
David Turner 2019-02-25 08:08:02 +00:00
parent 50d2736746
commit 96c09b032d
3 changed files with 120 additions and 5 deletions

View File

@ -26,6 +26,7 @@ import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
@ -123,6 +124,7 @@ public class RetentionLeaseBackgroundSyncAction extends TransportReplicationActi
protected PrimaryResult<Request, ReplicationResponse> shardOperationOnPrimary(
final Request request,
final IndexShard primary) throws WriteStateException {
assert request.waitForActiveShards().equals(ActiveShardCount.NONE) : request.waitForActiveShards();
Objects.requireNonNull(request);
Objects.requireNonNull(primary);
primary.persistRetentionLeases();
@ -153,6 +155,7 @@ public class RetentionLeaseBackgroundSyncAction extends TransportReplicationActi
public Request(final ShardId shardId, final RetentionLeases retentionLeases) {
super(Objects.requireNonNull(shardId));
this.retentionLeases = Objects.requireNonNull(retentionLeases);
waitForActiveShards(ActiveShardCount.NONE);
}
@Override

View File

@ -26,6 +26,7 @@ import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.WriteResponse;
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
import org.elasticsearch.action.support.replication.ReplicationResponse;
@ -125,6 +126,7 @@ public class RetentionLeaseSyncAction extends
protected WritePrimaryResult<Request, Response> shardOperationOnPrimary(
final Request request,
final IndexShard primary) throws WriteStateException {
assert request.waitForActiveShards().equals(ActiveShardCount.NONE) : request.waitForActiveShards();
Objects.requireNonNull(request);
Objects.requireNonNull(primary);
primary.persistRetentionLeases();
@ -162,6 +164,7 @@ public class RetentionLeaseSyncAction extends
public Request(final ShardId shardId, final RetentionLeases retentionLeases) {
super(Objects.requireNonNull(shardId));
this.retentionLeases = Objects.requireNonNull(retentionLeases);
waitForActiveShards(ActiveShardCount.NONE);
}
@Override

View File

@ -428,7 +428,7 @@ public class RetentionLeaseIT extends ESIntegTestCase {
private void runUnderBlockTest(
final String idForInitialRetentionLease,
final long initialRetainingSequenceNumber,
final BiConsumer<IndexShard, ActionListener<ReplicationResponse>> indexShard,
final BiConsumer<IndexShard, ActionListener<ReplicationResponse>> primaryConsumer,
final Consumer<IndexShard> afterSync) throws InterruptedException {
final Settings settings = Settings.builder()
.put("index.number_of_shards", 1)
@ -444,12 +444,10 @@ public class RetentionLeaseIT extends ESIntegTestCase {
.getInstance(IndicesService.class, primaryShardNodeName)
.getShardOrNull(new ShardId(resolveIndex("index"), 0));
final String id = idForInitialRetentionLease;
final long retainingSequenceNumber = initialRetainingSequenceNumber;
final String source = randomAlphaOfLength(8);
final CountDownLatch latch = new CountDownLatch(1);
final ActionListener<ReplicationResponse> listener = ActionListener.wrap(r -> latch.countDown(), e -> fail(e.toString()));
primary.addRetentionLease(id, retainingSequenceNumber, source, listener);
primary.addRetentionLease(idForInitialRetentionLease, initialRetainingSequenceNumber, source, listener);
latch.await();
final String block = randomFrom("read_only", "read_only_allow_delete", "read", "write", "metadata");
@ -465,7 +463,7 @@ public class RetentionLeaseIT extends ESIntegTestCase {
final CountDownLatch actionLatch = new CountDownLatch(1);
final AtomicBoolean success = new AtomicBoolean();
indexShard.accept(
primaryConsumer.accept(
primary,
new ActionListener<ReplicationResponse>() {
@ -494,4 +492,115 @@ public class RetentionLeaseIT extends ESIntegTestCase {
}
}
public void testCanAddRetentionLeaseWithoutWaitingForShards() throws InterruptedException {
final String idForInitialRetentionLease = randomAlphaOfLength(8);
runWaitForShardsTest(
idForInitialRetentionLease,
randomLongBetween(0, Long.MAX_VALUE),
(primary, listener) -> {
final String nextId = randomValueOtherThan(idForInitialRetentionLease, () -> randomAlphaOfLength(8));
final long nextRetainingSequenceNumber = randomLongBetween(0, Long.MAX_VALUE);
final String nextSource = randomAlphaOfLength(8);
primary.addRetentionLease(nextId, nextRetainingSequenceNumber, nextSource, listener);
},
primary -> {});
}
public void testCanRenewRetentionLeaseWithoutWaitingForShards() throws InterruptedException {
final String idForInitialRetentionLease = randomAlphaOfLength(8);
final long initialRetainingSequenceNumber = randomLongBetween(0, Long.MAX_VALUE);
final AtomicReference<RetentionLease> retentionLease = new AtomicReference<>();
runWaitForShardsTest(
idForInitialRetentionLease,
initialRetainingSequenceNumber,
(primary, listener) -> {
final long nextRetainingSequenceNumber = randomLongBetween(initialRetainingSequenceNumber, Long.MAX_VALUE);
final String nextSource = randomAlphaOfLength(8);
retentionLease.set(primary.renewRetentionLease(idForInitialRetentionLease, nextRetainingSequenceNumber, nextSource));
listener.onResponse(new ReplicationResponse());
},
primary -> {
try {
/*
* If the background renew was able to execute, then the retention leases were persisted to disk. There is no other
* way for the current retention leases to end up written to disk so we assume that if they are written to disk, it
* implies that the background sync was able to execute despite wait for shards being set on the index.
*/
assertBusy(() -> assertThat(primary.loadRetentionLeases().leases(), contains(retentionLease.get())));
} catch (final Exception e) {
fail(e.toString());
}
});
}
public void testCanRemoveRetentionLeasesWithoutWaitingForShards() throws InterruptedException {
final String idForInitialRetentionLease = randomAlphaOfLength(8);
runWaitForShardsTest(
idForInitialRetentionLease,
randomLongBetween(0, Long.MAX_VALUE),
(primary, listener) -> primary.removeRetentionLease(idForInitialRetentionLease, listener),
primary -> {});
}
private void runWaitForShardsTest(
final String idForInitialRetentionLease,
final long initialRetainingSequenceNumber,
final BiConsumer<IndexShard, ActionListener<ReplicationResponse>> primaryConsumer,
final Consumer<IndexShard> afterSync) throws InterruptedException {
final int numDataNodes = internalCluster().numDataNodes();
final Settings settings = Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", numDataNodes == 1 ? 0 : numDataNodes - 1)
.put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), TimeValue.timeValueSeconds(1))
.build();
assertAcked(prepareCreate("index").setSettings(settings));
ensureYellowAndNoInitializingShards("index");
assertFalse(client().admin().cluster().prepareHealth("index").setWaitForActiveShards(numDataNodes).get().isTimedOut());
final String primaryShardNodeId = clusterService().state().routingTable().index("index").shard(0).primaryShard().currentNodeId();
final String primaryShardNodeName = clusterService().state().nodes().get(primaryShardNodeId).getName();
final IndexShard primary = internalCluster()
.getInstance(IndicesService.class, primaryShardNodeName)
.getShardOrNull(new ShardId(resolveIndex("index"), 0));
final String source = randomAlphaOfLength(8);
final CountDownLatch latch = new CountDownLatch(1);
final ActionListener<ReplicationResponse> listener = ActionListener.wrap(r -> latch.countDown(), e -> fail(e.toString()));
primary.addRetentionLease(idForInitialRetentionLease, initialRetainingSequenceNumber, source, listener);
latch.await();
final String waitForActiveValue = randomBoolean() ? "all" : Integer.toString(numDataNodes);
client()
.admin()
.indices()
.prepareUpdateSettings("index")
.setSettings(Settings.builder().put("index.write.wait_for_active_shards", waitForActiveValue).build())
.get();
final CountDownLatch actionLatch = new CountDownLatch(1);
final AtomicBoolean success = new AtomicBoolean();
primaryConsumer.accept(
primary,
new ActionListener<ReplicationResponse>() {
@Override
public void onResponse(final ReplicationResponse replicationResponse) {
success.set(true);
actionLatch.countDown();
}
@Override
public void onFailure(final Exception e) {
fail(e.toString());
}
});
actionLatch.await();
assertTrue(success.get());
afterSync.accept(primary);
}
}