MSQ: Report the warning directly as an error if none of it is allowed by the user (#13198)

In MSQ, there can be an upper limit to the number of worker warnings. For example, for parseExceptions encountered while parsing the external data, the user can specify an upper limit to the number of parse exceptions that can be allowed before it throws an error of type TooManyWarnings.

This PR makes it so that if the user disallows warnings of a certain type, i.e. the limit is 0 (or the query is executing in strict mode), instead of throwing an error of type TooManyWarnings, we directly surface the warning as the error, saving the user the hassle of going through the warning reports.
This commit is contained in:
Laksh Singla 2022-10-20 13:43:10 +05:30 committed by GitHub
parent 9763bf8050
commit fc262dfbaf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 142 additions and 44 deletions

View File

@ -533,15 +533,6 @@ public class ControllerImpl implements Controller
log.debug("Query [%s] durable storage mode is set to %s.", queryDef.getQueryId(), isDurableStorageEnabled);
this.workerTaskLauncher = new MSQWorkerTaskLauncher(
id(),
task.getDataSource(),
context,
isDurableStorageEnabled,
// 10 minutes +- 2 minutes jitter
TimeUnit.SECONDS.toMillis(600 + ThreadLocalRandom.current().nextInt(-4, 5) * 30L)
);
long maxParseExceptions = -1;
@ -552,6 +543,17 @@ public class ControllerImpl implements Controller
.orElse(MSQWarnings.DEFAULT_MAX_PARSE_EXCEPTIONS_ALLOWED);
}
this.workerTaskLauncher = new MSQWorkerTaskLauncher(
id(),
task.getDataSource(),
context,
isDurableStorageEnabled,
maxParseExceptions,
// 10 minutes +- 2 minutes jitter
TimeUnit.SECONDS.toMillis(600 + ThreadLocalRandom.current().nextInt(-4, 5) * 30L)
);
this.faultsExceededChecker = new FaultsExceededChecker(
ImmutableMap.of(CannotParseExternalDataFault.CODE, maxParseExceptions)
);
@ -644,6 +646,7 @@ public class ControllerImpl implements Controller
// Present means the warning limit was exceeded, and warnings have therefore turned into an error.
String errorCode = warningsExceeded.get().lhs;
Long limit = warningsExceeded.get().rhs;
workerError(MSQErrorReport.fromFault(
id(),
selfDruidNode.getHost(),

View File

@ -23,6 +23,7 @@ import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@ -67,11 +68,13 @@ import org.apache.druid.msq.indexing.InputChannelsImpl;
import org.apache.druid.msq.indexing.KeyStatisticsCollectionProcessor;
import org.apache.druid.msq.indexing.MSQWorkerTask;
import org.apache.druid.msq.indexing.error.CanceledFault;
import org.apache.druid.msq.indexing.error.CannotParseExternalDataFault;
import org.apache.druid.msq.indexing.error.MSQErrorReport;
import org.apache.druid.msq.indexing.error.MSQException;
import org.apache.druid.msq.indexing.error.MSQWarningReportLimiterPublisher;
import org.apache.druid.msq.indexing.error.MSQWarningReportPublisher;
import org.apache.druid.msq.indexing.error.MSQWarningReportSimplePublisher;
import org.apache.druid.msq.indexing.error.MSQWarnings;
import org.apache.druid.msq.input.InputSlice;
import org.apache.druid.msq.input.InputSliceReader;
import org.apache.druid.msq.input.InputSlices;
@ -127,6 +130,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
@ -263,13 +267,38 @@ public class WorkerImpl implements Worker
}
});
long maxAllowedParseExceptions = Long.parseLong(task.getContext().getOrDefault(
MSQWarnings.CTX_MAX_PARSE_EXCEPTIONS_ALLOWED,
Long.MAX_VALUE
).toString());
long maxVerboseParseExceptions;
if (maxAllowedParseExceptions == -1L) {
maxVerboseParseExceptions = Limits.MAX_VERBOSE_PARSE_EXCEPTIONS;
} else {
maxVerboseParseExceptions = Math.min(maxAllowedParseExceptions, Limits.MAX_VERBOSE_PARSE_EXCEPTIONS);
}
Set<String> criticalWarningCodes;
if (maxAllowedParseExceptions == 0) {
criticalWarningCodes = ImmutableSet.of(CannotParseExternalDataFault.CODE);
} else {
criticalWarningCodes = ImmutableSet.of();
}
final MSQWarningReportPublisher msqWarningReportPublisher = new MSQWarningReportLimiterPublisher(
new MSQWarningReportSimplePublisher(
id(),
controllerClient,
id(),
MSQTasks.getHostFromSelfNode(selfDruidNode)
)
),
Limits.MAX_VERBOSE_WARNINGS,
ImmutableMap.of(CannotParseExternalDataFault.CODE, maxVerboseParseExceptions),
criticalWarningCodes,
controllerClient,
id(),
MSQTasks.getHostFromSelfNode(selfDruidNode)
);
closer.register(msqWarningReportPublisher);

View File

@ -37,11 +37,13 @@ import org.apache.druid.msq.exec.ControllerContext;
import org.apache.druid.msq.exec.ControllerImpl;
import org.apache.druid.msq.exec.WorkerManagerClient;
import org.apache.druid.msq.indexing.error.MSQException;
import org.apache.druid.msq.indexing.error.MSQWarnings;
import org.apache.druid.msq.indexing.error.TaskStartTimeoutFault;
import org.apache.druid.msq.indexing.error.UnknownFault;
import org.apache.druid.msq.indexing.error.WorkerFailedFault;
import org.apache.druid.msq.util.MultiStageQueryContext;
import javax.annotation.Nullable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
@ -83,6 +85,9 @@ public class MSQWorkerTaskLauncher
private final long maxTaskStartDelayMillis;
private final boolean durableStageStorageEnabled;
@Nullable
private final Long maxParseExceptions;
// Mutable state meant to be accessible by threads outside the main loop.
private final SettableFuture<?> stopFuture = SettableFuture.create();
private final AtomicReference<State> state = new AtomicReference<>(State.NEW);
@ -111,6 +116,7 @@ public class MSQWorkerTaskLauncher
final String dataSource,
final ControllerContext context,
final boolean durableStageStorageEnabled,
@Nullable final Long maxParseExceptions,
final long maxTaskStartDelayMillis
)
{
@ -121,6 +127,7 @@ public class MSQWorkerTaskLauncher
"multi-stage-query-task-launcher[" + StringUtils.encodeForFormat(controllerTaskId) + "]-%s"
);
this.durableStageStorageEnabled = durableStageStorageEnabled;
this.maxParseExceptions = maxParseExceptions;
this.maxTaskStartDelayMillis = maxTaskStartDelayMillis;
}
@ -308,6 +315,10 @@ public class MSQWorkerTaskLauncher
taskContext.put(MultiStageQueryContext.CTX_ENABLE_DURABLE_SHUFFLE_STORAGE, true);
}
if (maxParseExceptions != null) {
taskContext.put(MSQWarnings.CTX_MAX_PARSE_EXCEPTIONS_ALLOWED, maxParseExceptions);
}
final int firstTask;
final int taskCount;

View File

@ -19,11 +19,13 @@
package org.apache.druid.msq.indexing.error;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.msq.exec.Limits;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.msq.exec.ControllerClient;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
@ -35,35 +37,53 @@ import java.util.concurrent.ConcurrentHashMap;
public class MSQWarningReportLimiterPublisher implements MSQWarningReportPublisher
{
final MSQWarningReportPublisher delegate;
final long totalLimit;
final Map<String, Long> errorCodeToLimit;
final ConcurrentHashMap<String, Long> errorCodeToCurrentCount = new ConcurrentHashMap<>();
private final MSQWarningReportPublisher delegate;
private final long totalLimit;
private final Map<String, Long> errorCodeToLimit;
private final Set<String> criticalWarningCodes;
private final ConcurrentHashMap<String, Long> errorCodeToCurrentCount = new ConcurrentHashMap<>();
private final ControllerClient controllerClient;
private final String workerId;
@Nullable
private final String host;
long totalCount = 0L;
final Object lock = new Object();
public MSQWarningReportLimiterPublisher(MSQWarningReportPublisher delegate)
{
this(
delegate,
Limits.MAX_VERBOSE_WARNINGS,
ImmutableMap.of(
CannotParseExternalDataFault.CODE, Limits.MAX_VERBOSE_PARSE_EXCEPTIONS
)
);
}
/**
* Creates a publisher which publishes the warnings to the controller if they have not yet exceeded the allowed limit.
* Moreover, if a warning is disallowed, i.e. its limit is set to 0, then the publisher directly reports the warning
* as an error.
* {@code errorCodeToLimit} refers to the maximum number of verbose warnings that should be published. The actual
* limit for the warnings before which the controller should fail can be much higher, and hence a separate {@code criticalWarningCodes} set is required.
*
* @param delegate The delegate publisher which publishes the allowed warnings
* @param totalLimit Total limit of warnings that a worker can publish
* @param errorCodeToLimit Map of error code to the number of allowed warnings that the publisher can publish
* @param criticalWarningCodes Error codes which if encountered should be thrown as error
* @param controllerClient Controller client (for directly sending the warning as an error)
* @param workerId workerId, used to construct the error report
* @param host worker's host, used to construct the error report
*/
public MSQWarningReportLimiterPublisher(
MSQWarningReportPublisher delegate,
long totalLimit,
Map<String, Long> errorCodeToLimit
Map<String, Long> errorCodeToLimit,
Set<String> criticalWarningCodes,
ControllerClient controllerClient,
String workerId,
@Nullable String host
)
{
this.delegate = delegate;
this.errorCodeToLimit = errorCodeToLimit;
this.criticalWarningCodes = criticalWarningCodes;
this.totalLimit = totalLimit;
this.controllerClient = controllerClient;
this.workerId = workerId;
this.host = host;
}
@Override
@ -74,6 +94,16 @@ public class MSQWarningReportLimiterPublisher implements MSQWarningReportPublish
totalCount = totalCount + 1;
errorCodeToCurrentCount.compute(errorCode, (ignored, count) -> count == null ? 1L : count + 1);
// Send the warning as an error if it is disallowed altogether
if (criticalWarningCodes.contains(errorCode)) {
try {
controllerClient.postWorkerError(workerId, MSQErrorReport.fromException(workerId, host, stageNumber, e));
}
catch (IOException postException) {
throw new RE(postException, "Failed to post the worker error [%s] to the controller", errorCode);
}
}
if (totalLimit != -1 && totalCount > totalLimit) {
return;
}

View File

@ -151,6 +151,7 @@ public class MSQTasksTest
"foo",
controllerContext,
false,
-1L,
TimeUnit.SECONDS.toMillis(5)
);

View File

@ -139,7 +139,7 @@ public class MSQWarningsTest extends MSQTestBase
.columnMappings(defaultColumnMappings)
.tuningConfig(MSQTuningConfig.defaultConfig())
.build())
.setExpectedMSQFault(new TooManyWarningsFault(0, CannotParseExternalDataFault.CODE))
.setExpectedMSQFaultClass(CannotParseExternalDataFault.class)
.verifyResults();
}
@ -318,7 +318,7 @@ public class MSQWarningsTest extends MSQTestBase
.columnMappings(defaultColumnMappings)
.tuningConfig(MSQTuningConfig.defaultConfig())
.build())
.setExpectedMSQFault(new TooManyWarningsFault(0, CannotParseExternalDataFault.CODE))
.setExpectedMSQFaultClass(CannotParseExternalDataFault.class)
.verifyResults();
}
@ -340,7 +340,7 @@ public class MSQWarningsTest extends MSQTestBase
.setExpectedDataSource("foo1")
.setExpectedRowSignature(rowSignature)
.addExpectedAggregatorFactory(new LongSumAggregatorFactory("cnt", "cnt"))
.setExpectedMSQFault(new TooManyWarningsFault(0, CannotParseExternalDataFault.CODE))
.setExpectedMSQFaultClass(CannotParseExternalDataFault.class)
.verifyResults();
}
@ -359,7 +359,7 @@ public class MSQWarningsTest extends MSQTestBase
+ ") group by 1 PARTITIONED by day ")
.setExpectedDataSource("foo1")
.setExpectedRowSignature(rowSignature)
.setExpectedMSQFault(new TooManyWarningsFault(0, CannotParseExternalDataFault.CODE))
.setExpectedMSQFaultClass(CannotParseExternalDataFault.class)
.verifyResults();
// Temporary directory should not contain any controller-related folders

View File

@ -701,6 +701,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
protected Matcher<Throwable> expectedValidationErrorMatcher = null;
protected Matcher<Throwable> expectedExecutionErrorMatcher = null;
protected MSQFault expectedMSQFault = null;
protected Class<? extends MSQFault> expectedMSQFaultClass = null;
private boolean hasRun = false;
@ -763,6 +764,12 @@ public class MSQTestBase extends BaseCalciteQueryTest
return (Builder) this;
}
/**
 * Sets the fault class that the query under test is expected to fail with.
 *
 * Unlike {@code expectedMSQFault}, which is compared through its code-with-message string, this
 * expectation is checked only by comparing the class of the reported fault with the class given
 * here, so the fault's message does not have to be known in advance.
 *
 * @param expectedMSQFaultClass class the reported fault is expected to have
 * @return this tester, cast to {@code Builder}, so that calls can be chained
 */
public Builder setExpectedMSQFaultClass(Class<? extends MSQFault> expectedMSQFaultClass)
{
this.expectedMSQFaultClass = expectedMSQFaultClass;
return (Builder) this;
}
public void verifyPlanningErrors()
{
Preconditions.checkArgument(expectedValidationErrorMatcher != null, "Validation error matcher cannot be null");
@ -850,19 +857,28 @@ public class MSQTestBase extends BaseCalciteQueryTest
Preconditions.checkArgument(expectedDataSource != null, "dataSource cannot be null");
Preconditions.checkArgument(expectedRowSignature != null, "expectedRowSignature cannot be null");
Preconditions.checkArgument(
expectedResultRows != null || expectedMSQFault != null,
"atleast one of expectedResultRows or expectedMSQFault should be set to non null"
expectedResultRows != null || expectedMSQFault != null || expectedMSQFaultClass != null,
"atleast one of expectedResultRows, expectedMSQFault or expectedMSQFaultClass should be set to non null"
);
Preconditions.checkArgument(expectedShardSpec != null, "shardSpecClass cannot be null");
readyToRun();
try {
String controllerId = runMultiStageQuery(sql, queryContext);
if (expectedMSQFault != null) {
if (expectedMSQFault != null || expectedMSQFaultClass != null) {
MSQErrorReport msqErrorReport = getErrorReportOrThrow(controllerId);
Assert.assertEquals(
expectedMSQFault.getCodeWithMessage(),
msqErrorReport.getFault().getCodeWithMessage()
);
if (expectedMSQFault != null) {
Assert.assertEquals(
expectedMSQFault.getCodeWithMessage(),
msqErrorReport.getFault().getCodeWithMessage()
);
}
if (expectedMSQFaultClass != null) {
Assert.assertEquals(
expectedMSQFaultClass,
msqErrorReport.getFault().getClass()
);
}
return;
}
getPayloadOrThrow(controllerId);
@ -1016,12 +1032,20 @@ public class MSQTestBase extends BaseCalciteQueryTest
try {
String controllerId = runMultiStageQuery(sql, queryContext);
if (expectedMSQFault != null) {
if (expectedMSQFault != null || expectedMSQFaultClass != null) {
MSQErrorReport msqErrorReport = getErrorReportOrThrow(controllerId);
Assert.assertEquals(
expectedMSQFault.getCodeWithMessage(),
msqErrorReport.getFault().getCodeWithMessage()
);
if (expectedMSQFault != null) {
Assert.assertEquals(
expectedMSQFault.getCodeWithMessage(),
msqErrorReport.getFault().getCodeWithMessage()
);
}
if (expectedMSQFaultClass != null) {
Assert.assertEquals(
expectedMSQFaultClass,
msqErrorReport.getFault().getClass()
);
}
return null;
}