Cleanup usages of stopwatch (#16478)

Changes:
- Remove `synchronized` from the methods of `Stopwatch`
- Access stopwatch methods in `ChangeRequestHttpSyncer` inside a lock
Kashif Faraz 2024-05-27 23:08:46 +05:30 committed by GitHub
parent 4e1de50e30
commit 9d77ef04f4
5 changed files with 89 additions and 103 deletions

Stopwatch.java

@@ -25,11 +25,8 @@ import org.joda.time.Duration;
import java.util.concurrent.TimeUnit;
/**
* Thread-safe wrapper over {@link com.google.common.base.Stopwatch}.
* <p>
* Thread safety has been limited to the start/stop methods for now as they are
* the only ones that can throw an exception in an illegal state and are thus
* vulnerable to race conditions.
* Wrapper over {@link com.google.common.base.Stopwatch} to provide some utility
* methods such as {@link #millisElapsed()}, {@link #restart()}, {@link #hasElapsed(Duration)}.
*/
public class Stopwatch
{
@@ -55,17 +52,17 @@ public class Stopwatch
this.delegate = delegate;
}
public synchronized void start()
public void start()
{
delegate.start();
}
public synchronized void stop()
public void stop()
{
delegate.stop();
}
public synchronized void reset()
public void reset()
{
delegate.reset();
}
@@ -73,12 +70,12 @@ public class Stopwatch
/**
* Invokes {@code reset().start()} on the underlying {@link com.google.common.base.Stopwatch}.
*/
public synchronized void restart()
public void restart()
{
delegate.reset().start();
}
public synchronized boolean isRunning()
public boolean isRunning()
{
return delegate.isRunning();
}
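
For orientation, here is a minimal usage sketch of the utility methods the new javadoc advertises. The package, class name, and timings below are assumptions for illustration; only `createStarted()`, `millisElapsed()`, `hasElapsed(Duration)`, and `restart()` come from the diff itself. Note that with `synchronized` gone, a `Stopwatch` shared across threads must now be guarded by the caller, as `ChangeRequestHttpSyncer` does later in this commit.

```java
import org.apache.druid.java.util.common.Stopwatch; // assumed package of the wrapper above
import org.joda.time.Duration;

// Hypothetical single-threaded polling loop: no locking is needed here because
// only one thread ever touches the stopwatch.
public class StopwatchUsageSketch
{
  public static void main(String[] args) throws InterruptedException
  {
    final Stopwatch sincePollStart = Stopwatch.createStarted();
    Thread.sleep(100); // stand-in for real work

    if (sincePollStart.hasElapsed(Duration.millis(50))) {
      System.out.println("Poll took " + sincePollStart.millisElapsed() + " ms");
    }
    sincePollStart.restart(); // reset().start() on the underlying Guava stopwatch
  }
}
```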

DruidLeaderClient.java

@@ -67,9 +67,9 @@ public class DruidLeaderClient
private final String leaderRequestPath;
private LifecycleLock lifecycleLock = new LifecycleLock();
private final LifecycleLock lifecycleLock = new LifecycleLock();
private DruidNodeDiscovery druidNodeDiscovery;
private AtomicReference<String> currentKnownLeader = new AtomicReference<>();
private final AtomicReference<String> currentKnownLeader = new AtomicReference<>();
public DruidLeaderClient(
HttpClient httpClient,

SqlSegmentsMetadataManager.java

@@ -71,12 +71,9 @@ import org.skife.jdbi.v2.Query;
import org.skife.jdbi.v2.StatementContext;
import org.skife.jdbi.v2.TransactionCallback;
import org.skife.jdbi.v2.TransactionStatus;
import org.skife.jdbi.v2.tweak.ResultSetMapper;
import javax.annotation.Nullable;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -1036,42 +1033,26 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
final Stopwatch stopwatch = Stopwatch.createStarted();
log.info("Starting polling of segment table.");
// some databases such as PostgreSQL require auto-commit turned off
// Some databases such as PostgreSQL require auto-commit turned off
// to stream results back, enabling transactions disables auto-commit
//
// setting connection to read-only will allow some database such as MySQL
// to automatically use read-only transaction mode, further optimizing the query
final List<DataSegment> segments = connector.inReadOnlyTransaction(
new TransactionCallback<List<DataSegment>>()
{
@Override
public List<DataSegment> inTransaction(Handle handle, TransactionStatus status)
{
return handle
.createQuery(StringUtils.format("SELECT payload FROM %s WHERE used=true", getSegmentsTable()))
.setFetchSize(connector.getStreamingFetchSize())
.map(
new ResultSetMapper<DataSegment>()
{
@Override
public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLException
{
try {
DataSegment segment = jsonMapper.readValue(r.getBytes("payload"), DataSegment.class);
return replaceWithExistingSegmentIfPresent(segment);
}
catch (IOException e) {
log.makeAlert(e, "Failed to read segment from db.").emit();
// If one entry in database is corrupted doPoll() should continue to work overall. See
// filter by `Objects::nonNull` below in this method.
return null;
}
}
}
)
.list();
}
}
(handle, status) -> handle
.createQuery(StringUtils.format("SELECT payload FROM %s WHERE used=true", getSegmentsTable()))
.setFetchSize(connector.getStreamingFetchSize())
.map((index, r, ctx) -> {
try {
DataSegment segment = jsonMapper.readValue(r.getBytes("payload"), DataSegment.class);
return replaceWithExistingSegmentIfPresent(segment);
}
catch (IOException e) {
log.makeAlert(e, "Failed to read segment from db.").emit();
// If one entry in database is corrupted doPoll() should continue to work overall. See
// filter by `Objects::nonNull` below in this method.
return null;
}
}).list()
);
Preconditions.checkNotNull(
@@ -1082,11 +1063,13 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
if (segments.isEmpty()) {
log.info("No segments found in the database!");
} else {
log.info("Polled and found [%,d] segments in the database in [%,d] ms.", segments.size(), stopwatch.millisElapsed());
log.info(
"Polled and found [%,d] segments in the database in [%,d] ms.",
segments.size(), stopwatch.millisElapsed()
);
}
stopwatch.restart();
createDatasourcesSnapshot(stopwatch, segments);
createDatasourcesSnapshot(segments);
}
private void doPollSegmentAndSchema()
@@ -1157,25 +1140,18 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
(handle, status) -> {
handle.createQuery(schemaPollQuery)
.setFetchSize(connector.getStreamingFetchSize())
.map(
new ResultSetMapper<Void>()
{
@Override
public Void map(int index, ResultSet r, StatementContext ctx) throws SQLException
{
try {
schemaMapBuilder.put(
r.getString("fingerprint"),
jsonMapper.readValue(r.getBytes("payload"), SchemaPayload.class)
);
}
catch (IOException e) {
log.makeAlert(e, "Failed to read schema from db.").emit();
}
return null;
}
})
.list();
.map((index, r, ctx) -> {
try {
schemaMapBuilder.put(
r.getString("fingerprint"),
jsonMapper.readValue(r.getBytes("payload"), SchemaPayload.class)
);
}
catch (IOException e) {
log.makeAlert(e, "Failed to read schema from db.").emit();
}
return null;
}).list();
segmentSchemaCache.resetTemporaryPublishedMetadataQueryResultOnDBPoll();
return null;
@@ -1195,19 +1171,17 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
log.info("No segments found in the database!");
} else {
log.info(
"Polled and found total [%,d] segments and [%,d] schema in the database in [%,d] ms.",
segments.size(),
schemaMap.size(),
stopwatch.millisElapsed()
"Polled and found [%,d] segments and [%,d] schemas in the database in [%,d] ms.",
segments.size(), schemaMap.size(), stopwatch.millisElapsed()
);
}
stopwatch.restart();
createDatasourcesSnapshot(stopwatch, segments);
createDatasourcesSnapshot(segments);
}
private void createDatasourcesSnapshot(Stopwatch stopwatch, List<DataSegment> segments)
private void createDatasourcesSnapshot(List<DataSegment> segments)
{
final Stopwatch stopwatch = Stopwatch.createStarted();
// dataSourcesSnapshot is updated only here and the DataSourcesSnapshot object is immutable. If data sources or
// segments are marked as used or unused directly (via markAs...() methods in SegmentsMetadataManager), the
// dataSourcesSnapshot can become invalid until the next database poll.
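
To make the streaming comment in `doPoll()` above concrete, here is a hedged, standalone JDBI v2 sketch of the same pattern: run the query inside a transaction (disabling auto-commit so PostgreSQL can stream results back) and set a fetch size so rows arrive in batches rather than all at once. The connection string, table name, and fetch size are placeholders; Druid obtains its handle through `connector.inReadOnlyTransaction()` instead.

```java
import org.skife.jdbi.v2.DBI;

import java.util.List;

public class StreamingQuerySketch
{
  public static void main(String[] args)
  {
    // Placeholder DSN, not Druid's metadata-store configuration.
    final DBI dbi = new DBI("jdbc:postgresql://localhost:5432/druid");

    final List<byte[]> payloads = dbi.inTransaction((handle, status) -> {
      // The read-only hint lets databases such as MySQL choose a read-only
      // transaction mode, as the comment in the diff notes.
      handle.getConnection().setReadOnly(true);
      return handle
          .createQuery("SELECT payload FROM druid_segments WHERE used = true")
          .setFetchSize(100) // stream rows in batches instead of buffering them all
          .map((index, r, ctx) -> r.getBytes("payload"))
          .list();
    });

    System.out.println("Fetched " + payloads.size() + " payloads");
  }
}
```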

SegmentSchemaBackFillQueue.java

@@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.SchemaPayload;
import org.apache.druid.segment.SchemaPayloadPlus;
@@ -159,13 +160,11 @@ public class SegmentSchemaBackFillQueue
return;
}
Stopwatch stopwatch = Stopwatch.createStarted();
final Stopwatch stopwatch = Stopwatch.createStarted();
log.info("Backfilling segment schema. Queue size is [%s].", queue.size());
log.info("Backfilling segment schema. Queue size is [%s]", queue.size());
int itemsToProcess = Math.min(MAX_BATCH_SIZE, queue.size());
Map<String, List<SegmentSchemaMetadataPlus>> polled = new HashMap<>();
final int itemsToProcess = Math.min(MAX_BATCH_SIZE, queue.size());
final Map<String, List<SegmentSchemaMetadataPlus>> polled = new HashMap<>();
for (int i = 0; i < itemsToProcess; i++) {
SegmentSchemaMetadataPlus item = queue.poll();
if (item != null) {
@@ -175,21 +174,29 @@ public class SegmentSchemaBackFillQueue
for (Map.Entry<String, List<SegmentSchemaMetadataPlus>> entry : polled.entrySet()) {
try {
segmentSchemaManager.persistSchemaAndUpdateSegmentsTable(entry.getKey(), entry.getValue(), CentralizedDatasourceSchemaConfig.SCHEMA_VERSION);
segmentSchemaManager.persistSchemaAndUpdateSegmentsTable(
entry.getKey(),
entry.getValue(),
CentralizedDatasourceSchemaConfig.SCHEMA_VERSION
);
// Mark the segments as published in the cache.
for (SegmentSchemaMetadataPlus plus : entry.getValue()) {
segmentSchemaCache.markMetadataQueryResultPublished(plus.getSegmentId());
}
emitter.emit(
ServiceMetricEvent.builder()
.setDimension("dataSource", entry.getKey())
.setMetric("metadatacache/backfill/count", entry.getValue().size())
.setDimension(DruidMetrics.DATASOURCE, entry.getKey())
.setMetric("metadatacache/backfill/count", entry.getValue().size())
);
}
catch (Exception e) {
log.error(e, "Exception persisting schema and updating segments table for datasource [%s].", entry.getKey());
log.error(e, "Exception persisting schema and updating segments table for datasource[%s].", entry.getKey());
}
}
emitter.emit(ServiceMetricEvent.builder().setMetric("metadatacache/backfill/time", stopwatch.millisElapsed()));
emitter.emit(
ServiceMetricEvent.builder()
.setMetric("metadatacache/backfill/time", stopwatch.millisElapsed())
);
}
}
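
A side note on the metric change above: the refactor replaces the `"dataSource"` string literal with the shared `DruidMetrics.DATASOURCE` constant. A small extracted helper, hypothetical but following the builder pattern visible in the diff, keeps the dimension name consistent across emitters:

```java
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.DruidMetrics;

public class BackfillMetricsSketch
{
  // Hypothetical helper mirroring the emission in the diff: the shared constant
  // avoids drift between emitters that tag metrics with a datasource dimension.
  static void emitBackfillCount(ServiceEmitter emitter, String dataSource, int count)
  {
    emitter.emit(
        ServiceMetricEvent.builder()
                          .setDimension(DruidMetrics.DATASOURCE, dataSource)
                          .setMetric("metadatacache/backfill/count", count)
    );
  }
}
```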

ChangeRequestHttpSyncer.java

@@ -82,9 +82,13 @@ public class ChangeRequestHttpSyncer<T>
private final CountDownLatch initializationLatch = new CountDownLatch(1);
/**
* This lock is used to ensure proper start-then-stop semantics and making sure after stopping no state update happens
* and {@link #sync} is not again scheduled in {@link #executor} and if there was a previously scheduled sync before
* stopping, it is skipped and also, it is used to ensure that duplicate syncs are never scheduled in the executor.
* Lock to implement proper start-then-stop semantics. Used to ensure that:
* <ul>
* <li>No state update happens after {@link #stop()}.</li>
* <li>No sync is scheduled after {@link #stop()}.</li>
* <li>Any pending sync is skipped when {@link #stop()} has been called.</li>
* <li>Duplicate syncs are not scheduled on the executor.</li>
* </ul>
*/
private final LifecycleLock startStopLock = new LifecycleLock();
@@ -141,7 +145,7 @@ public class ChangeRequestHttpSyncer<T>
startStopLock.exitStart();
}
sinceSyncerStart.restart();
safeRestart(sinceSyncerStart);
addNextSyncToWorkQueue();
}
}
@@ -220,21 +224,18 @@
*/
public boolean isSyncedSuccessfully()
{
if (consecutiveFailedAttemptCount > 0) {
return false;
} else {
return sinceLastSyncSuccess.hasNotElapsed(maxDurationToWaitForSync);
}
return consecutiveFailedAttemptCount <= 0
&& sinceLastSyncSuccess.hasNotElapsed(maxDurationToWaitForSync);
}
private void sync()
private void sendSyncRequest()
{
if (!startStopLock.awaitStarted(1, TimeUnit.MILLISECONDS)) {
log.info("Skipping sync for server[%s] as syncer has not started yet.", logIdentity);
return;
}
sinceLastSyncRequest.restart();
safeRestart(sinceLastSyncRequest);
try {
final String req = getRequestString();
@@ -270,7 +271,7 @@
final int responseCode = responseHandler.getStatus();
if (responseCode == HttpServletResponse.SC_NO_CONTENT) {
log.debug("Received NO CONTENT from server[%s]", logIdentity);
sinceLastSyncSuccess.restart();
safeRestart(sinceLastSyncSuccess);
return;
} else if (responseCode != HttpServletResponse.SC_OK) {
handleFailure(new ISE("Received sync response [%d]", responseCode));
@@ -306,7 +307,7 @@
log.info("Server[%s] synced successfully.", logIdentity);
}
sinceLastSyncSuccess.restart();
safeRestart(sinceLastSyncSuccess);
}
catch (Exception ex) {
markServerUnstableAndAlert(ex, "Processing Response");
@@ -390,9 +391,9 @@
RetryUtils.nextRetrySleepMillis(consecutiveFailedAttemptCount)
);
log.info("Scheduling next sync for server[%s] in [%d] millis.", logIdentity, delayMillis);
executor.schedule(this::sync, delayMillis, TimeUnit.MILLISECONDS);
executor.schedule(this::sendSyncRequest, delayMillis, TimeUnit.MILLISECONDS);
} else {
executor.execute(this::sync);
executor.execute(this::sendSyncRequest);
}
}
catch (Throwable th) {
@@ -410,10 +411,17 @@
}
}
private void safeRestart(Stopwatch stopwatch)
{
synchronized (startStopLock) {
stopwatch.restart();
}
}
private void markServerUnstableAndAlert(Throwable throwable, String action)
{
if (consecutiveFailedAttemptCount++ == 0) {
sinceUnstable.restart();
safeRestart(sinceUnstable);
}
final long unstableSeconds = getUnstableTimeMillis() / 1000;
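
Finally, to spell out why this file wraps every `restart()` in `safeRestart()`: with `synchronized` removed from the wrapper, Guava's underlying `Stopwatch.start()` still throws `IllegalStateException` when the watch is already running, so two threads restarting concurrently can interleave `reset()` and `start()` and fail. A minimal demonstration against raw Guava (not Druid code) of the failure mode the lock prevents:

```java
import com.google.common.base.Stopwatch;

public class RestartRaceSketch
{
  public static void main(String[] args)
  {
    final Stopwatch watch = Stopwatch.createStarted();

    // An unsynchronized restart is reset() followed by start(). If another
    // thread's start() lands between those two calls, the second start()
    // throws, which is exactly what serializing on startStopLock rules out.
    try {
      watch.start(); // starting an already-running watch
    }
    catch (IllegalStateException e) {
      System.out.println("Without external locking: " + e.getMessage());
    }
  }
}
```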