MSQ: Remove unnecessary snapshot deserialization code. (#16116)

Since #13205, a special deserializer module has no longer been necessary
to read key collector snapshots. This patch removes the unnecessary code.
This commit is contained in:
Gian Merlino 2024-03-18 10:12:27 -07:00 committed by GitHub
parent 7d307df6e9
commit 36bc94c798
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 9 additions and 173 deletions

View File

@ -783,18 +783,10 @@ public class ControllerImpl implements Controller
addToKernelManipulationQueue( addToKernelManipulationQueue(
queryKernel -> { queryKernel -> {
final StageId stageId = queryKernel.getStageId(stageNumber); final StageId stageId = queryKernel.getStageId(stageNumber);
// We need a specially-decorated ObjectMapper to deserialize key statistics.
final StageDefinition stageDef = queryKernel.getStageDefinition(stageId);
final ObjectMapper mapper = MSQTasks.decorateObjectMapperForKeyCollectorSnapshot(
context.jsonMapper(),
stageDef.getShuffleSpec().clusterBy(),
stageDef.getShuffleSpec().doesAggregate()
);
final PartialKeyStatisticsInformation partialKeyStatisticsInformation; final PartialKeyStatisticsInformation partialKeyStatisticsInformation;
try { try {
partialKeyStatisticsInformation = mapper.convertValue( partialKeyStatisticsInformation = context.jsonMapper().convertValue(
partialKeyStatisticsInformationObject, partialKeyStatisticsInformationObject,
PartialKeyStatisticsInformation.class PartialKeyStatisticsInformation.class
); );

View File

@ -19,10 +19,8 @@
package org.apache.druid.msq.exec; package org.apache.druid.msq.exec;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Injector; import com.google.inject.Injector;
import com.google.inject.Key; import com.google.inject.Key;
import org.apache.druid.frame.key.ClusterBy;
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.msq.guice.MultiStageQuery; import org.apache.druid.msq.guice.MultiStageQuery;
@ -39,10 +37,6 @@ import org.apache.druid.msq.indexing.error.TooManyAttemptsForWorker;
import org.apache.druid.msq.indexing.error.UnknownFault; import org.apache.druid.msq.indexing.error.UnknownFault;
import org.apache.druid.msq.indexing.error.WorkerFailedFault; import org.apache.druid.msq.indexing.error.WorkerFailedFault;
import org.apache.druid.msq.indexing.error.WorkerRpcFailedFault; import org.apache.druid.msq.indexing.error.WorkerRpcFailedFault;
import org.apache.druid.msq.statistics.KeyCollectorFactory;
import org.apache.druid.msq.statistics.KeyCollectorSnapshot;
import org.apache.druid.msq.statistics.KeyCollectorSnapshotDeserializerModule;
import org.apache.druid.msq.statistics.KeyCollectors;
import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.server.DruidNode; import org.apache.druid.server.DruidNode;
import org.apache.druid.storage.NilStorageConnector; import org.apache.druid.storage.NilStorageConnector;
@ -125,24 +119,6 @@ public class MSQTasks
} }
} }
/**
* Returns a decorated copy of an ObjectMapper that knows how to deserialize the appropriate kind of
* {@link KeyCollectorSnapshot}.
*/
static ObjectMapper decorateObjectMapperForKeyCollectorSnapshot(
final ObjectMapper mapper,
final ClusterBy clusterBy,
final boolean aggregate
)
{
final KeyCollectorFactory<?, ?> keyCollectorFactory =
KeyCollectors.makeStandardFactory(clusterBy, aggregate);
final ObjectMapper mapperCopy = mapper.copy();
mapperCopy.registerModule(new KeyCollectorSnapshotDeserializerModule(keyCollectorFactory));
return mapperCopy;
}
/** /**
* Returns the host:port from a {@link DruidNode}. Convenience method to make it easier to construct * Returns the host:port from a {@link DruidNode}. Convenience method to make it easier to construct
* {@link MSQErrorReport} instances. * {@link MSQErrorReport} instances.

View File

@ -19,13 +19,8 @@
package org.apache.druid.msq.statistics; package org.apache.druid.msq.statistics;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import org.apache.druid.frame.key.RowKey; import org.apache.druid.frame.key.RowKey;
import java.io.IOException;
import java.util.Comparator; import java.util.Comparator;
import java.util.Optional; import java.util.Optional;
@ -53,46 +48,6 @@ public class DelegateOrMinKeyCollectorFactory<TDelegate extends KeyCollector<TDe
return new DelegateOrMinKeyCollector<>(comparator, delegateFactory.newKeyCollector(), null); return new DelegateOrMinKeyCollector<>(comparator, delegateFactory.newKeyCollector(), null);
} }
@Override
public JsonDeserializer<DelegateOrMinKeyCollectorSnapshot<TSnapshot>> snapshotDeserializer()
{
final JsonDeserializer<TSnapshot> delegateDeserializer = delegateFactory.snapshotDeserializer();
return new JsonDeserializer<DelegateOrMinKeyCollectorSnapshot<TSnapshot>>()
{
@Override
public DelegateOrMinKeyCollectorSnapshot<TSnapshot> deserialize(JsonParser jp, DeserializationContext ctxt)
throws IOException
{
TSnapshot delegateSnapshot = null;
RowKey minKey = null;
if (!jp.isExpectedStartObjectToken()) {
ctxt.reportWrongTokenException(this, JsonToken.START_OBJECT, null);
}
JsonToken token;
while ((token = jp.nextToken()) != JsonToken.END_OBJECT) {
if (token != JsonToken.FIELD_NAME) {
ctxt.reportWrongTokenException(this, JsonToken.FIELD_NAME, null);
}
final String fieldName = jp.getText();
jp.nextToken();
if (DelegateOrMinKeyCollectorSnapshot.FIELD_SNAPSHOT.equals(fieldName)) {
delegateSnapshot = delegateDeserializer.deserialize(jp, ctxt);
} else if (DelegateOrMinKeyCollectorSnapshot.FIELD_MIN_KEY.equals(fieldName)) {
minKey = jp.readValueAs(RowKey.class);
}
}
return new DelegateOrMinKeyCollectorSnapshot<>(delegateSnapshot, minKey);
}
};
}
@Override @Override
public DelegateOrMinKeyCollectorSnapshot<TSnapshot> toSnapshot(final DelegateOrMinKeyCollector<TDelegate> collector) public DelegateOrMinKeyCollectorSnapshot<TSnapshot> toSnapshot(final DelegateOrMinKeyCollector<TDelegate> collector)
{ {

View File

@ -19,15 +19,11 @@
package org.apache.druid.msq.statistics; package org.apache.druid.msq.statistics;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import it.unimi.dsi.fastutil.objects.Object2LongRBTreeMap; import it.unimi.dsi.fastutil.objects.Object2LongRBTreeMap;
import org.apache.druid.collections.SerializablePair; import org.apache.druid.collections.SerializablePair;
import org.apache.druid.frame.key.ClusterBy; import org.apache.druid.frame.key.ClusterBy;
import org.apache.druid.frame.key.RowKey; import org.apache.druid.frame.key.RowKey;
import java.io.IOException;
import java.util.Comparator; import java.util.Comparator;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -51,19 +47,6 @@ public class DistinctKeyCollectorFactory implements KeyCollectorFactory<Distinct
return new DistinctKeyCollector(comparator); return new DistinctKeyCollector(comparator);
} }
@Override
public JsonDeserializer<DistinctKeySnapshot> snapshotDeserializer()
{
return new JsonDeserializer<DistinctKeySnapshot>()
{
@Override
public DistinctKeySnapshot deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException
{
return jp.readValueAs(DistinctKeySnapshot.class);
}
};
}
@Override @Override
public DistinctKeySnapshot toSnapshot(final DistinctKeyCollector collector) public DistinctKeySnapshot toSnapshot(final DistinctKeyCollector collector)
{ {

View File

@ -19,8 +19,6 @@
package org.apache.druid.msq.statistics; package org.apache.druid.msq.statistics;
import com.fasterxml.jackson.databind.JsonDeserializer;
public interface KeyCollectorFactory<TCollector extends KeyCollector<TCollector>, TSnapshot extends KeyCollectorSnapshot> public interface KeyCollectorFactory<TCollector extends KeyCollector<TCollector>, TSnapshot extends KeyCollectorSnapshot>
{ {
/** /**
@ -28,12 +26,6 @@ public interface KeyCollectorFactory<TCollector extends KeyCollector<TCollector>
*/ */
TCollector newKeyCollector(); TCollector newKeyCollector();
/**
* Fetches the deserializer that can be used to deserialize the snapshots created by the KeyCollectors corresponding
* to this factory
*/
JsonDeserializer<TSnapshot> snapshotDeserializer();
/** /**
* Serializes a {@link KeyCollector} to a {@link KeyCollectorSnapshot} * Serializes a {@link KeyCollector} to a {@link KeyCollectorSnapshot}
*/ */

View File

@ -1,34 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.msq.statistics;
import com.fasterxml.jackson.databind.module.SimpleModule;
/**
* A module for deserialization of {@link KeyCollectorSnapshot}.
*/
public class KeyCollectorSnapshotDeserializerModule extends SimpleModule
{
public KeyCollectorSnapshotDeserializerModule(final KeyCollectorFactory<?, ?> keyCollectorFactory)
{
addDeserializer(KeyCollectorSnapshot.class, keyCollectorFactory.snapshotDeserializer());
}
}

View File

@ -19,9 +19,6 @@
package org.apache.druid.msq.statistics; package org.apache.druid.msq.statistics;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import org.apache.datasketches.common.ArrayOfItemsSerDe; import org.apache.datasketches.common.ArrayOfItemsSerDe;
import org.apache.datasketches.common.ByteArrayUtil; import org.apache.datasketches.common.ByteArrayUtil;
@ -31,7 +28,6 @@ import org.apache.datasketches.quantiles.ItemsSketch;
import org.apache.druid.frame.key.ClusterBy; import org.apache.druid.frame.key.ClusterBy;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import java.io.IOException;
import java.nio.ByteOrder; import java.nio.ByteOrder;
import java.util.Arrays; import java.util.Arrays;
import java.util.Comparator; import java.util.Comparator;
@ -61,20 +57,6 @@ public class QuantilesSketchKeyCollectorFactory
return new QuantilesSketchKeyCollector(comparator, ItemsSketch.getInstance(byte[].class, SKETCH_INITIAL_K, comparator), 0); return new QuantilesSketchKeyCollector(comparator, ItemsSketch.getInstance(byte[].class, SKETCH_INITIAL_K, comparator), 0);
} }
@Override
public JsonDeserializer<QuantilesSketchKeyCollectorSnapshot> snapshotDeserializer()
{
return new JsonDeserializer<QuantilesSketchKeyCollectorSnapshot>()
{
@Override
public QuantilesSketchKeyCollectorSnapshot deserialize(JsonParser jp, DeserializationContext ctxt)
throws IOException
{
return jp.readValueAs(QuantilesSketchKeyCollectorSnapshot.class);
}
};
}
@Override @Override
public QuantilesSketchKeyCollectorSnapshot toSnapshot(QuantilesSketchKeyCollector collector) public QuantilesSketchKeyCollectorSnapshot toSnapshot(QuantilesSketchKeyCollector collector)
{ {

View File

@ -138,7 +138,7 @@ public class ClusterByStatisticsCollectorImplTest extends InitializedNullHandlin
); );
} }
verifySnapshotSerialization(testName, collector, aggregate); verifySnapshotSerialization(testName, collector);
} }
); );
} }
@ -187,7 +187,7 @@ public class ClusterByStatisticsCollectorImplTest extends InitializedNullHandlin
); );
} }
verifySnapshotSerialization(testName, collector, aggregate); verifySnapshotSerialization(testName, collector);
} }
); );
} }
@ -245,7 +245,7 @@ public class ClusterByStatisticsCollectorImplTest extends InitializedNullHandlin
); );
} }
verifySnapshotSerialization(testName, collector, aggregate); verifySnapshotSerialization(testName, collector);
} }
); );
} }
@ -309,7 +309,7 @@ public class ClusterByStatisticsCollectorImplTest extends InitializedNullHandlin
} }
} }
verifySnapshotSerialization(testName, collector, aggregate); verifySnapshotSerialization(testName, collector);
} }
); );
} }
@ -380,7 +380,7 @@ public class ClusterByStatisticsCollectorImplTest extends InitializedNullHandlin
} }
} }
verifySnapshotSerialization(testName, collector, aggregate); verifySnapshotSerialization(testName, collector);
} }
); );
} }
@ -446,7 +446,7 @@ public class ClusterByStatisticsCollectorImplTest extends InitializedNullHandlin
} }
} }
verifySnapshotSerialization(testName, collector, aggregate); verifySnapshotSerialization(testName, collector);
} }
); );
} }
@ -945,21 +945,11 @@ public class ClusterByStatisticsCollectorImplTest extends InitializedNullHandlin
private static void verifySnapshotSerialization( private static void verifySnapshotSerialization(
final String testName, final String testName,
final ClusterByStatisticsCollector collector, final ClusterByStatisticsCollector collector
final boolean aggregate
) )
{ {
try { try {
final ObjectMapper jsonMapper = TestHelper.makeJsonMapper(); final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
jsonMapper.registerModule(
new KeyCollectorSnapshotDeserializerModule(
KeyCollectors.makeStandardFactory(
collector.getClusterBy(),
aggregate
)
)
);
final ClusterByStatisticsSnapshot snapshot = collector.snapshot(); final ClusterByStatisticsSnapshot snapshot = collector.snapshot();
final ClusterByStatisticsSnapshot snapshot2 = jsonMapper.readValue( final ClusterByStatisticsSnapshot snapshot2 = jsonMapper.readValue(
jsonMapper.writeValueAsString(snapshot), jsonMapper.writeValueAsString(snapshot),