mirror of https://github.com/apache/druid.git
Add serde for ColumnBasedRowsAndColumns to fix window queries without group by (#16658)
Register a Ser-De for RowsAndColumns so that the window operator query running on leaf operators would be transferred properly on the wire. Would fix the empty response given by window queries without group by on the native engine.
This commit is contained in:
parent
bb487a4193
commit
bb1c3c1749
|
@ -37,7 +37,6 @@ import org.apache.druid.query.FrameBasedInlineDataSourceSerializer;
|
|||
import org.apache.druid.query.context.ResponseContext;
|
||||
import org.apache.druid.query.context.ResponseContextDeserializer;
|
||||
import org.apache.druid.query.rowsandcols.RowsAndColumns;
|
||||
import org.apache.druid.query.rowsandcols.semantic.WireTransferable;
|
||||
import org.joda.time.DateTimeZone;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -189,20 +188,7 @@ public class DruidDefaultSerializersModule extends SimpleModule
|
|||
);
|
||||
addDeserializer(ResponseContext.class, new ResponseContextDeserializer());
|
||||
|
||||
addSerializer(RowsAndColumns.class, new JsonSerializer<RowsAndColumns>()
|
||||
{
|
||||
@Override
|
||||
public void serialize(
|
||||
RowsAndColumns value,
|
||||
JsonGenerator gen,
|
||||
SerializerProvider serializers
|
||||
) throws IOException
|
||||
{
|
||||
// It would be really cool if jackson offered an output stream that would allow us to push bytes
|
||||
// through, but it doesn't right now, so we have to build a byte[] instead. Maybe something to contribute
|
||||
// back to Jackson at some point.
|
||||
gen.writeBinary(WireTransferable.fromRAC(value).bytesToTransfer());
|
||||
}
|
||||
});
|
||||
addSerializer(RowsAndColumns.class, new RowsAndColumns.RowsAndColumnsSerializer());
|
||||
addDeserializer(RowsAndColumns.class, new RowsAndColumns.RowsAndColumnsDeserializer());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,7 +21,6 @@ package org.apache.druid.query.operator;
|
|||
|
||||
import com.google.common.base.Function;
|
||||
import org.apache.druid.error.DruidException;
|
||||
import org.apache.druid.frame.Frame;
|
||||
import org.apache.druid.java.util.common.Intervals;
|
||||
import org.apache.druid.java.util.common.guava.Sequence;
|
||||
import org.apache.druid.java.util.common.guava.Sequences;
|
||||
|
@ -31,10 +30,8 @@ import org.apache.druid.query.QueryRunnerFactory;
|
|||
import org.apache.druid.query.QueryToolChest;
|
||||
import org.apache.druid.query.rowsandcols.LazilyDecoratedRowsAndColumns;
|
||||
import org.apache.druid.query.rowsandcols.RowsAndColumns;
|
||||
import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumns;
|
||||
import org.apache.druid.query.rowsandcols.semantic.WireTransferable;
|
||||
import org.apache.druid.query.rowsandcols.concrete.FrameRowsAndColumns;
|
||||
import org.apache.druid.segment.Segment;
|
||||
import org.apache.druid.segment.column.RowSignature;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
@ -100,19 +97,8 @@ public class WindowOperatorQueryQueryRunnerFactory implements QueryRunnerFactory
|
|||
@Override
|
||||
public RowsAndColumns apply(@Nullable RowsAndColumns input)
|
||||
{
|
||||
// This is interim code to force a materialization by synthesizing the wire transfer
|
||||
// that will need to naturally happen as we flesh out this code more. For now, we
|
||||
// materialize the bytes on-heap and then read them back in as a frame.
|
||||
if (input instanceof LazilyDecoratedRowsAndColumns) {
|
||||
final WireTransferable wire = WireTransferable.fromRAC(input);
|
||||
final byte[] frameBytes = wire.bytesToTransfer();
|
||||
|
||||
RowSignature.Builder sigBob = RowSignature.builder();
|
||||
for (String column : input.getColumnNames()) {
|
||||
sigBob.add(column, input.findColumn(column).toAccessor().getType());
|
||||
}
|
||||
|
||||
return new ColumnBasedFrameRowsAndColumns(Frame.wrap(frameBytes), sigBob.build());
|
||||
return input.as(FrameRowsAndColumns.class);
|
||||
}
|
||||
return input;
|
||||
}
|
||||
|
|
|
@ -79,14 +79,4 @@ public class AppendableMapOfColumns implements AppendableRowsAndColumns
|
|||
}
|
||||
return retVal;
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public <T> T as(Class<T> clazz)
|
||||
{
|
||||
if (AppendableRowsAndColumns.class.equals(clazz)) {
|
||||
return (T) this;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -141,13 +141,6 @@ public class ConcatRowsAndColumns implements RowsAndColumns
|
|||
}
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public <T> T as(Class<T> clazz)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
private class ConcatedidColumn implements Column
|
||||
{
|
||||
|
||||
|
|
|
@ -61,7 +61,7 @@ public class CursorFactoryRowsAndColumns implements CloseableShapeshifter, RowsA
|
|||
if (CursorFactory.class == clazz) {
|
||||
return (T) cursorFactory;
|
||||
}
|
||||
return null;
|
||||
return RowsAndColumns.super.as(clazz);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -21,7 +21,6 @@ package org.apache.druid.query.rowsandcols;
|
|||
|
||||
import org.apache.druid.query.rowsandcols.column.Column;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
|
||||
|
@ -44,11 +43,4 @@ public class EmptyRowsAndColumns implements RowsAndColumns
|
|||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public <T> T as(Class<T> clazz)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -39,10 +39,10 @@ import org.apache.druid.query.operator.OffsetLimit;
|
|||
import org.apache.druid.query.rowsandcols.column.Column;
|
||||
import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
|
||||
import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumns;
|
||||
import org.apache.druid.query.rowsandcols.concrete.FrameRowsAndColumns;
|
||||
import org.apache.druid.query.rowsandcols.semantic.ColumnSelectorFactoryMaker;
|
||||
import org.apache.druid.query.rowsandcols.semantic.DefaultRowsAndColumnsDecorator;
|
||||
import org.apache.druid.query.rowsandcols.semantic.RowsAndColumnsDecorator;
|
||||
import org.apache.druid.query.rowsandcols.semantic.WireTransferable;
|
||||
import org.apache.druid.segment.ColumnSelectorFactory;
|
||||
import org.apache.druid.segment.Cursor;
|
||||
import org.apache.druid.segment.CursorBuildSpec;
|
||||
|
@ -150,16 +150,10 @@ public class LazilyDecoratedRowsAndColumns implements RowsAndColumns
|
|||
|
||||
@SuppressWarnings("unused")
|
||||
@SemanticCreator
|
||||
public WireTransferable toWireTransferable()
|
||||
public FrameRowsAndColumns toFrameRowsAndColumns()
|
||||
{
|
||||
return () -> {
|
||||
final Pair<byte[], RowSignature> materialized = materialize();
|
||||
if (materialized == null) {
|
||||
return new byte[]{};
|
||||
} else {
|
||||
return materialized.lhs;
|
||||
}
|
||||
};
|
||||
maybeMaterialize();
|
||||
return base.as(FrameRowsAndColumns.class);
|
||||
}
|
||||
|
||||
private void maybeMaterialize()
|
||||
|
|
|
@ -23,7 +23,6 @@ import org.apache.druid.java.util.common.ISE;
|
|||
import org.apache.druid.query.rowsandcols.column.Column;
|
||||
import org.apache.druid.query.rowsandcols.column.LimitedColumn;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.Collection;
|
||||
|
||||
public class LimitedRowsAndColumns implements RowsAndColumns
|
||||
|
@ -66,12 +65,4 @@ public class LimitedRowsAndColumns implements RowsAndColumns
|
|||
|
||||
return new LimitedColumn(column, start, end);
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public <T> T as(Class<T> clazz)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -164,7 +164,7 @@ public class MapOfColumnsRowsAndColumns implements RowsAndColumns
|
|||
if (AppendableRowsAndColumns.class.equals(clazz)) {
|
||||
return (T) new AppendableMapOfColumns(this);
|
||||
}
|
||||
return null;
|
||||
return RowsAndColumns.super.as(clazz);
|
||||
}
|
||||
|
||||
public static class Builder
|
||||
|
|
|
@ -164,11 +164,4 @@ public class RearrangedRowsAndColumns implements RowsAndColumns
|
|||
);
|
||||
}
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public <T> T as(Class<T> clazz)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,12 +19,30 @@
|
|||
|
||||
package org.apache.druid.query.rowsandcols;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonGenerator;
|
||||
import com.fasterxml.jackson.core.JsonParser;
|
||||
import com.fasterxml.jackson.databind.DeserializationContext;
|
||||
import com.fasterxml.jackson.databind.SerializerProvider;
|
||||
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
|
||||
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
|
||||
import org.apache.druid.error.DruidException;
|
||||
import org.apache.druid.frame.Frame;
|
||||
import org.apache.druid.frame.FrameType;
|
||||
import org.apache.druid.frame.channel.ByteTracker;
|
||||
import org.apache.druid.java.util.common.jackson.JacksonUtils;
|
||||
import org.apache.druid.query.rowsandcols.column.Column;
|
||||
import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumns;
|
||||
import org.apache.druid.query.rowsandcols.concrete.FrameRowsAndColumns;
|
||||
import org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowsAndColumns;
|
||||
import org.apache.druid.query.rowsandcols.semantic.AppendableRowsAndColumns;
|
||||
import org.apache.druid.query.rowsandcols.semantic.FramedOnHeapAggregatable;
|
||||
import org.apache.druid.segment.column.RowSignature;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.nio.channels.Channels;
|
||||
import java.util.Collection;
|
||||
|
||||
/**
|
||||
|
@ -110,6 +128,72 @@ public interface RowsAndColumns
|
|||
* @return A concrete implementation of the interface, or null if there is no meaningful optimization to be had
|
||||
* through a local implementation of the interface.
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
@Nullable
|
||||
<T> T as(Class<T> clazz);
|
||||
default <T> T as(Class<T> clazz)
|
||||
{
|
||||
if (clazz.isInstance(this)) {
|
||||
return (T) this;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Serializer for {@link RowsAndColumns} by converting the instance to {@link FrameRowsAndColumns}
|
||||
*/
|
||||
class RowsAndColumnsSerializer extends StdSerializer<RowsAndColumns>
|
||||
{
|
||||
public RowsAndColumnsSerializer()
|
||||
{
|
||||
super(RowsAndColumns.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void serialize(
|
||||
RowsAndColumns rac,
|
||||
JsonGenerator jsonGenerator,
|
||||
SerializerProvider serializerProvider
|
||||
) throws IOException
|
||||
{
|
||||
FrameRowsAndColumns frameRAC = rac.as(FrameRowsAndColumns.class);
|
||||
if (frameRAC == null) {
|
||||
throw DruidException.defensive("Unable to serialize RAC");
|
||||
}
|
||||
JacksonUtils.writeObjectUsingSerializerProvider(jsonGenerator, serializerProvider, frameRAC.getSignature());
|
||||
|
||||
Frame frame = frameRAC.getFrame();
|
||||
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
frame.writeTo(Channels.newChannel(baos), false, null, ByteTracker.unboundedTracker());
|
||||
|
||||
jsonGenerator.writeBinary(baos.toByteArray());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Deserializer for {@link RowsAndColumns} returning as an instance of {@link FrameRowsAndColumns}
|
||||
*/
|
||||
class RowsAndColumnsDeserializer extends StdDeserializer<RowsAndColumns>
|
||||
{
|
||||
public RowsAndColumnsDeserializer()
|
||||
{
|
||||
super(RowsAndColumns.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public FrameRowsAndColumns deserialize(JsonParser jsonParser, DeserializationContext deserializationContext)
|
||||
throws IOException
|
||||
{
|
||||
RowSignature sig = jsonParser.readValueAs(RowSignature.class);
|
||||
jsonParser.nextValue();
|
||||
|
||||
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
jsonParser.readBinaryValue(baos);
|
||||
Frame frame = Frame.wrap(baos.toByteArray());
|
||||
if (frame.type() == FrameType.COLUMNAR) {
|
||||
return new ColumnBasedFrameRowsAndColumns(frame, sig);
|
||||
} else {
|
||||
return new RowBasedFrameRowsAndColumns(frame, sig);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,106 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.query.rowsandcols.concrete;
|
||||
|
||||
import com.google.common.base.Objects;
|
||||
import org.apache.druid.frame.Frame;
|
||||
import org.apache.druid.frame.read.FrameReader;
|
||||
import org.apache.druid.query.rowsandcols.column.Column;
|
||||
import org.apache.druid.segment.CloseableShapeshifter;
|
||||
import org.apache.druid.segment.CursorFactory;
|
||||
import org.apache.druid.segment.column.RowSignature;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.Collection;
|
||||
import java.util.LinkedHashMap;
|
||||
|
||||
public abstract class AbstractFrameRowsAndColumns implements FrameRowsAndColumns, AutoCloseable, CloseableShapeshifter
|
||||
{
|
||||
final Frame frame;
|
||||
final RowSignature signature;
|
||||
final LinkedHashMap<String, Column> colCache = new LinkedHashMap<>();
|
||||
|
||||
public AbstractFrameRowsAndColumns(Frame frame, RowSignature signature)
|
||||
{
|
||||
this.frame = frame;
|
||||
this.signature = signature;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Frame getFrame()
|
||||
{
|
||||
return frame;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RowSignature getSignature()
|
||||
{
|
||||
return signature;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<String> getColumnNames()
|
||||
{
|
||||
return signature.getColumnNames();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int numRows()
|
||||
{
|
||||
return frame.numRows();
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Nullable
|
||||
@Override
|
||||
public <T> T as(Class<T> clazz)
|
||||
{
|
||||
if (CursorFactory.class.equals(clazz)) {
|
||||
return (T) FrameReader.create(signature).makeCursorFactory(frame);
|
||||
}
|
||||
return FrameRowsAndColumns.super.as(clazz);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
// nothing to close
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
return Objects.hashCode(frame, signature);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
{
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (!(o instanceof AbstractFrameRowsAndColumns)) {
|
||||
return false;
|
||||
}
|
||||
AbstractFrameRowsAndColumns otherFrame = (AbstractFrameRowsAndColumns) o;
|
||||
|
||||
return frame.writableMemory().equals(otherFrame.frame.writableMemory()) && signature.equals(otherFrame.signature);
|
||||
}
|
||||
}
|
|
@ -19,44 +19,21 @@
|
|||
|
||||
package org.apache.druid.query.rowsandcols.concrete;
|
||||
|
||||
import org.apache.druid.error.DruidException;
|
||||
import org.apache.druid.frame.Frame;
|
||||
import org.apache.druid.frame.FrameType;
|
||||
import org.apache.druid.frame.read.FrameReader;
|
||||
import org.apache.druid.frame.read.columnar.FrameColumnReaders;
|
||||
import org.apache.druid.java.util.common.ISE;
|
||||
import org.apache.druid.query.rowsandcols.RowsAndColumns;
|
||||
import org.apache.druid.query.rowsandcols.column.Column;
|
||||
import org.apache.druid.segment.CloseableShapeshifter;
|
||||
import org.apache.druid.segment.CursorFactory;
|
||||
import org.apache.druid.segment.column.ColumnType;
|
||||
import org.apache.druid.segment.column.RowSignature;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.Collection;
|
||||
import java.util.LinkedHashMap;
|
||||
|
||||
public class ColumnBasedFrameRowsAndColumns implements RowsAndColumns, AutoCloseable, CloseableShapeshifter
|
||||
public class ColumnBasedFrameRowsAndColumns extends AbstractFrameRowsAndColumns
|
||||
{
|
||||
private final Frame frame;
|
||||
private final RowSignature signature;
|
||||
private final LinkedHashMap<String, Column> colCache = new LinkedHashMap<>();
|
||||
|
||||
public ColumnBasedFrameRowsAndColumns(Frame frame, RowSignature signature)
|
||||
{
|
||||
this.frame = FrameType.COLUMNAR.ensureType(frame);
|
||||
this.signature = signature;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<String> getColumnNames()
|
||||
{
|
||||
return signature.getColumnNames();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int numRows()
|
||||
{
|
||||
return frame.numRows();
|
||||
super(FrameType.COLUMNAR.ensureType(frame), signature);
|
||||
}
|
||||
|
||||
@Nullable
|
||||
|
@ -71,28 +48,17 @@ public class ColumnBasedFrameRowsAndColumns implements RowsAndColumns, AutoClose
|
|||
} else {
|
||||
final ColumnType columnType = signature
|
||||
.getColumnType(columnIndex)
|
||||
.orElseThrow(() -> new ISE("just got the id, why is columnType not there?"));
|
||||
.orElseThrow(
|
||||
() -> DruidException.defensive(
|
||||
"just got the id [%s][%s], why is columnType not there?",
|
||||
columnIndex,
|
||||
name
|
||||
)
|
||||
);
|
||||
|
||||
colCache.put(name, FrameColumnReaders.create(name, columnIndex, columnType).readRACColumn(frame));
|
||||
}
|
||||
}
|
||||
return colCache.get(name);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Nullable
|
||||
@Override
|
||||
public <T> T as(Class<T> clazz)
|
||||
{
|
||||
if (CursorFactory.class.equals(clazz)) {
|
||||
return (T) FrameReader.create(signature).makeCursorFactory(frame);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
// nothing to close
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,21 +17,15 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.query.rowsandcols.semantic;
|
||||
package org.apache.druid.query.rowsandcols.concrete;
|
||||
|
||||
import org.apache.druid.java.util.common.ISE;
|
||||
import org.apache.druid.frame.Frame;
|
||||
import org.apache.druid.query.rowsandcols.RowsAndColumns;
|
||||
import org.apache.druid.segment.column.RowSignature;
|
||||
|
||||
public interface WireTransferable
|
||||
public interface FrameRowsAndColumns extends RowsAndColumns
|
||||
{
|
||||
static WireTransferable fromRAC(RowsAndColumns rac)
|
||||
{
|
||||
WireTransferable retVal = rac.as(WireTransferable.class);
|
||||
if (retVal == null) {
|
||||
throw new ISE("Rac[%s] cannot be transferred over the wire", rac.getClass());
|
||||
}
|
||||
return retVal;
|
||||
}
|
||||
Frame getFrame();
|
||||
|
||||
byte[] bytesToTransfer();
|
||||
RowSignature getSignature();
|
||||
}
|
|
@ -24,40 +24,17 @@ import org.apache.druid.frame.Frame;
|
|||
import org.apache.druid.frame.FrameType;
|
||||
import org.apache.druid.frame.field.FieldReader;
|
||||
import org.apache.druid.frame.field.FieldReaders;
|
||||
import org.apache.druid.frame.read.FrameReader;
|
||||
import org.apache.druid.query.rowsandcols.RowsAndColumns;
|
||||
import org.apache.druid.query.rowsandcols.column.Column;
|
||||
import org.apache.druid.segment.CloseableShapeshifter;
|
||||
import org.apache.druid.segment.CursorFactory;
|
||||
import org.apache.druid.segment.column.ColumnType;
|
||||
import org.apache.druid.segment.column.RowSignature;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.Collection;
|
||||
import java.util.LinkedHashMap;
|
||||
|
||||
public class RowBasedFrameRowsAndColumns implements RowsAndColumns, AutoCloseable, CloseableShapeshifter
|
||||
public class RowBasedFrameRowsAndColumns extends AbstractFrameRowsAndColumns
|
||||
{
|
||||
private final Frame frame;
|
||||
private final RowSignature signature;
|
||||
private final LinkedHashMap<String, Column> colCache = new LinkedHashMap<>();
|
||||
|
||||
public RowBasedFrameRowsAndColumns(Frame frame, RowSignature signature)
|
||||
{
|
||||
this.frame = FrameType.ROW_BASED.ensureType(frame);
|
||||
this.signature = signature;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<String> getColumnNames()
|
||||
{
|
||||
return signature.getColumnNames();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int numRows()
|
||||
{
|
||||
return frame.numRows();
|
||||
super(FrameType.ROW_BASED.ensureType(frame), signature);
|
||||
}
|
||||
|
||||
@Nullable
|
||||
|
@ -86,21 +63,4 @@ public class RowBasedFrameRowsAndColumns implements RowsAndColumns, AutoCloseabl
|
|||
}
|
||||
return colCache.get(name);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Nullable
|
||||
@Override
|
||||
public <T> T as(Class<T> clazz)
|
||||
{
|
||||
if (CursorFactory.class.equals(clazz)) {
|
||||
return (T) FrameReader.create(signature).makeCursorFactory(frame);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
// nothing to close
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,12 +22,18 @@ package org.apache.druid.jackson;
|
|||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.exc.InvalidTypeIdException;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import org.apache.druid.java.util.common.DateTimes;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.apache.druid.java.util.common.guava.Sequence;
|
||||
import org.apache.druid.java.util.common.guava.Sequences;
|
||||
import org.apache.druid.java.util.common.guava.Yielders;
|
||||
import org.apache.druid.query.Query;
|
||||
import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
|
||||
import org.apache.druid.query.rowsandcols.RowsAndColumns;
|
||||
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
|
||||
import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumns;
|
||||
import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumnsTest;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.DateTimeZone;
|
||||
import org.junit.Assert;
|
||||
|
@ -35,6 +41,8 @@ import org.junit.Test;
|
|||
|
||||
import java.util.Arrays;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
|
@ -102,4 +110,22 @@ public class DefaultObjectMapperTest
|
|||
}
|
||||
Assert.fail("We expect InvalidTypeIdException to be thrown");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testColumnBasedFrameRowsAndColumns() throws Exception
|
||||
{
|
||||
DefaultObjectMapper om = new DefaultObjectMapper("test");
|
||||
|
||||
MapOfColumnsRowsAndColumns input = (MapOfColumnsRowsAndColumns.fromMap(
|
||||
ImmutableMap.of(
|
||||
"colA", new IntArrayColumn(new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}),
|
||||
"colB", new IntArrayColumn(new int[]{4, -4, 3, -3, 4, 82, -90, 4, 0, 0})
|
||||
)));
|
||||
|
||||
ColumnBasedFrameRowsAndColumns frc = ColumnBasedFrameRowsAndColumnsTest.buildFrame(input);
|
||||
byte[] bytes = om.writeValueAsBytes(frc);
|
||||
|
||||
ColumnBasedFrameRowsAndColumns frc2 = (ColumnBasedFrameRowsAndColumns) om.readValue(bytes, RowsAndColumns.class);
|
||||
assertEquals(frc, frc2);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,7 +21,6 @@ package org.apache.druid.query.rowsandcols;
|
|||
|
||||
import org.apache.druid.query.rowsandcols.column.Column;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.Collection;
|
||||
|
||||
public class NoAsRowsAndColumns implements RowsAndColumns
|
||||
|
@ -50,12 +49,4 @@ public class NoAsRowsAndColumns implements RowsAndColumns
|
|||
{
|
||||
return rac.findColumn(name);
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public <T> T as(Class<T> clazz)
|
||||
{
|
||||
// Pretend like this doesn't implement any semantic interfaces
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -37,7 +37,15 @@ public class ColumnBasedFrameRowsAndColumnsTest extends RowsAndColumnsTestBase
|
|||
|
||||
public static ColumnBasedFrameRowsAndColumns buildFrame(MapOfColumnsRowsAndColumns input)
|
||||
{
|
||||
LazilyDecoratedRowsAndColumns rac = new LazilyDecoratedRowsAndColumns(input, null, null, null, OffsetLimit.limit(Integer.MAX_VALUE), null, null);
|
||||
LazilyDecoratedRowsAndColumns rac = new LazilyDecoratedRowsAndColumns(
|
||||
input,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
OffsetLimit.limit(Integer.MAX_VALUE),
|
||||
null,
|
||||
null
|
||||
);
|
||||
|
||||
rac.numRows(); // materialize
|
||||
return (ColumnBasedFrameRowsAndColumns) rac.getBase();
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.apache.druid.query.rowsandcols.semantic;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import org.apache.druid.java.util.common.ISE;
|
||||
import org.apache.druid.java.util.common.Intervals;
|
||||
|
@ -32,6 +33,9 @@ import org.apache.druid.query.operator.OffsetLimit;
|
|||
import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
|
||||
import org.apache.druid.query.rowsandcols.RowsAndColumns;
|
||||
import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
|
||||
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
|
||||
import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumns;
|
||||
import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumnsTest;
|
||||
import org.apache.druid.segment.ArrayListSegment;
|
||||
import org.apache.druid.segment.ColumnValueSelector;
|
||||
import org.apache.druid.segment.Cursor;
|
||||
|
@ -214,6 +218,39 @@ public class RowsAndColumnsDecoratorTest extends SemanticTestBase
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDecoratorWithColumnBasedFrameRAC()
|
||||
{
|
||||
RowSignature siggy = RowSignature.builder()
|
||||
.add("colA", ColumnType.LONG)
|
||||
.add("colB", ColumnType.LONG)
|
||||
.build();
|
||||
|
||||
Object[][] vals = new Object[][]{
|
||||
{1L, 4L},
|
||||
{2L, -4L},
|
||||
{3L, 3L},
|
||||
{4L, -3L},
|
||||
{5L, 4L},
|
||||
{6L, 82L},
|
||||
{7L, -90L},
|
||||
{8L, 4L},
|
||||
{9L, 0L},
|
||||
{10L, 0L}
|
||||
};
|
||||
|
||||
MapOfColumnsRowsAndColumns input = MapOfColumnsRowsAndColumns.fromMap(
|
||||
ImmutableMap.of(
|
||||
"colA", new IntArrayColumn(new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}),
|
||||
"colB", new IntArrayColumn(new int[]{4, -4, 3, -3, 4, 82, -90, 4, 0, 0})
|
||||
)
|
||||
);
|
||||
|
||||
ColumnBasedFrameRowsAndColumns frc = ColumnBasedFrameRowsAndColumnsTest.buildFrame(input);
|
||||
|
||||
validateDecorated(frc, siggy, vals, null, null, OffsetLimit.NONE, null);
|
||||
}
|
||||
|
||||
private void validateDecorated(
|
||||
RowsAndColumns base,
|
||||
RowSignature siggy,
|
||||
|
|
|
@ -109,7 +109,6 @@ public class NativeSqlEngine implements SqlEngine
|
|||
case ALLOW_TOP_LEVEL_UNION_ALL:
|
||||
case TIME_BOUNDARY_QUERY:
|
||||
case GROUPBY_IMPLICITLY_SORTS:
|
||||
case WINDOW_LEAF_OPERATOR:
|
||||
return true;
|
||||
case CAN_INSERT:
|
||||
case CAN_REPLACE:
|
||||
|
@ -117,6 +116,7 @@ public class NativeSqlEngine implements SqlEngine
|
|||
case WRITE_EXTERNAL_DATA:
|
||||
case SCAN_ORDER_BY_NON_TIME:
|
||||
case SCAN_NEEDS_SIGNATURE:
|
||||
case WINDOW_LEAF_OPERATOR:
|
||||
return false;
|
||||
default:
|
||||
throw SqlEngines.generateUnrecognizedFeatureException(NativeSqlEngine.class.getSimpleName(), feature);
|
||||
|
|
|
@ -16086,6 +16086,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
|
|||
.run();
|
||||
}
|
||||
|
||||
@NotYetSupported(Modes.UNSUPPORTED_DATASOURCE)
|
||||
@Test
|
||||
public void testWindowingOverJoin()
|
||||
{
|
||||
|
|
|
@ -298,7 +298,7 @@ public class CalciteWindowQueryTest extends BaseCalciteQueryTest
|
|||
);
|
||||
|
||||
assertEquals(
|
||||
"Encountered a multi value column [v0]. Window processing does not support MVDs. "
|
||||
"Encountered a multi value column. Window processing does not support MVDs. "
|
||||
+ "Consider using UNNEST or MV_TO_ARRAY.",
|
||||
e.getMessage()
|
||||
);
|
||||
|
|
|
@ -89,6 +89,7 @@ public @interface NotYetSupported
|
|||
RESULT_MISMATCH(AssertionError.class, "(assertResulEquals|AssertionError: column content mismatch)"),
|
||||
LONG_CASTING(AssertionError.class, "expected: java.lang.Long"),
|
||||
UNSUPPORTED_NULL_ORDERING(DruidException.class, "(A|DE)SCENDING ordering with NULLS (LAST|FIRST)"),
|
||||
UNSUPPORTED_DATASOURCE(DruidException.class, "WindowOperatorQuery must run on top of a query or inline data source"),
|
||||
UNION_WITH_COMPLEX_OPERAND(DruidException.class, "Only Table and Values are supported as inputs for Union"),
|
||||
UNION_MORE_STRICT_ROWTYPE_CHECK(DruidException.class, "Row signature mismatch in Union inputs"),
|
||||
JOIN_CONDITION_NOT_PUSHED_CONDITION(DruidException.class, "SQL requires a join with '.*' condition"),
|
||||
|
|
|
@ -2,7 +2,7 @@ type: "operatorValidation"
|
|||
|
||||
sql: |
|
||||
SELECT
|
||||
countryIsoCode,
|
||||
countryIsoCode,
|
||||
CAST (FLOOR(__time TO HOUR) AS BIGINT) t,
|
||||
SUM(delta) delta,
|
||||
SUM(SUM(delta)) OVER (PARTITION BY countryIsoCode ORDER BY CAST (FLOOR(__time TO HOUR) AS BIGINT) ROWS BETWEEN 3 PRECEDING AND 2 FOLLOWING) windowedDelta
|
||||
|
|
Loading…
Reference in New Issue