[Backport] Move TerminalStageSpecFactory packages (#17049) and log.warn on ArrayIngestMode.MVD (#17164) (#17242)

* Move TerminalStageSpecFactory packages. (#17049)
* log.warn anytime a column is relying on ArrayIngestMode.MVD (#17164)
---------
Co-authored-by: Gian Merlino <gianmerlino@gmail.com>
Co-authored-by: Clint Wylie <cwylie@apache.org>
This commit is contained in:
Kashif Faraz 2024-10-04 14:21:28 +05:30 committed by GitHub
parent b275ffed8d
commit 50eb7321d2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 74 additions and 23 deletions

View File

@ -27,6 +27,8 @@ import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.annotations.LoadScope;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.metadata.input.InputSourceModule;
import org.apache.druid.msq.indexing.destination.MSQTerminalStageSpecFactory;
import org.apache.druid.msq.indexing.destination.SegmentGenerationTerminalStageSpecFactory;
import org.apache.druid.msq.sql.MSQTaskSqlEngine;
import org.apache.druid.sql.SqlStatementFactory;
import org.apache.druid.sql.SqlToolbox;

View File

@ -17,9 +17,8 @@
* under the License.
*/
package org.apache.druid.msq.guice;
package org.apache.druid.msq.indexing.destination;
import org.apache.druid.msq.indexing.destination.TerminalStageSpec;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.rel.DruidQuery;

View File

@ -17,10 +17,8 @@
* under the License.
*/
package org.apache.druid.msq.guice;
package org.apache.druid.msq.indexing.destination;
import org.apache.druid.msq.indexing.destination.SegmentGenerationStageSpec;
import org.apache.druid.msq.indexing.destination.TerminalStageSpec;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.rel.DruidQuery;

View File

@ -33,7 +33,6 @@ import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.msq.exec.MSQTasks;
import org.apache.druid.msq.guice.MSQTerminalStageSpecFactory;
import org.apache.druid.msq.indexing.MSQControllerTask;
import org.apache.druid.msq.indexing.MSQSpec;
import org.apache.druid.msq.indexing.MSQTuningConfig;
@ -42,6 +41,7 @@ import org.apache.druid.msq.indexing.destination.DurableStorageMSQDestination;
import org.apache.druid.msq.indexing.destination.ExportMSQDestination;
import org.apache.druid.msq.indexing.destination.MSQDestination;
import org.apache.druid.msq.indexing.destination.MSQSelectDestination;
import org.apache.druid.msq.indexing.destination.MSQTerminalStageSpecFactory;
import org.apache.druid.msq.indexing.destination.TaskReportMSQDestination;
import org.apache.druid.msq.util.MSQTaskQueryMakerUtils;
import org.apache.druid.msq.util.MultiStageQueryContext;

View File

@ -42,7 +42,7 @@ import org.apache.druid.error.InvalidSqlInput;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.msq.guice.MSQTerminalStageSpecFactory;
import org.apache.druid.msq.indexing.destination.MSQTerminalStageSpecFactory;
import org.apache.druid.msq.querykit.QueryKitUtils;
import org.apache.druid.msq.util.ArrayIngestMode;
import org.apache.druid.msq.util.DimensionSchemaUtils;
@ -387,7 +387,7 @@ public class MSQTaskSqlEngine implements SqlEngine
final ColumnType oldDruidType = Calcites.getColumnTypeForRelDataType(oldSqlTypeField.getType());
final RelDataType newSqlType = rootRel.getRowType().getFieldList().get(columnIndex).getType();
final ColumnType newDruidType =
DimensionSchemaUtils.getDimensionType(Calcites.getColumnTypeForRelDataType(newSqlType), arrayIngestMode);
DimensionSchemaUtils.getDimensionType(columnName, Calcites.getColumnTypeForRelDataType(newSqlType), arrayIngestMode);
if (newDruidType.isArray() && oldDruidType.is(ValueType.STRING)
|| (newDruidType.is(ValueType.STRING) && oldDruidType.isArray())) {

View File

@ -27,6 +27,8 @@ import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.AlertEvent;
import org.apache.druid.segment.AutoTypeColumnSchema;
import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.segment.column.ColumnCapabilities;
@ -42,6 +44,7 @@ import javax.annotation.Nullable;
*/
public class DimensionSchemaUtils
{
private static final EmittingLogger LOG = new EmittingLogger(DimensionSchemaUtils.class);
/**
* Creates a dimension schema for creating {@link org.apache.druid.data.input.InputSourceReader}.
@ -89,7 +92,7 @@ public class DimensionSchemaUtils
return new AutoTypeColumnSchema(column, null);
} else {
// dimensionType may not be identical to queryType, depending on arrayIngestMode.
final ColumnType dimensionType = getDimensionType(queryType, arrayIngestMode);
final ColumnType dimensionType = getDimensionType(column, queryType, arrayIngestMode);
if (dimensionType.getType() == ValueType.STRING) {
return new StringDimensionSchema(
@ -121,6 +124,7 @@ public class DimensionSchemaUtils
* @throws org.apache.druid.error.DruidException if there is some problem
*/
public static ColumnType getDimensionType(
final String columnName,
@Nullable final ColumnType queryType,
final ArrayIngestMode arrayIngestMode
)
@ -132,6 +136,13 @@ public class DimensionSchemaUtils
ValueType elementType = queryType.getElementType().getType();
if (elementType == ValueType.STRING) {
if (arrayIngestMode == ArrayIngestMode.MVD) {
final String msgFormat = "Inserting a multi-value string column[%s] relying on deprecated"
+ " ArrayIngestMode.MVD. This query should be rewritten to use the ARRAY_TO_MV"
+ " operator to insert ARRAY types as multi-value strings.";
LOG.makeWarningAlert(msgFormat, columnName)
.severity(AlertEvent.Severity.DEPRECATED)
.addData("feature", "ArrayIngestMode.MVD")
.emit();
return ColumnType.STRING;
} else {
return queryType;

View File

@ -26,7 +26,7 @@ import com.google.inject.Provides;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.initialization.ServerInjectorBuilderTest.TestDruidModule;
import org.apache.druid.msq.guice.MultiStageQuery;
import org.apache.druid.msq.guice.SegmentGenerationTerminalStageSpecFactory;
import org.apache.druid.msq.indexing.destination.SegmentGenerationTerminalStageSpecFactory;
import org.apache.druid.msq.sql.MSQTaskSqlEngine;
import org.apache.druid.msq.test.MSQTestBase;
import org.apache.druid.msq.test.MSQTestOverlordServiceClient;

View File

@ -92,12 +92,12 @@ import org.apache.druid.msq.guice.MSQExternalDataSourceModule;
import org.apache.druid.msq.guice.MSQIndexingModule;
import org.apache.druid.msq.guice.MSQSqlModule;
import org.apache.druid.msq.guice.MultiStageQuery;
import org.apache.druid.msq.guice.SegmentGenerationTerminalStageSpecFactory;
import org.apache.druid.msq.indexing.InputChannelFactory;
import org.apache.druid.msq.indexing.MSQControllerTask;
import org.apache.druid.msq.indexing.MSQSpec;
import org.apache.druid.msq.indexing.MSQTuningConfig;
import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination;
import org.apache.druid.msq.indexing.destination.SegmentGenerationTerminalStageSpecFactory;
import org.apache.druid.msq.indexing.destination.TaskReportMSQDestination;
import org.apache.druid.msq.indexing.error.InsertLockPreemptedFaultTest;
import org.apache.druid.msq.indexing.error.MSQErrorReport;

View File

@ -1073,7 +1073,7 @@ public class RemoteTaskRunnerTest
EasyMock.expect(worker.getHost()).andReturn("host").atLeastOnce();
EasyMock.replay(worker);
ServiceEmitter emitter = EasyMock.createMock(ServiceEmitter.class);
Capture<EmittingLogger.EmittingAlertBuilder> capturedArgument = Capture.newInstance();
Capture<EmittingLogger.LoggingAlertBuilder> capturedArgument = Capture.newInstance();
emitter.emit(EasyMock.capture(capturedArgument));
EasyMock.expectLastCall().atLeastOnce();
EmittingLogger.registerEmitter(emitter);

View File

@ -29,8 +29,11 @@ import org.apache.druid.java.util.emitter.service.AlertBuilder;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import javax.annotation.Nullable;
import java.util.Map;
/**
* {@link Logger} which also has a {@link ServiceEmitter}. Primarily useful for constructing and emitting "alerts" in
* the form of {@link AlertBuilder}, which will log when {@link AlertBuilder#emit()} is called.
*/
public class EmittingLogger extends Logger
{
@ -62,12 +65,34 @@ public class EmittingLogger extends Logger
return new EmittingLogger(getSlf4jLogger(), false);
}
public AlertBuilder makeAlert(String message, Object... objects)
/**
* Make an {@link AlertBuilder} which will call {@link #warn(String, Object...)} when {@link AlertBuilder#emit()} is
* called.
*/
public AlertBuilder makeWarningAlert(String message, Object... objects)
{
return makeAlert(null, message, objects);
return makeAlert(null, false, message, objects);
}
/**
* Make an {@link AlertBuilder} which will call {@link #error(String, Object...)} when {@link AlertBuilder#emit()} is
* called.
*/
public AlertBuilder makeAlert(String message, Object... objects)
{
return makeAlert(null, true, message, objects);
}
/**
* Make an {@link AlertBuilder} which will call {@link #error(Throwable, String, Object...)} when
* {@link AlertBuilder#emit()} is called.
*/
public AlertBuilder makeAlert(@Nullable Throwable t, String message, Object... objects)
{
return makeAlert(t, true, message, objects);
}
public AlertBuilder makeAlert(@Nullable Throwable t, boolean isError, String message, Object... objects)
{
if (emitter == null) {
final String errorMessage = StringUtils.format(
@ -87,20 +112,22 @@ public class EmittingLogger extends Logger
throw e;
}
return new EmittingAlertBuilder(t, StringUtils.format(message, objects), emitter)
return new LoggingAlertBuilder(t, StringUtils.format(message, objects), emitter, isError)
.addData("class", className);
}
public class EmittingAlertBuilder extends AlertBuilder
public class LoggingAlertBuilder extends AlertBuilder
{
private final Throwable t;
private final boolean isError;
private volatile boolean emitted = false;
private EmittingAlertBuilder(Throwable t, String description, ServiceEmitter emitter)
private LoggingAlertBuilder(Throwable t, String description, ServiceEmitter emitter, boolean isError)
{
super(description, emitter);
this.t = t;
this.isError = isError;
addThrowable(t);
}
@ -126,15 +153,22 @@ public class EmittingLogger extends Logger
private void logIt(String format)
{
if (t == null) {
if (isError) {
error(format, description, dataMap);
} else {
warn(format, description, dataMap);
}
} else {
// Filter out the stack trace from the message, because it should be in the logline already if it's wanted.
error(
t,
format,
description,
Maps.filterKeys(dataMap, Predicates.not(Predicates.equalTo("exceptionStackTrace")))
final Map<String, Object> filteredDataMap = Maps.filterKeys(
dataMap,
Predicates.not(Predicates.equalTo("exceptionStackTrace"))
);
if (isError) {
error(t, format, description, filteredDataMap);
} else {
warn(t, format, description, filteredDataMap);
}
}
}
}

View File

@ -186,6 +186,13 @@ public class AlertEvent implements Event
{
return "service-failure";
}
},
/**
 * Severity for alerts that report reliance on a deprecated feature. Its string form is {@code "deprecated"}.
 */
DEPRECATED {
@Override
public String toString()
{
return "deprecated";
}
};
public static final Severity DEFAULT = COMPONENT_FAILURE;