Update error messages when supervisor's checkpoint state is invalid (#16208)

* Update error message when the topic changes.

Suggest resetting the supervisor when the topic changes, rather than changing the
supervisor name, which effectively creates a new supervisor (see the sketch after the commit metadata below).

* Update server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java

Co-authored-by: Kashif Faraz <kashif.faraz@gmail.com>

* Cleanup

* Remove log and include oldCommitMetadataFromDb

* Fix test

---------

Co-authored-by: Kashif Faraz <kashif.faraz@gmail.com>
Authored by Abhishek Radhakrishnan on 2024-04-03 10:34:17 -07:00, committed by GitHub
parent 1df41db46d
commit 75fb57ed6e
3 changed files with 70 additions and 57 deletions
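
For orientation, here is a minimal, self-contained sketch of the check whose error messages this commit rewords. The names and types below are illustrative stand-ins, not Druid's actual DataSourceMetadata handling (the message text mirrors the patch's wording verbatim): given the stored end state from the metadata store and a task's proposed start state, the publish fails with one of the two new messages, both of which now suggest resetting the supervisor.

// Simplified, hypothetical illustration of the metadata check behind the new error messages.
// "Metadata" is a stand-in for Druid's DataSourceMetadata; the real logic lives in
// IndexerSQLMetadataStorageCoordinator.updateDataSourceMetadataWithHandle().
import java.util.Objects;

public class StartMetadataCheckSketch
{
  record Metadata(String offsets) { }

  // Returns null when the proposed start state lines up with the stored end state,
  // otherwise an error message in the spirit of the new wording.
  static String validateStart(Metadata storedEndState, Metadata proposedStartState)
  {
    final boolean matchesExisting = Objects.equals(storedEndState, proposedStartState);
    final boolean aheadOfExisting =
        storedEndState == null
        || proposedStartState.offsets().compareTo(storedEndState.offsets()) > 0;

    if (aheadOfExisting && !matchesExisting) {
      // Typically several tasks racing to publish segments for the same partitions.
      return String.format(
          "The new start metadata state[%s] is ahead of the last commited end state[%s]."
          + " Try resetting the supervisor.",
          proposedStartState, storedEndState
      );
    }
    if (!matchesExisting) {
      // Typically the input topic changed underneath an existing supervisor.
      return String.format(
          "Inconsistency between stored metadata state[%s] and target state[%s]. Try resetting the supervisor.",
          storedEndState, proposedStartState
      );
    }
    return null;
  }

  public static void main(String[] args)
  {
    System.out.println(validateStart(new Metadata("offset-5"), new Metadata("offset-9")));
    System.out.println(validateStart(new Metadata("offset-9"), new Metadata("offset-5")));
    System.out.println(validateStart(new Metadata("offset-5"), new Metadata("offset-5")));
  }
}

In the real coordinator the failure surfaces to callers as SegmentPublishResult.fail(InvalidInput.exception(...).toString()), as the updated tests below show.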

Changed file: SegmentTransactionalInsertActionTest.java

@@ -22,6 +22,7 @@ package org.apache.druid.indexing.common.actions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import org.apache.druid.error.InvalidInput;
 import org.apache.druid.indexing.common.TaskLockType;
 import org.apache.druid.indexing.common.task.NoopTask;
 import org.apache.druid.indexing.common.task.Task;
@@ -151,7 +152,12 @@ public class SegmentTransactionalInsertActionTest
     );
     Assert.assertEquals(
-        SegmentPublishResult.fail("java.lang.RuntimeException: Failed to update the metadata Store. The new start metadata is ahead of last commited end state."),
+        SegmentPublishResult.fail(
+            InvalidInput.exception(
+                "The new start metadata state[ObjectMetadata{theObject=[1]}] is ahead of the last commited end"
+                + " state[null]. Try resetting the supervisor."
+            ).toString()
+        ),
         result
     );
   }

Changed file: server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java

@@ -33,6 +33,7 @@ import com.google.common.hash.Hashing;
 import com.google.common.io.BaseEncoding;
 import com.google.inject.Inject;
 import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.druid.error.InvalidInput;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
 import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
 import org.apache.druid.indexing.overlord.SegmentCreateRequest;
@@ -445,41 +446,33 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStorageCoordinator
     try {
       return connector.retryTransaction(
-          new TransactionCallback<SegmentPublishResult>()
-          {
-            @Override
-            public SegmentPublishResult inTransaction(
-                final Handle handle,
-                final TransactionStatus transactionStatus
-            ) throws Exception
-            {
-              // Set definitelyNotUpdated back to false upon retrying.
-              definitelyNotUpdated.set(false);
+          (handle, transactionStatus) -> {
+            // Set definitelyNotUpdated back to false upon retrying.
+            definitelyNotUpdated.set(false);
-              if (startMetadata != null) {
-                final DataStoreMetadataUpdateResult result = updateDataSourceMetadataWithHandle(
-                    handle,
-                    dataSource,
-                    startMetadata,
-                    endMetadata
-                );
+            if (startMetadata != null) {
+              final DataStoreMetadataUpdateResult result = updateDataSourceMetadataWithHandle(
+                  handle,
+                  dataSource,
+                  startMetadata,
+                  endMetadata
+              );
-                if (result.isFailed()) {
-                  // Metadata was definitely not updated.
-                  transactionStatus.setRollbackOnly();
-                  definitelyNotUpdated.set(true);
+              if (result.isFailed()) {
+                // Metadata was definitely not updated.
+                transactionStatus.setRollbackOnly();
+                definitelyNotUpdated.set(true);
-                  if (result.canRetry()) {
-                    throw new RetryTransactionException(result.getErrorMsg());
-                  } else {
-                    throw new RuntimeException(result.getErrorMsg());
-                  }
+                if (result.canRetry()) {
+                  throw new RetryTransactionException(result.getErrorMsg());
+                } else {
+                  throw InvalidInput.exception(result.getErrorMsg());
                 }
               }
-              final Set<DataSegment> inserted = announceHistoricalSegmentBatch(handle, segments, usedSegments);
-              return SegmentPublishResult.ok(ImmutableSet.copyOf(inserted));
             }
+            final Set<DataSegment> inserted = announceHistoricalSegmentBatch(handle, segments, usedSegments);
+            return SegmentPublishResult.ok(ImmutableSet.copyOf(inserted));
           },
           3,
           getSqlMetadataMaxRetry()
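
The hunk above also replaces the anonymous TransactionCallback with a lambda. A tiny, self-contained illustration of that shape of refactor, using a hypothetical callback interface rather than the real JDBI one:

// Hypothetical single-method callback interface standing in for TransactionCallback.
public class LambdaRefactorSketch
{
  @FunctionalInterface
  interface Callback<T>
  {
    T inTransaction(String handle, String status) throws Exception;
  }

  static <T> T retry(Callback<T> callback) throws Exception
  {
    // A real implementation would retry on transient failures; one attempt suffices here.
    return callback.inTransaction("handle", "status");
  }

  public static void main(String[] args) throws Exception
  {
    // Before: anonymous class.
    String before = retry(new Callback<String>()
    {
      @Override
      public String inTransaction(String handle, String status)
      {
        return "published via " + handle;
      }
    });

    // After: equivalent lambda, as in the patch.
    String after = retry((handle, status) -> "published via " + handle);

    System.out.println(before.equals(after)); // true
  }
}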
@@ -2395,17 +2388,19 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStorageCoordinator
     }
     final boolean startMetadataMatchesExisting;
-    int startMetadataGreaterThanExisting = 0;
+    boolean startMetadataGreaterThanExisting = false;
     if (oldCommitMetadataFromDb == null) {
       startMetadataMatchesExisting = startMetadata.isValidStart();
-      startMetadataGreaterThanExisting = 1;
+      startMetadataGreaterThanExisting = true;
     } else {
       // Checking against the last committed metadata.
-      // If the new start sequence number is greater than the end sequence number of last commit compareTo() function will return 1,
-      // 0 in all other cases. It might be because multiple tasks are publishing the sequence at around same time.
+      // If the new start sequence number is greater than the end sequence number of the last commit,
+      // compareTo() will return 1 and 0 in all other cases. This can happen if multiple tasks are publishing the
+      // sequence around the same time.
       if (startMetadata instanceof Comparable) {
-        startMetadataGreaterThanExisting = ((Comparable) startMetadata.asStartMetadata()).compareTo(oldCommitMetadataFromDb.asStartMetadata());
+        startMetadataGreaterThanExisting = ((Comparable) startMetadata.asStartMetadata())
+                                           .compareTo(oldCommitMetadataFromDb.asStartMetadata()) > 0;
       }
       // Converting the last one into start metadata for checking since only the same type of metadata can be matched.
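
The old code kept the raw compareTo() result in an int and later tested it with == 1, which only works when compareTo() happens to return exactly 1; the patch collapses it to a boolean using > 0, which is what the Comparable contract actually guarantees. A quick, self-contained illustration with plain strings standing in for the metadata objects:

public class CompareToSketch
{
  public static void main(String[] args)
  {
    String storedEnd = "offset-5";
    String newStart = "offset-9";

    // compareTo() only promises a positive value, not exactly 1; here String.compareTo returns 4.
    int comparison = newStart.compareTo(storedEnd);

    // New style: collapse to a boolean up front. true means "ahead of the stored end state".
    boolean greaterThanExisting = newStart.compareTo(storedEnd) > 0;

    System.out.println(comparison);          // 4
    System.out.println(comparison == 1);     // false: an == 1 check would miss this
    System.out.println(greaterThanExisting); // true
  }
}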
@@ -2415,25 +2410,20 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStorageCoordinator
       startMetadataMatchesExisting = startMetadata.asStartMetadata().matches(oldCommitMetadataFromDb.asStartMetadata());
     }
-    if (startMetadataGreaterThanExisting == 1 && !startMetadataMatchesExisting) {
-      // Offset stored in StartMetadata is Greater than the last commited metadata,
-      // Then retry multiple task might be trying to publish the segment for same partitions.
-      log.info("Failed to update the metadata Store. The new start metadata: [%s] is ahead of last commited end state: [%s].",
-               startMetadata,
-               oldCommitMetadataFromDb);
+    if (startMetadataGreaterThanExisting && !startMetadataMatchesExisting) {
+      // Offsets stored in startMetadata is greater than the last commited metadata.
       return new DataStoreMetadataUpdateResult(true, false,
-          "Failed to update the metadata Store. The new start metadata is ahead of last commited end state."
+          "The new start metadata state[%s] is ahead of the last commited"
+          + " end state[%s]. Try resetting the supervisor.", startMetadata, oldCommitMetadataFromDb
      );
    }
    if (!startMetadataMatchesExisting) {
      // Not in the desired start state.
-      return new DataStoreMetadataUpdateResult(true, false, StringUtils.format(
-          "Inconsistent metadata state. This can happen if you update input topic in a spec without changing " +
-          "the supervisor name. Stored state: [%s], Target state: [%s].",
-          oldCommitMetadataFromDb,
-          startMetadata
-      ));
+      return new DataStoreMetadataUpdateResult(true, false,
+          "Inconsistency between stored metadata state[%s] and target state[%s]. Try resetting the supervisor.",
+          oldCommitMetadataFromDb, startMetadata
+      );
    }
    // Only endOffsets should be stored in metadata store

Changed file: IndexerSQLMetadataStorageCoordinatorTest.java

@@ -28,6 +28,7 @@ import com.google.common.collect.Iterables;
 import com.google.common.hash.Hashing;
 import com.google.common.io.BaseEncoding;
 import org.apache.druid.data.input.StringTuple;
+import org.apache.druid.error.InvalidInput;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
 import org.apache.druid.indexing.overlord.ObjectMetadata;
 import org.apache.druid.indexing.overlord.SegmentCreateRequest;
@@ -935,7 +936,14 @@ public class IndexerSQLMetadataStorageCoordinatorTest
         new ObjectMetadata(ImmutableMap.of("foo", "bar")),
         new ObjectMetadata(ImmutableMap.of("foo", "baz"))
     );
-    Assert.assertEquals(SegmentPublishResult.fail("java.lang.RuntimeException: Failed to update the metadata Store. The new start metadata is ahead of last commited end state."), result1);
+    Assert.assertEquals(
+        SegmentPublishResult.fail(
+            InvalidInput.exception(
+                "The new start metadata state[ObjectMetadata{theObject={foo=bar}}] is ahead of the last commited"
+                + " end state[null]. Try resetting the supervisor."
+            ).toString()),
+        result1
+    );
     // Should only be tried once.
     Assert.assertEquals(1, metadataUpdateCounter.get());
@@ -956,10 +964,15 @@ public class IndexerSQLMetadataStorageCoordinatorTest
         new ObjectMetadata(null),
         new ObjectMetadata(ImmutableMap.of("foo", "baz"))
     );
-    Assert.assertEquals(SegmentPublishResult.fail("java.lang.RuntimeException: Inconsistent metadata state. This can " +
-                                                  "happen if you update input topic in a spec without changing the supervisor name. " +
-                                                  "Stored state: [ObjectMetadata{theObject={foo=baz}}], " +
-                                                  "Target state: [ObjectMetadata{theObject=null}]."), result2);
+    Assert.assertEquals(
+        SegmentPublishResult.fail(
+            InvalidInput.exception(
+                "Inconsistency between stored metadata state[ObjectMetadata{theObject={foo=baz}}]"
+                + " and target state[ObjectMetadata{theObject=null}]. Try resetting the supervisor."
+            ).toString()
+        ),
+        result2
+    );
     // Should only be tried once per call.
     Assert.assertEquals(2, metadataUpdateCounter.get());
@@ -1026,10 +1039,14 @@ public class IndexerSQLMetadataStorageCoordinatorTest
         new ObjectMetadata(ImmutableMap.of("foo", "qux")),
         new ObjectMetadata(ImmutableMap.of("foo", "baz"))
     );
-    Assert.assertEquals(SegmentPublishResult.fail("java.lang.RuntimeException: Inconsistent metadata state. This can " +
-                                                  "happen if you update input topic in a spec without changing the supervisor name. " +
-                                                  "Stored state: [ObjectMetadata{theObject={foo=baz}}], " +
-                                                  "Target state: [ObjectMetadata{theObject={foo=qux}}]."), result2);
+    Assert.assertEquals(
+        SegmentPublishResult.fail(
+            InvalidInput.exception(
+                "Inconsistency between stored metadata state[ObjectMetadata{theObject={foo=baz}}] and "
+                + "target state[ObjectMetadata{theObject={foo=qux}}]. Try resetting the supervisor."
+            ).toString()),
+        result2
+    );
     // Should only be tried once per call.
     Assert.assertEquals(2, metadataUpdateCounter.get());