diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index afa3479bcc8..c71941f5c07 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -783,18 +783,10 @@ public class ControllerImpl implements Controller addToKernelManipulationQueue( queryKernel -> { final StageId stageId = queryKernel.getStageId(stageNumber); - - // We need a specially-decorated ObjectMapper to deserialize key statistics. - final StageDefinition stageDef = queryKernel.getStageDefinition(stageId); - final ObjectMapper mapper = MSQTasks.decorateObjectMapperForKeyCollectorSnapshot( - context.jsonMapper(), - stageDef.getShuffleSpec().clusterBy(), - stageDef.getShuffleSpec().doesAggregate() - ); - final PartialKeyStatisticsInformation partialKeyStatisticsInformation; + try { - partialKeyStatisticsInformation = mapper.convertValue( + partialKeyStatisticsInformation = context.jsonMapper().convertValue( partialKeyStatisticsInformationObject, PartialKeyStatisticsInformation.class ); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/MSQTasks.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/MSQTasks.java index 24a3fad8dbf..2dff4419bdb 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/MSQTasks.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/MSQTasks.java @@ -19,10 +19,8 @@ package org.apache.druid.msq.exec; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.inject.Injector; import com.google.inject.Key; -import org.apache.druid.frame.key.ClusterBy; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.msq.guice.MultiStageQuery; @@ -39,10 +37,6 @@ import org.apache.druid.msq.indexing.error.TooManyAttemptsForWorker; import org.apache.druid.msq.indexing.error.UnknownFault; import org.apache.druid.msq.indexing.error.WorkerFailedFault; import org.apache.druid.msq.indexing.error.WorkerRpcFailedFault; -import org.apache.druid.msq.statistics.KeyCollectorFactory; -import org.apache.druid.msq.statistics.KeyCollectorSnapshot; -import org.apache.druid.msq.statistics.KeyCollectorSnapshotDeserializerModule; -import org.apache.druid.msq.statistics.KeyCollectors; import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.server.DruidNode; import org.apache.druid.storage.NilStorageConnector; @@ -125,24 +119,6 @@ public class MSQTasks } } - /** - * Returns a decorated copy of an ObjectMapper that knows how to deserialize the appropriate kind of - * {@link KeyCollectorSnapshot}. - */ - static ObjectMapper decorateObjectMapperForKeyCollectorSnapshot( - final ObjectMapper mapper, - final ClusterBy clusterBy, - final boolean aggregate - ) - { - final KeyCollectorFactory keyCollectorFactory = - KeyCollectors.makeStandardFactory(clusterBy, aggregate); - - final ObjectMapper mapperCopy = mapper.copy(); - mapperCopy.registerModule(new KeyCollectorSnapshotDeserializerModule(keyCollectorFactory)); - return mapperCopy; - } - /** * Returns the host:port from a {@link DruidNode}. Convenience method to make it easier to construct * {@link MSQErrorReport} instances. diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/DelegateOrMinKeyCollectorFactory.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/DelegateOrMinKeyCollectorFactory.java index 043c5056257..835e3927158 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/DelegateOrMinKeyCollectorFactory.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/DelegateOrMinKeyCollectorFactory.java @@ -19,13 +19,8 @@ package org.apache.druid.msq.statistics; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonToken; -import com.fasterxml.jackson.databind.DeserializationContext; -import com.fasterxml.jackson.databind.JsonDeserializer; import org.apache.druid.frame.key.RowKey; -import java.io.IOException; import java.util.Comparator; import java.util.Optional; @@ -53,46 +48,6 @@ public class DelegateOrMinKeyCollectorFactory(comparator, delegateFactory.newKeyCollector(), null); } - @Override - public JsonDeserializer> snapshotDeserializer() - { - final JsonDeserializer delegateDeserializer = delegateFactory.snapshotDeserializer(); - - return new JsonDeserializer>() - { - @Override - public DelegateOrMinKeyCollectorSnapshot deserialize(JsonParser jp, DeserializationContext ctxt) - throws IOException - { - TSnapshot delegateSnapshot = null; - RowKey minKey = null; - - if (!jp.isExpectedStartObjectToken()) { - ctxt.reportWrongTokenException(this, JsonToken.START_OBJECT, null); - } - - JsonToken token; - - while ((token = jp.nextToken()) != JsonToken.END_OBJECT) { - if (token != JsonToken.FIELD_NAME) { - ctxt.reportWrongTokenException(this, JsonToken.FIELD_NAME, null); - } - - final String fieldName = jp.getText(); - jp.nextToken(); - - if (DelegateOrMinKeyCollectorSnapshot.FIELD_SNAPSHOT.equals(fieldName)) { - delegateSnapshot = delegateDeserializer.deserialize(jp, ctxt); - } else if (DelegateOrMinKeyCollectorSnapshot.FIELD_MIN_KEY.equals(fieldName)) { - minKey = jp.readValueAs(RowKey.class); - } - } - - return new DelegateOrMinKeyCollectorSnapshot<>(delegateSnapshot, minKey); - } - }; - } - @Override public DelegateOrMinKeyCollectorSnapshot toSnapshot(final DelegateOrMinKeyCollector collector) { diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/DistinctKeyCollectorFactory.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/DistinctKeyCollectorFactory.java index 741b5096c77..432fd76e858 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/DistinctKeyCollectorFactory.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/DistinctKeyCollectorFactory.java @@ -19,15 +19,11 @@ package org.apache.druid.msq.statistics; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.databind.DeserializationContext; -import com.fasterxml.jackson.databind.JsonDeserializer; import it.unimi.dsi.fastutil.objects.Object2LongRBTreeMap; import org.apache.druid.collections.SerializablePair; import org.apache.druid.frame.key.ClusterBy; import org.apache.druid.frame.key.RowKey; -import java.io.IOException; import java.util.Comparator; import java.util.stream.Collectors; @@ -51,19 +47,6 @@ public class DistinctKeyCollectorFactory implements KeyCollectorFactory snapshotDeserializer() - { - return new JsonDeserializer() - { - @Override - public DistinctKeySnapshot deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException - { - return jp.readValueAs(DistinctKeySnapshot.class); - } - }; - } - @Override public DistinctKeySnapshot toSnapshot(final DistinctKeyCollector collector) { diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/KeyCollectorFactory.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/KeyCollectorFactory.java index f7956b6dcd6..5ee61558370 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/KeyCollectorFactory.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/KeyCollectorFactory.java @@ -19,8 +19,6 @@ package org.apache.druid.msq.statistics; -import com.fasterxml.jackson.databind.JsonDeserializer; - public interface KeyCollectorFactory, TSnapshot extends KeyCollectorSnapshot> { /** @@ -28,12 +26,6 @@ public interface KeyCollectorFactory */ TCollector newKeyCollector(); - /** - * Fetches the deserializer that can be used to deserialize the snapshots created by the KeyCollectors corresponding - * to this factory - */ - JsonDeserializer snapshotDeserializer(); - /** * Serializes a {@link KeyCollector} to a {@link KeyCollectorSnapshot} */ diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/KeyCollectorSnapshotDeserializerModule.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/KeyCollectorSnapshotDeserializerModule.java deleted file mode 100644 index b1aec6296d5..00000000000 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/KeyCollectorSnapshotDeserializerModule.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.msq.statistics; - -import com.fasterxml.jackson.databind.module.SimpleModule; - -/** - * A module for deserialization of {@link KeyCollectorSnapshot}. - */ -public class KeyCollectorSnapshotDeserializerModule extends SimpleModule -{ - public KeyCollectorSnapshotDeserializerModule(final KeyCollectorFactory keyCollectorFactory) - { - addDeserializer(KeyCollectorSnapshot.class, keyCollectorFactory.snapshotDeserializer()); - } -} - diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/QuantilesSketchKeyCollectorFactory.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/QuantilesSketchKeyCollectorFactory.java index 674dfe15acb..4f6bd6dbcd1 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/QuantilesSketchKeyCollectorFactory.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/QuantilesSketchKeyCollectorFactory.java @@ -19,9 +19,6 @@ package org.apache.druid.msq.statistics; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.databind.DeserializationContext; -import com.fasterxml.jackson.databind.JsonDeserializer; import com.google.common.annotations.VisibleForTesting; import org.apache.datasketches.common.ArrayOfItemsSerDe; import org.apache.datasketches.common.ByteArrayUtil; @@ -31,7 +28,6 @@ import org.apache.datasketches.quantiles.ItemsSketch; import org.apache.druid.frame.key.ClusterBy; import org.apache.druid.java.util.common.StringUtils; -import java.io.IOException; import java.nio.ByteOrder; import java.util.Arrays; import java.util.Comparator; @@ -61,20 +57,6 @@ public class QuantilesSketchKeyCollectorFactory return new QuantilesSketchKeyCollector(comparator, ItemsSketch.getInstance(byte[].class, SKETCH_INITIAL_K, comparator), 0); } - @Override - public JsonDeserializer snapshotDeserializer() - { - return new JsonDeserializer() - { - @Override - public QuantilesSketchKeyCollectorSnapshot deserialize(JsonParser jp, DeserializationContext ctxt) - throws IOException - { - return jp.readValueAs(QuantilesSketchKeyCollectorSnapshot.class); - } - }; - } - @Override public QuantilesSketchKeyCollectorSnapshot toSnapshot(QuantilesSketchKeyCollector collector) { diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/statistics/ClusterByStatisticsCollectorImplTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/statistics/ClusterByStatisticsCollectorImplTest.java index 4d9421c221f..77502dddd38 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/statistics/ClusterByStatisticsCollectorImplTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/statistics/ClusterByStatisticsCollectorImplTest.java @@ -138,7 +138,7 @@ public class ClusterByStatisticsCollectorImplTest extends InitializedNullHandlin ); } - verifySnapshotSerialization(testName, collector, aggregate); + verifySnapshotSerialization(testName, collector); } ); } @@ -187,7 +187,7 @@ public class ClusterByStatisticsCollectorImplTest extends InitializedNullHandlin ); } - verifySnapshotSerialization(testName, collector, aggregate); + verifySnapshotSerialization(testName, collector); } ); } @@ -245,7 +245,7 @@ public class ClusterByStatisticsCollectorImplTest extends InitializedNullHandlin ); } - verifySnapshotSerialization(testName, collector, aggregate); + verifySnapshotSerialization(testName, collector); } ); } @@ -309,7 +309,7 @@ public class ClusterByStatisticsCollectorImplTest extends InitializedNullHandlin } } - verifySnapshotSerialization(testName, collector, aggregate); + verifySnapshotSerialization(testName, collector); } ); } @@ -380,7 +380,7 @@ public class ClusterByStatisticsCollectorImplTest extends InitializedNullHandlin } } - verifySnapshotSerialization(testName, collector, aggregate); + verifySnapshotSerialization(testName, collector); } ); } @@ -446,7 +446,7 @@ public class ClusterByStatisticsCollectorImplTest extends InitializedNullHandlin } } - verifySnapshotSerialization(testName, collector, aggregate); + verifySnapshotSerialization(testName, collector); } ); } @@ -945,21 +945,11 @@ public class ClusterByStatisticsCollectorImplTest extends InitializedNullHandlin private static void verifySnapshotSerialization( final String testName, - final ClusterByStatisticsCollector collector, - final boolean aggregate + final ClusterByStatisticsCollector collector ) { try { final ObjectMapper jsonMapper = TestHelper.makeJsonMapper(); - jsonMapper.registerModule( - new KeyCollectorSnapshotDeserializerModule( - KeyCollectors.makeStandardFactory( - collector.getClusterBy(), - aggregate - ) - ) - ); - final ClusterByStatisticsSnapshot snapshot = collector.snapshot(); final ClusterByStatisticsSnapshot snapshot2 = jsonMapper.readValue( jsonMapper.writeValueAsString(snapshot),