Improved error message when topic name changes within same supervisor (#13815)

Co-authored-by: Katya Macedo <38017980+ektravel@users.noreply.github.com>
Abhishek Agarwal 2023-03-08 07:40:18 +05:30 committed by GitHub
parent ef82756176
commit 52bd9e6adb
8 changed files with 147 additions and 52 deletions


@@ -221,7 +221,7 @@ batch ingestion emit the following metrics. These metrics are deltas for each em
|`ingest/merge/cpu`|Cpu time in Nanoseconds spent on merging intermediate segments.|`dataSource`, `taskId`, `taskType`, `tags`|Depends on configuration. Generally a few minutes at most.|
|`ingest/handoff/count`|Number of handoffs that happened.|`dataSource`, `taskId`, `taskType`, `tags`|Varies. Generally greater than 0 once every segment granular period if cluster operating normally.|
|`ingest/sink/count`|Number of sinks not handoffed.|`dataSource`, `taskId`, `taskType`, `tags`|1~3|
- |`ingest/events/messageGap`|Time gap in milliseconds between the latest ingested event timestamp and the current system timestamp of metrics emission. |`dataSource`, `taskId`, `taskType`, `tags`|Greater than 0, depends on the time carried in event. |
+ |`ingest/events/messageGap`|Time gap in milliseconds between the latest ingested event timestamp and the current system timestamp of metrics emission. If the value is increasing but lag is low, Druid may not be receiving new data. This metric is reset as new tasks spawn.|`dataSource`, `taskId`, `taskType`, `tags`|Greater than 0, depends on the time carried in event. |
|`ingest/notices/queueSize`|Number of pending notices to be processed by the coordinator.|`dataSource`, `tags`|Typically 0 and occasionally in lower single digits. Should not be a very high number. |
|`ingest/notices/time`|Milliseconds taken to process a notice by the supervisor.|`dataSource`, `tags`| < 1s |
|`ingest/pause/time`|Milliseconds spent by a task in a paused state without ingesting.|`dataSource`, `taskId`, `tags`| < 10 seconds|
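
The updated `ingest/events/messageGap` row above boils down to simple clock arithmetic. A minimal sketch of that calculation, under stated assumptions (all names are hypothetical; this is not Druid's actual emitter code):

    public class MessageGapSketch
    {
      public static void main(String[] args)
      {
        // Hypothetical: the timestamp carried by the most recently ingested
        // event, here 42 seconds in the past.
        long latestIngestedEventTimeMillis = System.currentTimeMillis() - 42_000L;

        // messageGap = system clock at emission time minus the latest
        // ingested event timestamp.
        long messageGapMillis = System.currentTimeMillis() - latestIngestedEventTimeMillis;

        // Per the updated description: a gap that keeps growing while lag
        // stays low suggests Druid is not receiving new data.
        System.out.println("ingest/events/messageGap = " + messageGapMillis + " ms");
      }
    }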


@@ -191,7 +191,7 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
{
final long thisTimeoutNanos = timeoutAtNanos - System.nanoTime();
if (hasTimeout && thisTimeoutNanos < 0) {
throw new QueryTimeoutException("Sequence iterator timed out");
throw new QueryTimeoutException();
}
if (currentBatch != null && !currentBatch.isTerminalResult() && !currentBatch.isDrained()) {
@@ -206,7 +206,7 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
}
}
if (currentBatch == null) {
throw new QueryTimeoutException("Sequence iterator timed out waiting for data");
throw new QueryTimeoutException();
}
if (cancellationGizmo.isCancelled()) {
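
Both call sites above now throw the no-arg QueryTimeoutException, so the two per-site messages are replaced by the single default message defined in the next file. A minimal standalone sketch of the deadline arithmetic the two sites share (the scaffolding is hypothetical; only `hasTimeout`, `timeoutAtNanos`, and `thisTimeoutNanos` come from the hunk above):

    import java.util.concurrent.TimeUnit;

    public class DeadlineCheckSketch
    {
      public static void main(String[] args) throws InterruptedException
      {
        final boolean hasTimeout = true;
        final long timeoutAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(50);

        Thread.sleep(100); // simulate slow upstream input

        final long thisTimeoutNanos = timeoutAtNanos - System.nanoTime();
        if (hasTimeout && thisTimeoutNanos < 0) {
          // Druid would `throw new QueryTimeoutException()` here; with this
          // commit its message defaults to the new ERROR_MESSAGE below.
          System.out.println("deadline exceeded by " + (-thisTimeoutNanos) + " ns");
        }
      }
    }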


@@ -36,7 +36,8 @@ import javax.annotation.Nullable;
public class QueryTimeoutException extends QueryException
{
private static final String ERROR_CLASS = QueryTimeoutException.class.getName();
- public static final String ERROR_MESSAGE = "Query Timed Out!";
+ public static final String ERROR_MESSAGE = "Query did not complete within configured timeout period. " +
+     "You can increase query timeout or tune the performance of query.";
public static final int STATUS_CODE = 504;
@JsonCreator
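
The no-arg constructor used by the call sites above is assumed to fall back to this ERROR_MESSAGE, which is what the updated tests below expect. A minimal sketch of the resulting behavior (the import path is assumed):

    import org.apache.druid.query.QueryTimeoutException;

    public class TimeoutMessageSketch
    {
      public static void main(String[] args)
      {
        try {
          // No-arg constructor, as now thrown by ParallelMergeCombiningSequence.
          throw new QueryTimeoutException();
        }
        catch (QueryTimeoutException e) {
          // Expected: "Query did not complete within configured timeout period. ..."
          System.out.println(e.getMessage());
        }
      }
    }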


@@ -511,7 +511,8 @@ public class ParallelMergeCombiningSequenceTest
input.add(nonBlockingSequence(someSize));
input.add(blockingSequence(someSize, 400, 500, 1, 500, true));
expectedException.expect(QueryTimeoutException.class);
expectedException.expectMessage("Sequence iterator timed out waiting for data");
expectedException.expectMessage("Query did not complete within configured timeout period. " +
"You can increase query timeout or tune the performance of query");
assertException(
input,
@@ -533,7 +534,8 @@ public class ParallelMergeCombiningSequenceTest
input.add(nonBlockingSequence(someSize));
expectedException.expect(QueryTimeoutException.class);
expectedException.expectMessage("Sequence iterator timed out");
expectedException.expectMessage("Query did not complete within configured timeout period. " +
"You can increase query timeout or tune the performance of query");
assertException(input, 8, 64, 1000, 500);
}


@@ -65,7 +65,8 @@ public class QueryTimeoutExceptionTest
timeoutException.getErrorCode()
);
Assert.assertEquals(
"Query Timed Out!",
"Query did not complete within configured timeout period. You can increase " +
"query timeout or tune the performance of query.",
timeoutException.getMessage()
);
Assert.assertEquals(


@@ -87,6 +87,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+ import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
@@ -116,13 +117,6 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
this.connector = connector;
}
- enum DataStoreMetadataUpdateResult
- {
-   SUCCESS,
-   FAILURE,
-   TRY_AGAIN
- }
@LifecycleStart
public void start()
{
@@ -365,15 +359,15 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
endMetadata
);
- if (result != DataStoreMetadataUpdateResult.SUCCESS) {
+ if (result.isFailed()) {
// Metadata was definitely not updated.
transactionStatus.setRollbackOnly();
definitelyNotUpdated.set(true);
- if (result == DataStoreMetadataUpdateResult.FAILURE) {
-   throw new RuntimeException("Aborting transaction!");
- } else if (result == DataStoreMetadataUpdateResult.TRY_AGAIN) {
-   throw new RetryTransactionException("Aborting transaction!");
+ if (result.canRetry()) {
+   throw new RetryTransactionException(result.getErrorMsg());
+ } else {
+   throw new RuntimeException(result.getErrorMsg());
}
}
}
@@ -384,15 +378,15 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
segmentsToDrop,
dataSource
);
- if (result != DataStoreMetadataUpdateResult.SUCCESS) {
+ if (result.isFailed()) {
// Metadata store was definitely not updated.
transactionStatus.setRollbackOnly();
definitelyNotUpdated.set(true);
- if (result == DataStoreMetadataUpdateResult.FAILURE) {
-   throw new RuntimeException("Aborting transaction!");
- } else if (result == DataStoreMetadataUpdateResult.TRY_AGAIN) {
-   throw new RetryTransactionException("Aborting transaction!");
+ if (result.canRetry()) {
+   throw new RetryTransactionException(result.getErrorMsg());
+ } else {
+   throw new RuntimeException(result.getErrorMsg());
}
}
}
@@ -455,15 +449,15 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
endMetadata
);
- if (result != DataStoreMetadataUpdateResult.SUCCESS) {
+ if (result.isFailed()) {
// Metadata was definitely not updated.
transactionStatus.setRollbackOnly();
definitelyNotUpdated.set(true);
- if (result == DataStoreMetadataUpdateResult.FAILURE) {
-   throw new RuntimeException("Aborting transaction!");
- } else if (result == DataStoreMetadataUpdateResult.TRY_AGAIN) {
-   throw new RetryTransactionException("Aborting transaction!");
+ if (result.canRetry()) {
+   throw new RetryTransactionException(result.getErrorMsg());
+ } else {
+   throw new RuntimeException(result.getErrorMsg());
}
}
@@ -1572,12 +1566,12 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
if (!startMetadataMatchesExisting) {
// Not in the desired start state.
- log.error(
-     "Not updating metadata, existing state[%s] in metadata store doesn't match to the new start state[%s].",
+ return new DataStoreMetadataUpdateResult(true, false, StringUtils.format(
+     "Inconsistent metadata state. This can happen if you update input topic in a spec without changing " +
+     "the supervisor name. Stored state: [%s], Target state: [%s].",
oldCommitMetadataFromDb,
startMetadata
- );
- return DataStoreMetadataUpdateResult.FAILURE;
+ ));
}
// Only endOffsets should be stored in metadata store
@@ -1605,7 +1599,13 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
.bind("commit_metadata_sha1", newCommitMetadataSha1)
.execute();
- retVal = numRows == 1 ? DataStoreMetadataUpdateResult.SUCCESS : DataStoreMetadataUpdateResult.TRY_AGAIN;
+ retVal = numRows == 1
+     ? DataStoreMetadataUpdateResult.SUCCESS
+     : new DataStoreMetadataUpdateResult(
+         true,
+         true,
+         "Failed to insert metadata for datasource [%s]",
+         dataSource);
} else {
// Expecting a particular old metadata; use the SHA1 in a compare-and-swap UPDATE
final int numRows = handle.createStatement(
@@ -1623,10 +1623,16 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
.bind("new_commit_metadata_sha1", newCommitMetadataSha1)
.execute();
- retVal = numRows == 1 ? DataStoreMetadataUpdateResult.SUCCESS : DataStoreMetadataUpdateResult.TRY_AGAIN;
+ retVal = numRows == 1
+     ? DataStoreMetadataUpdateResult.SUCCESS
+     : new DataStoreMetadataUpdateResult(
+         true,
+         true,
+         "Failed to update metadata for datasource [%s]",
+         dataSource);
}
- if (retVal == DataStoreMetadataUpdateResult.SUCCESS) {
+ if (retVal.isSuccess()) {
log.info("Updated metadata from[%s] to[%s].", oldCommitMetadataFromDb, newCommitMetadata);
} else {
log.info("Not updating metadata, compare-and-swap failure.");
@@ -1665,11 +1671,11 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
if (segmentsToDrop.stream().anyMatch(segment -> !dataSource.equals(segment.getDataSource()))) {
// All segments to drop must belong to the same datasource
- log.error(
+ return new DataStoreMetadataUpdateResult(
+     true,
+     false,
"Not dropping segments, as not all segments belong to the datasource[%s].",
-     dataSource
- );
- return DataStoreMetadataUpdateResult.FAILURE;
+     dataSource);
}
final int numChangedSegments =
@@ -1679,11 +1685,13 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
);
if (numChangedSegments != segmentsToDrop.size()) {
- log.warn("Failed to drop segments metadata update as numChangedSegments[%s] segmentsToDropSize[%s]",
-     numChangedSegments,
-     segmentsToDrop.size()
+ return new DataStoreMetadataUpdateResult(
+     true,
+     true,
+     "Failed to drop some segments. Only %d could be dropped out of %d. Trying again",
+     numChangedSegments,
+     segmentsToDrop.size()
);
- return DataStoreMetadataUpdateResult.TRY_AGAIN;
}
return DataStoreMetadataUpdateResult.SUCCESS;
}
@@ -1908,4 +1916,71 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
}
}
+ public static class DataStoreMetadataUpdateResult
+ {
+   private final boolean failed;
+   private final boolean canRetry;
+   @Nullable private final String errorMsg;
+
+   public static DataStoreMetadataUpdateResult SUCCESS = new DataStoreMetadataUpdateResult(false, false, null);
+
+   DataStoreMetadataUpdateResult(boolean failed, boolean canRetry, @Nullable String errorMsg, Object... errorFormatArgs)
+   {
+     this.failed = failed;
+     this.canRetry = canRetry;
+     this.errorMsg = null == errorMsg ? null : StringUtils.format(errorMsg, errorFormatArgs);
+   }
+
+   public boolean isFailed()
+   {
+     return failed;
+   }
+
+   public boolean isSuccess()
+   {
+     return !failed;
+   }
+
+   public boolean canRetry()
+   {
+     return canRetry;
+   }
+
+   @Nullable
+   public String getErrorMsg()
+   {
+     return errorMsg;
+   }
+
+   @Override
+   public boolean equals(Object o)
+   {
+     if (this == o) {
+       return true;
+     }
+     if (o == null || getClass() != o.getClass()) {
+       return false;
+     }
+     DataStoreMetadataUpdateResult that = (DataStoreMetadataUpdateResult) o;
+     return failed == that.failed && canRetry == that.canRetry && Objects.equals(errorMsg, that.errorMsg);
+   }
+
+   @Override
+   public int hashCode()
+   {
+     return Objects.hash(failed, canRetry, errorMsg);
+   }
+
+   @Override
+   public String toString()
+   {
+     return "DataStoreMetadataUpdateResult{" +
+            "failed=" + failed +
+            ", canRetry=" + canRetry +
+            ", errorMsg='" + errorMsg + '\'' +
+            '}';
+   }
+ }
}
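
For reference, the caller pattern this new class enables, mirroring the three transaction blocks changed earlier in the commit. A minimal sketch (the helper name is hypothetical; same-package access is assumed, since the constructor is package-private):

    // Hypothetical helper; mirrors the transaction blocks changed above.
    static void failOnUnsuccessfulUpdate(DataStoreMetadataUpdateResult result)
    {
      if (result.isFailed()) {
        // The thrown exception now carries the formatted error message
        // instead of a generic "Aborting transaction!".
        if (result.canRetry()) {
          throw new RetryTransactionException(result.getErrorMsg());
        }
        throw new RuntimeException(result.getErrorMsg());
      }
    }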


@@ -613,7 +613,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
{
metadataUpdateCounter.getAndIncrement();
if (attemptCounter.getAndIncrement() == 0) {
- return DataStoreMetadataUpdateResult.TRY_AGAIN;
+ return new DataStoreMetadataUpdateResult(true, true, null);
} else {
return super.updateDataSourceMetadataWithHandle(handle, dataSource, startMetadata, endMetadata);
}
@@ -680,7 +680,10 @@ public class IndexerSQLMetadataStorageCoordinatorTest
new ObjectMetadata(ImmutableMap.of("foo", "bar")),
new ObjectMetadata(ImmutableMap.of("foo", "baz"))
);
Assert.assertEquals(SegmentPublishResult.fail("java.lang.RuntimeException: Aborting transaction!"), result1);
Assert.assertEquals(SegmentPublishResult.fail("java.lang.RuntimeException: Inconsistent metadata state. This can " +
"happen if you update input topic in a spec without changing the supervisor name. " +
"Stored state: [null], " +
"Target state: [ObjectMetadata{theObject={foo=bar}}]."), result1);
// Should only be tried once.
Assert.assertEquals(1, metadataUpdateCounter.get());
@@ -711,7 +714,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest
null,
null
);
Assert.assertEquals(SegmentPublishResult.fail("java.lang.RuntimeException: Aborting transaction!"), result1);
Assert.assertEquals(SegmentPublishResult.fail("java.lang.RuntimeException: Not dropping segments, " +
"as not all segments belong to the datasource[fooDataSource]."), result1);
// Should only be tried once. Since dropSegmentsWithHandle will return FAILURE (not TRY_AGAIN) as set of
// segments to drop contains more than one datasource.
@@ -778,7 +782,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest
null
);
Assert.assertEquals(SegmentPublishResult.fail(
"org.apache.druid.metadata.RetryTransactionException: Aborting transaction!"), result1);
"org.apache.druid.metadata.RetryTransactionException: Failed to drop some segments. " +
"Only 1 could be dropped out of 2. Trying again"), result1);
Assert.assertEquals(MAX_SQL_MEATADATA_RETRY_FOR_TEST, segmentTableDropUpdateCounter.get());
@@ -805,7 +810,10 @@ public class IndexerSQLMetadataStorageCoordinatorTest
new ObjectMetadata(null),
new ObjectMetadata(ImmutableMap.of("foo", "baz"))
);
Assert.assertEquals(SegmentPublishResult.fail("java.lang.RuntimeException: Aborting transaction!"), result2);
Assert.assertEquals(SegmentPublishResult.fail("java.lang.RuntimeException: Inconsistent metadata state. This can " +
"happen if you update input topic in a spec without changing the supervisor name. " +
"Stored state: [ObjectMetadata{theObject={foo=baz}}], " +
"Target state: [ObjectMetadata{theObject=null}]."), result2);
// Should only be tried once per call.
Assert.assertEquals(2, metadataUpdateCounter.get());
@@ -828,7 +836,10 @@ public class IndexerSQLMetadataStorageCoordinatorTest
new ObjectMetadata(ImmutableMap.of("foo", "qux")),
new ObjectMetadata(ImmutableMap.of("foo", "baz"))
);
Assert.assertEquals(SegmentPublishResult.fail("java.lang.RuntimeException: Aborting transaction!"), result2);
Assert.assertEquals(SegmentPublishResult.fail("java.lang.RuntimeException: Inconsistent metadata state. This can " +
"happen if you update input topic in a spec without changing the supervisor name. " +
"Stored state: [ObjectMetadata{theObject={foo=baz}}], " +
"Target state: [ObjectMetadata{theObject={foo=qux}}]."), result2);
// Should only be tried once per call.
Assert.assertEquals(2, metadataUpdateCounter.get());
@@ -2256,7 +2267,11 @@ public class IndexerSQLMetadataStorageCoordinatorTest
ImmutableSet.of(defaultSegment),
defaultSegment.getDataSource()
);
- Assert.assertEquals(IndexerSQLMetadataStorageCoordinator.DataStoreMetadataUpdateResult.TRY_AGAIN, result);
+ Assert.assertEquals(new IndexerSQLMetadataStorageCoordinator.DataStoreMetadataUpdateResult(
+     true,
+     true,
+     "Failed to drop some segments. Only 0 could be dropped out of 1. Trying again"),
+     result);
}
}


@@ -645,7 +645,8 @@ public class QueryResourceTest
ByteArrayOutputStream baos = new ByteArrayOutputStream();
((StreamingOutput) response.getEntity()).write(baos);
QueryTimeoutException ex = jsonMapper.readValue(baos.toByteArray(), QueryTimeoutException.class);
Assert.assertEquals("Query Timed Out!", ex.getMessage());
Assert.assertEquals("Query did not complete within configured timeout period. You can " +
"increase query timeout or tune the performance of query.", ex.getMessage());
Assert.assertEquals(QueryException.QUERY_TIMEOUT_ERROR_CODE, ex.getErrorCode());
Assert.assertEquals(1, timeoutQueryResource.getTimedOutQueryCount());