Refactoring GatedAutoCloseable and moving RecoveryState.Timer (#2965)

* Refactoring GatedAutoCloseable to AutoCloseableRefCounted

This is a part of the process of merging our feature branch - feature/segment-replication - back into main by re-PRing our changes from the feature branch.
The only existing subclass of GatedAutoCloseable wraps an implementation of RefCounted. Segment replication adds another subclass, which also wraps a RefCounted implementation. Both subclasses use the same shutdown hook - decRef. This change makes the superclass less generic to increase code convergence.

The breakdown of the plan to merge segment-replication to main is detailed in #2355
Segment replication design proposal - #2229

Signed-off-by: Kartik Ganesh <gkart@amazon.com>

* Minor refactoring in RecoveryState

This change makes two minor updates to RecoveryState -
1. The readRecoveryState API is removed because it can be replaced by an invocation of the constructor
2. The class members of the Timer inner class are changed to private, and accesses are only through the public APIs

Signed-off-by: Kartik Ganesh <gkart@amazon.com>

* Update RecoveryTargetTests to test Timer subclasses deterministically

This change removes the use of randomBoolean() in testing the Timer classes and creates a dedicated unit test for each. The common test logic is shared via a private method.

Signed-off-by: Kartik Ganesh <gkart@amazon.com>

* Move the RecoveryState.Timer class to a top-level class

This will eventually be reused across both replication use-cases - peer recovery and segment replication.

Signed-off-by: Kartik Ganesh <gkart@amazon.com>

* Further update of timer tests in RecoveryTargetTests

Removes a non-deterministic code path around stopping the timer, and avoids assertThat (deprecated)

Signed-off-by: Kartik Ganesh <gkart@amazon.com>

* Rename to ReplicationTimer

Signed-off-by: Kartik Ganesh <gkart@amazon.com>

* Remove RecoveryTargetTests assert on a running timer

Trying to serialize and deserialize a running Timer instance, and then checking for equality leads to flaky test failures when the ser/deser takes time.

Signed-off-by: Kartik Ganesh <gkart@amazon.com>
This commit is contained in:
Kartik Ganesh 2022-04-20 15:02:25 -07:00 committed by GitHub
parent 3b7e654757
commit c7c410a063
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 206 additions and 172 deletions

View File

@ -87,7 +87,7 @@ public class TransportRecoveryAction extends TransportBroadcastByNodeAction<Reco
@Override
protected RecoveryState readShardResult(StreamInput in) throws IOException {
return RecoveryState.readRecoveryState(in);
return new RecoveryState(in);
}
@Override

View File

@ -13,20 +13,19 @@
package org.opensearch.common.concurrent;
import org.opensearch.common.util.concurrent.RefCounted;
/**
* Decorator class that wraps an object reference with a {@link Runnable} that is
* invoked when {@link #close()} is called. The internal {@link OneWayGate} instance ensures
* that this is invoked only once. See also {@link GatedCloseable}
* Adapter class that enables a {@link RefCounted} implementation to function like an {@link AutoCloseable}.
* The {@link #close()} API invokes {@link RefCounted#decRef()} and ensures idempotency using a {@link OneWayGate}.
*/
public class GatedAutoCloseable<T> implements AutoCloseable {
public class AutoCloseableRefCounted<T extends RefCounted> implements AutoCloseable {
private final T ref;
private final Runnable onClose;
private final OneWayGate gate;
public GatedAutoCloseable(T ref, Runnable onClose) {
public AutoCloseableRefCounted(T ref) {
this.ref = ref;
this.onClose = onClose;
gate = new OneWayGate();
}
@ -37,7 +36,7 @@ public class GatedAutoCloseable<T> implements AutoCloseable {
@Override
public void close() {
if (gate.close()) {
onClose.run();
ref.decRef();
}
}
}

View File

@ -21,7 +21,7 @@ import java.io.IOException;
/**
* Decorator class that wraps an object reference with a {@link CheckedRunnable} that is
* invoked when {@link #close()} is called. The internal {@link OneWayGate} instance ensures
* that this is invoked only once. See also {@link GatedAutoCloseable}
* that this is invoked only once. See also {@link AutoCloseableRefCounted}
*/
public class GatedCloseable<T> implements Closeable {

View File

@ -70,6 +70,7 @@ import org.opensearch.index.store.Store;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogCorruptedException;
import org.opensearch.indices.recovery.RecoveriesCollection.RecoveryRef;
import org.opensearch.indices.replication.common.ReplicationTimer;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.ConnectTransportException;
@ -215,7 +216,7 @@ public class PeerRecoveryTargetService implements IndexEventListener {
final String actionName;
final TransportRequest requestToSend;
final StartRecoveryRequest startRequest;
final RecoveryState.Timer timer;
final ReplicationTimer timer;
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecovery(recoveryId)) {
if (recoveryRef == null) {
logger.trace("not running recovery with id [{}] - can not find it (probably finished)", recoveryId);
@ -622,9 +623,9 @@ public class PeerRecoveryTargetService implements IndexEventListener {
private final long recoveryId;
private final StartRecoveryRequest request;
private final RecoveryState.Timer timer;
private final ReplicationTimer timer;
private RecoveryResponseHandler(final StartRecoveryRequest request, final RecoveryState.Timer timer) {
private RecoveryResponseHandler(final StartRecoveryRequest request, final ReplicationTimer timer) {
this.recoveryId = request.recoveryId();
this.request = request;
this.timer = timer;

View File

@ -36,7 +36,7 @@ import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.OpenSearchTimeoutException;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.concurrent.GatedAutoCloseable;
import org.opensearch.common.concurrent.AutoCloseableRefCounted;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.AbstractRunnable;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
@ -273,14 +273,14 @@ public class RecoveriesCollection {
* causes {@link RecoveryTarget#decRef()} to be called. This makes sure that the underlying resources
* will not be freed until {@link RecoveryRef#close()} is called.
*/
public static class RecoveryRef extends GatedAutoCloseable<RecoveryTarget> {
public static class RecoveryRef extends AutoCloseableRefCounted<RecoveryTarget> {
/**
* Important: {@link RecoveryTarget#tryIncRef()} should
* be *successfully* called on status before
*/
public RecoveryRef(RecoveryTarget status) {
super(status, status::decRef);
super(status);
status.setLastAccessTime();
}
}

View File

@ -50,6 +50,7 @@ import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.store.StoreStats;
import org.opensearch.indices.replication.common.ReplicationTimer;
import java.io.IOException;
import java.util.ArrayList;
@ -122,7 +123,7 @@ public class RecoveryState implements ToXContentFragment, Writeable {
private final Index index;
private final Translog translog;
private final VerifyIndex verifyIndex;
private final Timer timer;
private final ReplicationTimer timer;
private RecoverySource recoverySource;
private ShardId shardId;
@ -149,12 +150,12 @@ public class RecoveryState implements ToXContentFragment, Writeable {
this.index = index;
translog = new Translog();
verifyIndex = new VerifyIndex();
timer = new Timer();
timer = new ReplicationTimer();
timer.start();
}
public RecoveryState(StreamInput in) throws IOException {
timer = new Timer(in);
timer = new ReplicationTimer(in);
stage = Stage.fromId(in.readByte());
shardId = new ShardId(in);
recoverySource = RecoverySource.readFrom(in);
@ -256,7 +257,7 @@ public class RecoveryState implements ToXContentFragment, Writeable {
return translog;
}
public Timer getTimer() {
public ReplicationTimer getTimer() {
return timer;
}
@ -280,10 +281,6 @@ public class RecoveryState implements ToXContentFragment, Writeable {
return primary;
}
public static RecoveryState readRecoveryState(StreamInput in) throws IOException {
return new RecoveryState(in);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
@ -291,9 +288,9 @@ public class RecoveryState implements ToXContentFragment, Writeable {
builder.field(Fields.TYPE, recoverySource.getType());
builder.field(Fields.STAGE, stage.toString());
builder.field(Fields.PRIMARY, primary);
builder.timeField(Fields.START_TIME_IN_MILLIS, Fields.START_TIME, timer.startTime);
if (timer.stopTime > 0) {
builder.timeField(Fields.STOP_TIME_IN_MILLIS, Fields.STOP_TIME, timer.stopTime);
builder.timeField(Fields.START_TIME_IN_MILLIS, Fields.START_TIME, timer.startTime());
if (timer.stopTime() > 0) {
builder.timeField(Fields.STOP_TIME_IN_MILLIS, Fields.STOP_TIME, timer.stopTime());
}
builder.humanReadableField(Fields.TOTAL_TIME_IN_MILLIS, Fields.TOTAL_TIME, new TimeValue(timer.time()));
@ -375,78 +372,7 @@ public class RecoveryState implements ToXContentFragment, Writeable {
static final String TARGET_THROTTLE_TIME_IN_MILLIS = "target_throttle_time_in_millis";
}
public static class Timer implements Writeable {
protected long startTime = 0;
protected long startNanoTime = 0;
protected long time = -1;
protected long stopTime = 0;
public Timer() {}
public Timer(StreamInput in) throws IOException {
startTime = in.readVLong();
startNanoTime = in.readVLong();
stopTime = in.readVLong();
time = in.readVLong();
}
@Override
public synchronized void writeTo(StreamOutput out) throws IOException {
out.writeVLong(startTime);
out.writeVLong(startNanoTime);
out.writeVLong(stopTime);
// write a snapshot of current time, which is not per se the time field
out.writeVLong(time());
}
public synchronized void start() {
assert startTime == 0 : "already started";
startTime = System.currentTimeMillis();
startNanoTime = System.nanoTime();
}
/** Returns start time in millis */
public synchronized long startTime() {
return startTime;
}
/** Returns elapsed time in millis, or 0 if timer was not started */
public synchronized long time() {
if (startNanoTime == 0) {
return 0;
}
if (time >= 0) {
return time;
}
return Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startNanoTime));
}
/** Returns stop time in millis */
public synchronized long stopTime() {
return stopTime;
}
public synchronized void stop() {
assert stopTime == 0 : "already stopped";
stopTime = Math.max(System.currentTimeMillis(), startTime);
time = TimeValue.nsecToMSec(System.nanoTime() - startNanoTime);
assert time >= 0;
}
public synchronized void reset() {
startTime = 0;
startNanoTime = 0;
time = -1;
stopTime = 0;
}
// for tests
public long getStartNanoTime() {
return startNanoTime;
}
}
public static class VerifyIndex extends Timer implements ToXContentFragment, Writeable {
public static class VerifyIndex extends ReplicationTimer implements ToXContentFragment, Writeable {
private volatile long checkIndexTime;
public VerifyIndex() {}
@ -483,7 +409,7 @@ public class RecoveryState implements ToXContentFragment, Writeable {
}
}
public static class Translog extends Timer implements ToXContentFragment, Writeable {
public static class Translog extends ReplicationTimer implements ToXContentFragment, Writeable {
public static final int UNKNOWN = -1;
private int recovered;
@ -819,7 +745,7 @@ public class RecoveryState implements ToXContentFragment, Writeable {
}
}
public static class Index extends Timer implements ToXContentFragment, Writeable {
public static class Index extends ReplicationTimer implements ToXContentFragment, Writeable {
private final RecoveryFilesDetails fileDetails;
public static final long UNKNOWN = -1L;

View File

@ -0,0 +1,97 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.indices.replication.common;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.common.unit.TimeValue;
import java.io.IOException;
/**
 * A serializable timer that is used to measure the time taken for
 * file replication operations like recovery.
 * <p>
 * Wall-clock timestamps ({@link #startTime()}, {@link #stopTime()}) come from
 * {@link System#currentTimeMillis()}, while the elapsed duration ({@link #time()})
 * is derived from {@link System#nanoTime()}. Every method that reads or writes the
 * timer state is {@code synchronized} on this instance.
 */
public class ReplicationTimer implements Writeable {
    /** Wall-clock start in millis; {@code 0} until {@link #start()} is called. */
    private long startTime = 0;
    /** {@link System#nanoTime()} reading taken by {@link #start()}; {@code 0} means "not started". */
    private long startNanoTime = 0;
    /** Final elapsed millis once stopped or deserialized; {@code -1} while no final value is recorded. */
    private long time = -1;
    /** Wall-clock stop in millis; {@code 0} until {@link #stop()} is called. */
    private long stopTime = 0;

    /**
     * Creates a timer that has not been started.
     */
    public ReplicationTimer() {}

    /**
     * Reads a timer from the stream, in the field order written by {@link #writeTo(StreamOutput)}.
     * The elapsed time read here is the snapshot taken when the source timer was serialized.
     *
     * @param in stream to read from
     * @throws IOException if reading from the stream fails
     */
    public ReplicationTimer(StreamInput in) throws IOException {
        startTime = in.readVLong();
        startNanoTime = in.readVLong();
        stopTime = in.readVLong();
        time = in.readVLong();
    }

    /**
     * Writes the timer state to the stream.
     *
     * @param out stream to write to
     * @throws IOException if writing to the stream fails
     */
    @Override
    public synchronized void writeTo(StreamOutput out) throws IOException {
        out.writeVLong(startTime);
        out.writeVLong(startNanoTime);
        out.writeVLong(stopTime);
        // write a snapshot of current time, which is not per se the time field
        out.writeVLong(time());
    }

    /**
     * Starts the timer by recording the current wall-clock and nano time.
     */
    public synchronized void start() {
        assert startTime == 0 : "already started";
        startTime = System.currentTimeMillis();
        startNanoTime = System.nanoTime();
    }

    /**
     * Returns start time in millis
     */
    public synchronized long startTime() {
        return startTime;
    }

    /**
     * Returns elapsed time in millis, or 0 if timer was not started
     */
    public synchronized long time() {
        if (startNanoTime == 0) {
            return 0;
        }
        // A non-negative value means a final duration was recorded (by stop() or deserialization).
        if (time >= 0) {
            return time;
        }
        // Still running: compute the elapsed time on the fly, never reporting a negative duration.
        return Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startNanoTime));
    }

    /**
     * Returns stop time in millis
     */
    public synchronized long stopTime() {
        return stopTime;
    }

    /**
     * Stops the timer, recording the stop time and the final elapsed duration.
     */
    public synchronized void stop() {
        assert stopTime == 0 : "already stopped";
        // Clamp so the reported stop time never precedes the recorded start time.
        stopTime = Math.max(System.currentTimeMillis(), startTime);
        time = TimeValue.nsecToMSec(System.nanoTime() - startNanoTime);
        assert time >= 0;
    }

    /**
     * Restores every field to its initial, not-started value.
     */
    public synchronized void reset() {
        startTime = 0;
        startNanoTime = 0;
        time = -1;
        stopTime = 0;
    }

    /**
     * Returns the nano-time reading captured when the timer was started.
     * Synchronized like the other accessors so the value written under the
     * lock by {@link #start()} and {@link #reset()} is read consistently.
     */
    // only used in tests
    public synchronized long getStartNanoTime() {
        return startNanoTime;
    }
}

View File

@ -14,33 +14,36 @@
package org.opensearch.common.concurrent;
import org.junit.Before;
import org.opensearch.common.util.concurrent.RefCounted;
import org.opensearch.test.OpenSearchTestCase;
import java.util.concurrent.atomic.AtomicInteger;
import static org.mockito.Mockito.atMostOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
public class GatedAutoCloseableTests extends OpenSearchTestCase {
public class AutoCloseableRefCountedTests extends OpenSearchTestCase {
private AtomicInteger testRef;
private GatedAutoCloseable<AtomicInteger> testObject;
private RefCounted mockRefCounted;
private AutoCloseableRefCounted<RefCounted> testObject;
@Before
public void setup() {
testRef = new AtomicInteger(0);
testObject = new GatedAutoCloseable<>(testRef, testRef::incrementAndGet);
mockRefCounted = mock(RefCounted.class);
testObject = new AutoCloseableRefCounted<>(mockRefCounted);
}
public void testGet() {
assertEquals(0, testObject.get().get());
assertEquals(mockRefCounted, testObject.get());
}
public void testClose() {
testObject.close();
assertEquals(1, testObject.get().get());
verify(mockRefCounted, atMostOnce()).decRef();
}
public void testIdempotent() {
testObject.close();
testObject.close();
assertEquals(1, testObject.get().get());
verify(mockRefCounted, atMostOnce()).decRef();
}
}

View File

@ -44,9 +44,9 @@ import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.recovery.RecoveryState.FileDetail;
import org.opensearch.indices.recovery.RecoveryState.Index;
import org.opensearch.indices.recovery.RecoveryState.Stage;
import org.opensearch.indices.recovery.RecoveryState.Timer;
import org.opensearch.indices.recovery.RecoveryState.Translog;
import org.opensearch.indices.recovery.RecoveryState.VerifyIndex;
import org.opensearch.indices.replication.common.ReplicationTimer;
import org.opensearch.test.OpenSearchTestCase;
import java.io.IOException;
@ -63,9 +63,7 @@ import static org.hamcrest.Matchers.arrayContainingInAnyOrder;
import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.startsWith;
@ -124,72 +122,81 @@ public class RecoveryTargetTests extends OpenSearchTestCase {
}
}
public void testTimers() throws Throwable {
final Timer timer;
Streamer<Timer> streamer;
public void testTimer() throws Throwable {
AtomicBoolean stop = new AtomicBoolean();
if (randomBoolean()) {
timer = new Timer();
streamer = new Streamer<Timer>(stop, timer) {
@Override
Timer createObj(StreamInput in) throws IOException {
return new Timer(in);
}
};
} else if (randomBoolean()) {
timer = new Index();
streamer = new Streamer<Timer>(stop, timer) {
@Override
Timer createObj(StreamInput in) throws IOException {
return new Index(in);
}
};
} else if (randomBoolean()) {
timer = new VerifyIndex();
streamer = new Streamer<Timer>(stop, timer) {
@Override
Timer createObj(StreamInput in) throws IOException {
return new VerifyIndex(in);
}
};
} else {
timer = new Translog();
streamer = new Streamer<Timer>(stop, timer) {
@Override
Timer createObj(StreamInput in) throws IOException {
return new Translog(in);
}
};
}
final ReplicationTimer timer = new ReplicationTimer();
Streamer<ReplicationTimer> streamer = new Streamer<>(stop, timer) {
@Override
ReplicationTimer createObj(StreamInput in) throws IOException {
return new ReplicationTimer(in);
}
};
doTimerTest(timer, streamer);
}
public void testIndexTimer() throws Throwable {
AtomicBoolean stop = new AtomicBoolean();
Index index = new Index();
Streamer<Index> streamer = new Streamer<>(stop, index) {
@Override
Index createObj(StreamInput in) throws IOException {
return new Index(in);
}
};
doTimerTest(index, streamer);
}
public void testVerifyIndexTimer() throws Throwable {
AtomicBoolean stop = new AtomicBoolean();
VerifyIndex verifyIndex = new VerifyIndex();
Streamer<VerifyIndex> streamer = new Streamer<>(stop, verifyIndex) {
@Override
VerifyIndex createObj(StreamInput in) throws IOException {
return new VerifyIndex(in);
}
};
doTimerTest(verifyIndex, streamer);
}
public void testTranslogTimer() throws Throwable {
AtomicBoolean stop = new AtomicBoolean();
Translog translog = new Translog();
Streamer<Translog> streamer = new Streamer<>(stop, translog) {
@Override
Translog createObj(StreamInput in) throws IOException {
return new Translog(in);
}
};
doTimerTest(translog, streamer);
}
private void doTimerTest(ReplicationTimer timer, Streamer<? extends ReplicationTimer> streamer) throws Exception {
timer.start();
assertThat(timer.startTime(), greaterThan(0L));
assertThat(timer.stopTime(), equalTo(0L));
Timer lastRead = streamer.serializeDeserialize();
assertTrue(timer.startTime() > 0);
assertEquals(0, timer.stopTime());
ReplicationTimer lastRead = streamer.serializeDeserialize();
final long time = lastRead.time();
assertThat(time, lessThanOrEqualTo(timer.time()));
assertBusy(() -> assertThat("timer timer should progress compared to captured one ", time, lessThan(timer.time())));
assertThat("captured time shouldn't change", lastRead.time(), equalTo(time));
assertBusy(() -> assertTrue("timer timer should progress compared to captured one ", time < timer.time()));
assertEquals("captured time shouldn't change", time, lastRead.time());
if (randomBoolean()) {
timer.stop();
assertThat(timer.stopTime(), greaterThanOrEqualTo(timer.startTime()));
assertThat(timer.time(), greaterThan(0L));
lastRead = streamer.serializeDeserialize();
assertThat(lastRead.startTime(), equalTo(timer.startTime()));
assertThat(lastRead.time(), equalTo(timer.time()));
assertThat(lastRead.stopTime(), equalTo(timer.stopTime()));
}
timer.stop();
assertTrue(timer.stopTime() >= timer.startTime());
assertTrue(timer.time() > 0);
// validate captured time
lastRead = streamer.serializeDeserialize();
assertEquals(timer.startTime(), lastRead.startTime());
assertEquals(timer.time(), lastRead.time());
assertEquals(timer.stopTime(), lastRead.stopTime());
timer.reset();
assertThat(timer.startTime(), equalTo(0L));
assertThat(timer.time(), equalTo(0L));
assertThat(timer.stopTime(), equalTo(0L));
assertEquals(0, timer.startTime());
assertEquals(0, timer.time());
assertEquals(0, timer.stopTime());
// validate captured time
lastRead = streamer.serializeDeserialize();
assertThat(lastRead.startTime(), equalTo(0L));
assertThat(lastRead.time(), equalTo(0L));
assertThat(lastRead.stopTime(), equalTo(0L));
assertEquals(0, lastRead.startTime());
assertEquals(0, lastRead.time());
assertEquals(0, lastRead.stopTime());
}
public void testIndex() throws Throwable {

View File

@ -45,6 +45,7 @@ import org.opensearch.common.xcontent.XContentOpenSearchExtension;
import org.opensearch.index.Index;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.common.ReplicationTimer;
import org.opensearch.test.OpenSearchTestCase;
import java.util.ArrayList;
@ -72,7 +73,7 @@ public class RestRecoveryActionTests extends OpenSearchTestCase {
for (int i = 0; i < successfulShards; i++) {
final RecoveryState state = mock(RecoveryState.class);
when(state.getShardId()).thenReturn(new ShardId(new Index("index", "_na_"), i));
final RecoveryState.Timer timer = mock(RecoveryState.Timer.class);
final ReplicationTimer timer = mock(ReplicationTimer.class);
final long startTime = randomLongBetween(0, new Date().getTime());
when(timer.startTime()).thenReturn(startTime);
final long time = randomLongBetween(1000000, 10 * 1000000);