MSQ: Add exception messages to WorkerFailedFault and WorkerRpcFailedFault.

It's useful for the fault message to have some description of the
underlying error: why did the worker fail, or why did the RPC call fail?
This patch updates the fault messages to include the toString of the error.

The full stack trace for these faults is available to end users in
"exceptionStackTrace" of the report. But for cluster operators, if debug
logging is not enabled, then for Dart only the message is logged, not the
stack trace. In this scenario it's useful for the message to be more
detailed.
This commit is contained in:
Gian Merlino 2024-10-09 21:38:05 -07:00
parent 4fdb38118a
commit 7645681b2a
7 changed files with 28 additions and 11 deletions

View File

@ -1276,7 +1276,7 @@ public class ControllerImpl implements Controller
workerNumber
);
addToRetryQueue(queryKernel, workerNumber, new WorkerRpcFailedFault(workerId));
addToRetryQueue(queryKernel, workerNumber, new WorkerRpcFailedFault(workerId, workerResult.error().toString()));
} else {
// Nonretryable failure.
throw new RuntimeException(workerResult.error());

View File

@ -142,7 +142,7 @@ public class ExceptionWrappingWorkerClient implements WorkerClient
clientFuture = clientFn.apply(client);
}
catch (Exception e) {
throw new MSQException(e, new WorkerRpcFailedFault(workerTaskId));
throw new MSQException(e, new WorkerRpcFailedFault(workerTaskId, e.toString()));
}
Futures.addCallback(
@ -158,7 +158,7 @@ public class ExceptionWrappingWorkerClient implements WorkerClient
@Override
public void onFailure(Throwable t)
{
retVal.setException(new MSQException(t, new WorkerRpcFailedFault(workerTaskId)));
retVal.setException(new MSQException(t, new WorkerRpcFailedFault(workerTaskId, t.toString())));
}
},
MoreExecutors.directExecutor()

View File

@ -190,7 +190,7 @@ public class WorkerSketchFetcher implements AutoCloseable
try {
kernelActions.accept((kernel) -> {
try {
retryOperation.accept(kernel, worker, new WorkerRpcFailedFault(taskId));
retryOperation.accept(kernel, worker, new WorkerRpcFailedFault(taskId, t.toString()));
kernelActionFuture.set(false);
}

View File

@ -43,7 +43,7 @@ public class WorkerFailedFault extends BaseMSQFault
@JsonProperty("errorMsg") @Nullable final String errorMsg
)
{
super(CODE, "Worker task failed: [%s]%s", workerTaskId, errorMsg != null ? " (" + errorMsg + ")" : "");
super(CODE, "Worker[%s] failed%s", workerTaskId, errorMsg != null ? ": " + errorMsg : "");
this.workerTaskId = workerTaskId;
this.errorMsg = errorMsg;
}

View File

@ -20,9 +20,11 @@
package org.apache.druid.msq.indexing.error;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import javax.annotation.Nullable;
import java.util.Objects;
@JsonTypeName(WorkerRpcFailedFault.CODE)
@ -31,12 +33,17 @@ public class WorkerRpcFailedFault extends BaseMSQFault
public static final String CODE = "WorkerRpcFailed";
private final String workerTaskId;
private final String errorMsg;
@JsonCreator
public WorkerRpcFailedFault(@JsonProperty("workerTaskId") final String workerTaskId)
public WorkerRpcFailedFault(
@JsonProperty("workerTaskId") final String workerTaskId,
@JsonProperty("errorMsg") @Nullable final String errorMsg
)
{
super(CODE, "RPC call to task failed unrecoverably: [%s]", workerTaskId);
super(CODE, "RPC to worker[%s] failed%s", workerTaskId, errorMsg != null ? ": " + errorMsg : "");
this.workerTaskId = workerTaskId;
this.errorMsg = errorMsg;
}
@JsonProperty
@ -45,6 +52,14 @@ public class WorkerRpcFailedFault extends BaseMSQFault
return workerTaskId;
}
@Nullable
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
public String getErrorMsg()
{
return errorMsg;
}
@Override
public boolean equals(Object o)
{
@ -58,12 +73,12 @@ public class WorkerRpcFailedFault extends BaseMSQFault
return false;
}
WorkerRpcFailedFault that = (WorkerRpcFailedFault) o;
return Objects.equals(workerTaskId, that.workerTaskId);
return Objects.equals(workerTaskId, that.workerTaskId) && Objects.equals(errorMsg, that.errorMsg);
}
@Override
public int hashCode()
{
return Objects.hash(super.hashCode(), workerTaskId);
return Objects.hash(super.hashCode(), workerTaskId, errorMsg);
}
}

View File

@ -134,7 +134,7 @@ public class MSQTasksTest
final MSQErrorReport controllerReport = MSQTasks.makeErrorReport(
WORKER_ID,
WORKER_HOST,
MSQErrorReport.fromFault(WORKER_ID, WORKER_HOST, null, new WorkerRpcFailedFault(WORKER_ID)),
MSQErrorReport.fromFault(WORKER_ID, WORKER_HOST, null, new WorkerRpcFailedFault(WORKER_ID, null)),
null
);

View File

@ -94,8 +94,10 @@ public class MSQFaultSerdeTest
assertFaultSerde(new TooManyAttemptsForJob(2, 2, "taskId", "rootError"));
assertFaultSerde(UnknownFault.forMessage(null));
assertFaultSerde(UnknownFault.forMessage("the message"));
assertFaultSerde(new WorkerFailedFault("the worker task", null));
assertFaultSerde(new WorkerFailedFault("the worker task", "the error msg"));
assertFaultSerde(new WorkerRpcFailedFault("the worker task"));
assertFaultSerde(new WorkerRpcFailedFault("the worker task", null));
assertFaultSerde(new WorkerRpcFailedFault("the worker task", "the error msg"));
assertFaultSerde(new NotEnoughTemporaryStorageFault(250, 2));
}