projection segment merge fixes (#17460) (#17503)

changes:
* fix issue when merging projections from multiple incremental persists, which assumed that some 'dim conversion' buffers were still open when they had in fact already been closed (by the merging iterator). The fix selectively persists these conversion buffers to temp files in the segment write out directory, maps them, and ties them to the segment-level closer so they remain available beyond the lifetime of the parent merger (a sketch of this persist-and-remap mechanism follows the change list)
* modify auto column serializers to use segment write out directory for temp files instead of java.io.tmpdir
* fix queryable index projection to not put the time-like column as a dimension, instead only adding it as __time
* use smoosh for temp files so that any Serializer can safely be written to a temp smoosh
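
The core idea behind the first bullet is to write the dictionary id conversion mappings of projection 'parent' columns to a memory-mapped temp file so they survive after the merging iterator releases its in-memory buffers. The snippet below is an illustrative, self-contained sketch of that persist-and-remap idea using only JDK file mapping; the class and helper names are hypothetical, and the actual change uses the PersistedIdConversions / IdConversionSerializer machinery shown in the diffs below.

```java
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.IntBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;

public class IdConversionPersistExample
{
  // Persist an id conversion buffer to a file and map it back, so the mapping survives after the
  // original (merge-scoped) buffer is released. Hypothetical helper for illustration only; the real
  // implementation writes to a temp smoosh under the segment write out directory instead.
  static IntBuffer persistAndMap(File file, IntBuffer conversions) throws IOException
  {
    final IntBuffer source = conversions.asReadOnlyBuffer();
    source.position(0);
    final ByteBuffer scratch = ByteBuffer.allocate(source.capacity() * Integer.BYTES)
                                         .order(ByteOrder.nativeOrder());
    while (source.hasRemaining()) {
      scratch.putInt(source.get());
    }
    scratch.flip();
    try (FileChannel channel = FileChannel.open(
        file.toPath(),
        StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE
    )) {
      channel.write(scratch);
      // the mapped buffer stays valid after the channel is closed, outliving the heap buffer above
      final ByteBuffer mapped = channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size());
      mapped.order(ByteOrder.nativeOrder());
      return mapped.asIntBuffer();
    }
  }

  public static void main(String[] args) throws IOException
  {
    final IntBuffer original = IntBuffer.wrap(new int[]{3, 0, 2, 1});
    final IntBuffer persisted = persistAndMap(File.createTempFile("idConversions", ".bin"), original);
    System.out.println(persisted.get(0)); // prints 3: old dictionary id 0 maps to new dictionary id 3
  }
}
```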
Clint Wylie 2024-11-22 14:58:35 -08:00 committed by GitHub
parent 2bb2acca6f
commit c9aac1bb40
32 changed files with 669 additions and 276 deletions

View File

@ -33,7 +33,6 @@ import java.nio.channels.WritableByteChannel;
public class SerializerUtils
{
public <T extends OutputStream> void writeString(T out, String name) throws IOException
{
byte[] nameBytes = StringUtils.toUtf8(name);

View File

@ -46,6 +46,7 @@ import org.apache.druid.segment.serde.NestedCommonFormatColumnPartSerde;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.nio.ByteOrder;
import java.nio.IntBuffer;
@ -87,6 +88,8 @@ public class AutoTypeColumnMerger implements DimensionMergerV9
private boolean isVariantType = false;
private byte variantTypeByte = 0x00;
private final File segmentBaseDir;
/**
* @param name column name
* @param outputName output smoosh file name. if this is a base table column, it will be the equivalent to
@ -105,6 +108,7 @@ public class AutoTypeColumnMerger implements DimensionMergerV9
@Nullable ColumnType castToType,
IndexSpec indexSpec,
SegmentWriteOutMedium segmentWriteOutMedium,
File segmentBaseDir,
Closer closer
)
{
@ -114,6 +118,7 @@ public class AutoTypeColumnMerger implements DimensionMergerV9
this.castToType = castToType;
this.indexSpec = indexSpec;
this.segmentWriteOutMedium = segmentWriteOutMedium;
this.segmentBaseDir = segmentBaseDir;
this.closer = closer;
}
@ -265,7 +270,7 @@ public class AutoTypeColumnMerger implements DimensionMergerV9
);
}
serializer.openDictionaryWriter();
serializer.openDictionaryWriter(segmentBaseDir);
serializer.serializeFields(mergedFields);
int stringCardinality;

View File

@ -26,10 +26,13 @@ import it.unimi.dsi.fastutil.ints.IntIterator;
import org.apache.druid.collections.bitmap.BitmapFactory;
import org.apache.druid.collections.bitmap.ImmutableBitmap;
import org.apache.druid.collections.bitmap.MutableBitmap;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.filter.DruidPredicateFactory;
import org.apache.druid.query.filter.ValueMatcher;
@ -51,13 +54,21 @@ import org.apache.druid.segment.data.SingleValueColumnarIntsSerializer;
import org.apache.druid.segment.data.V3CompressedVSizeColumnarMultiIntsSerializer;
import org.apache.druid.segment.data.VSizeColumnarIntsSerializer;
import org.apache.druid.segment.data.VSizeColumnarMultiIntsSerializer;
import org.apache.druid.segment.serde.ColumnSerializerUtils;
import org.apache.druid.segment.serde.Serializer;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.IntBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
@ -101,6 +112,16 @@ public abstract class DictionaryEncodedColumnMerger<T extends Comparable<T>> imp
@Nullable
protected T firstDictionaryValue;
protected File segmentBaseDir;
/**
* This becomes non-null if {@link #markAsParent()} is called, indicating that this column is a base table 'parent'
* to some projection column, which requires persisting id conversion buffers to temporary files. If there are no
* projections defined (or no projections which reference this column) then id conversion buffers will be freed after
* calling {@link #writeIndexes(List)}
*/
@MonotonicNonNull
protected PersistedIdConversions persistedIdConversions;
public DictionaryEncodedColumnMerger(
String dimensionName,
@ -109,6 +130,7 @@ public abstract class DictionaryEncodedColumnMerger<T extends Comparable<T>> imp
SegmentWriteOutMedium segmentWriteOutMedium,
ColumnCapabilities capabilities,
ProgressIndicator progress,
File segmentBaseDir,
Closer closer
)
{
@ -118,8 +140,8 @@ public abstract class DictionaryEncodedColumnMerger<T extends Comparable<T>> imp
this.capabilities = capabilities;
this.segmentWriteOutMedium = segmentWriteOutMedium;
this.nullRowsBitmap = indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
this.progress = progress;
this.segmentBaseDir = segmentBaseDir;
this.closer = closer;
}
@ -129,6 +151,19 @@ public abstract class DictionaryEncodedColumnMerger<T extends Comparable<T>> imp
@Nullable
protected abstract T coerceValue(T value);
@Override
public void markAsParent()
{
final File tmpOutputFilesDir = new File(segmentBaseDir, "tmp_" + outputName + "_merger");
try {
FileUtils.mkdirp(tmpOutputFilesDir);
}
catch (IOException e) {
throw new RuntimeException(e);
}
persistedIdConversions = closer.register(new PersistedIdConversions(tmpOutputFilesDir));
}
@Override
public void writeMergedValueDictionary(List<IndexableAdapter> adapters) throws IOException
{
@ -192,7 +227,18 @@ public abstract class DictionaryEncodedColumnMerger<T extends Comparable<T>> imp
writeDictionary(() -> dictionaryMergeIterator);
for (int i = 0; i < adapters.size(); i++) {
if (dimValueLookups[i] != null && dictionaryMergeIterator.needConversion(i)) {
dimConversions.set(i, dictionaryMergeIterator.conversions[i]);
final IntBuffer conversionBuffer;
if (persistedIdConversions != null) {
// if we are a projection parent column, persist the id mapping buffer so that child mergers have access
// to the mappings during serialization to adjust their dictionary ids as needed when serializing
conversionBuffer = persistedIdConversions.map(
dimensionName + "_idConversions_" + i,
dictionaryMergeIterator.conversions[i]
);
} else {
conversionBuffer = dictionaryMergeIterator.conversions[i];
}
dimConversions.set(i, conversionBuffer);
}
}
cardinality = dictionaryMergeIterator.getCardinality();
@ -702,4 +748,117 @@ public abstract class DictionaryEncodedColumnMerger<T extends Comparable<T>> imp
void mergeIndexes(int dictId, MutableBitmap mergedIndexes) throws IOException;
void write() throws IOException;
}
protected static class IdConversionSerializer implements Serializer
{
private final IntBuffer buffer;
private final ByteBuffer scratch;
protected IdConversionSerializer(IntBuffer buffer)
{
this.buffer = buffer.asReadOnlyBuffer();
this.buffer.position(0);
this.scratch = ByteBuffer.allocate(Integer.BYTES).order(ByteOrder.nativeOrder());
}
@Override
public long getSerializedSize()
{
return (long) buffer.capacity() * Integer.BYTES;
}
@Override
public void writeTo(WritableByteChannel channel, FileSmoosher smoosher) throws IOException
{
// currently no support for id conversion buffers larger than 2gb
buffer.position(0);
while (buffer.remaining() > 0) {
scratch.position(0);
scratch.putInt(buffer.get());
scratch.flip();
channel.write(scratch);
}
}
}
/**
* Closer of {@link PersistedIdConversion} instances and the parent path in which they are stored, for easy cleanup
* when the segment is closed.
*/
protected static class PersistedIdConversions implements Closeable
{
private final File tempDir;
private final Closer closer;
protected PersistedIdConversions(File tempDir)
{
this.tempDir = tempDir;
this.closer = Closer.create();
}
@Nullable
public IntBuffer map(String name, IntBuffer intBuffer) throws IOException
{
final File bufferDir = new File(tempDir, name);
FileUtils.mkdirp(bufferDir);
final IdConversionSerializer serializer = new IdConversionSerializer(intBuffer);
return closer.register(new PersistedIdConversion(bufferDir, serializer)).getBuffer();
}
@Override
public void close() throws IOException
{
try {
closer.close();
}
finally {
FileUtils.deleteDirectory(tempDir);
}
}
}
/**
* Persistent dictionary id conversion mappings, artifacts created during segment merge which map old dictionary ids
* to new dictionary ids. These persistent mappings are only used when the id mapping needs a lifetime longer than
* the merge of the column itself, such as when the column being merged is a 'parent' column of a projection.
*
* @see DimensionMergerV9#markAsParent()
* @see DimensionMergerV9#attachParent(DimensionMergerV9, List)
*/
protected static class PersistedIdConversion implements Closeable
{
private final File idConversionFile;
private final SmooshedFileMapper bufferMapper;
private final IntBuffer buffer;
private boolean isClosed;
protected PersistedIdConversion(File idConversionDir, Serializer idConversionSerializer) throws IOException
{
this.idConversionFile = idConversionDir;
this.bufferMapper = ColumnSerializerUtils.mapSerializer(idConversionDir, idConversionSerializer, idConversionDir.getName());
final ByteBuffer mappedBuffer = bufferMapper.mapFile(idConversionDir.getName());
mappedBuffer.order(ByteOrder.nativeOrder());
this.buffer = mappedBuffer.asIntBuffer();
}
@Nullable
public IntBuffer getBuffer()
{
if (isClosed) {
return null;
}
return buffer;
}
@Override
public void close() throws IOException
{
if (!isClosed) {
isClosed = true;
bufferMapper.close();
FileUtils.deleteDirectory(idConversionFile);
}
}
}
}

View File

@ -21,6 +21,7 @@ package org.apache.druid.segment;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionSchema.MultiValueHandling;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.DimensionSpec;
@ -29,6 +30,7 @@ import org.apache.druid.segment.selector.settable.SettableColumnValueSelector;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import javax.annotation.Nullable;
import java.io.File;
import java.util.Comparator;
/**
@ -100,6 +102,28 @@ public interface DimensionHandler
*/
DimensionIndexer<EncodedType, EncodedKeyComponentType, ActualType> makeIndexer(boolean useMaxMemoryEstimates);
/**
* @deprecated use {@link #makeMerger(String, IndexSpec, SegmentWriteOutMedium, ColumnCapabilities, ProgressIndicator, File, Closer)}
*
* This method exists for backwards compatibility with older versions of Druid since this is an unofficial extension
* point that must be implemented to create custom dimension types, and will be removed in a future release.
*/
@Deprecated
default DimensionMergerV9 makeMerger(
String outputName,
IndexSpec indexSpec,
SegmentWriteOutMedium segmentWriteOutMedium,
ColumnCapabilities capabilities,
ProgressIndicator progress,
Closer closer
)
{
throw DruidException.defensive(
"this method is no longer supported, use makeMerger(String, IndexSpec, SegmentWriteOutMedium, ColumnCapabilities, ProgressIndicator, File, Closer) instead"
);
}
/**
* Creates a new DimensionMergerV9, a per-dimension object responsible for merging indexes/row data across segments
* and building the on-disk representation of a dimension. For use with IndexMergerV9 only.
@ -113,16 +137,32 @@ public interface DimensionHandler
* needed
* @param capabilities The ColumnCapabilities of the dimension represented by this DimensionHandler
* @param progress ProgressIndicator used by the merging process
* @param segmentBaseDir segment write out path; temporary files may be created here, but they should either be
* deleted after the merge is finished OR be registered with the Closer parameter
* @param closer Closer tied to segment completion. Anything which is not cleaned up inside of the
* merger after merge is complete should be registered with this closer. For example,
* resources which are required for final serialization of the column
* @return A new DimensionMergerV9 object.
*/
DimensionMergerV9 makeMerger(
default DimensionMergerV9 makeMerger(
String outputName,
IndexSpec indexSpec,
SegmentWriteOutMedium segmentWriteOutMedium,
ColumnCapabilities capabilities,
ProgressIndicator progress,
File segmentBaseDir,
Closer closer
);
)
{
return makeMerger(
outputName,
indexSpec,
segmentWriteOutMedium,
capabilities,
progress,
closer
);
}
/**
* Given an key component representing a single set of row value(s) for this dimension as an Object,

View File

@ -40,11 +40,26 @@ public interface DimensionMergerV9 extends DimensionMerger
ColumnDescriptor makeColumnDescriptor();
/**
* Attaches the {@link DimensionMergerV9} of a "projection" parent column so that stuff like value dictionaries can
* be shared between parent and child
* Sets this merger as the "parent" of another merger for a "projection", allowing this merger to preserve any
* state which might be required by the projection mergers. This method MUST be called prior to
* performing any merge work. Typically, this method only needs to be implemented if
* {@link #attachParent(DimensionMergerV9, List)} requires it.
*/
default void attachParent(DimensionMergerV9 parent, List<IndexableAdapter> projectionAdapters) throws IOException
default void markAsParent()
{
// do nothing
}
/**
* Attaches the {@link DimensionMergerV9} of a "projection" parent column so that stuff like value dictionaries can
* be shared between parent and child. This method is called during merging instead of {@link #writeMergedValueDictionary(List)} if
* the parent column exists.
*
* @see IndexMergerV9#makeProjections
*/
default void attachParent(DimensionMergerV9 parent, List<IndexableAdapter> projectionAdapters) throws IOException
{
// by default fall through to writing merged dictionary
writeMergedValueDictionary(projectionAdapters);
}
}

View File

@ -30,6 +30,7 @@ import org.apache.druid.segment.selector.settable.SettableColumnValueSelector;
import org.apache.druid.segment.selector.settable.SettableDoubleColumnValueSelector;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import java.io.File;
import java.util.Comparator;
public class DoubleDimensionHandler implements DimensionHandler<Double, Double, Double>
@ -82,6 +83,7 @@ public class DoubleDimensionHandler implements DimensionHandler<Double, Double,
SegmentWriteOutMedium segmentWriteOutMedium,
ColumnCapabilities capabilities,
ProgressIndicator progress,
File segmentBaseDir,
Closer closer
)
{

View File

@ -30,6 +30,7 @@ import org.apache.druid.segment.selector.settable.SettableColumnValueSelector;
import org.apache.druid.segment.selector.settable.SettableFloatColumnValueSelector;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import java.io.File;
import java.util.Comparator;
public class FloatDimensionHandler implements DimensionHandler<Float, Float, Float>
@ -82,6 +83,7 @@ public class FloatDimensionHandler implements DimensionHandler<Float, Float, Flo
SegmentWriteOutMedium segmentWriteOutMedium,
ColumnCapabilities capabilities,
ProgressIndicator progress,
File segmentBaseDir,
Closer closer
)
{

View File

@ -719,6 +719,7 @@ public class IndexIO
if (groupingColumn.equals(projectionSpec.getSchema().getTimeColumnName())) {
projectionColumns.put(ColumnHolder.TIME_COLUMN_NAME, projectionColumns.get(groupingColumn));
projectionColumns.remove(groupingColumn);
}
}
for (AggregatorFactory aggregator : projectionSpec.getSchema().getAggregators()) {

View File

@ -227,12 +227,24 @@ public class IndexMergerV9 implements IndexMerger
segmentWriteOutMedium,
dimFormats.get(i).toColumnCapabilities(),
progress,
outDir,
closer
);
mergers.add(merger);
mergersMap.put(mergedDimensions.get(i), merger);
}
if (segmentMetadata != null && segmentMetadata.getProjections() != null) {
for (AggregateProjectionMetadata projectionMetadata : segmentMetadata.getProjections()) {
for (String dimension : projectionMetadata.getSchema().getGroupingColumns()) {
DimensionMergerV9 merger = mergersMap.get(dimension);
if (merger != null) {
merger.markAsParent();
}
}
}
}
/************* Setup Dim Conversions **************/
progress.progress();
startTime = System.currentTimeMillis();
@ -302,6 +314,7 @@ public class IndexMergerV9 implements IndexMerger
indexSpec,
segmentWriteOutMedium,
progress,
outDir,
closer,
mergersMap,
segmentMetadata
@ -356,6 +369,7 @@ public class IndexMergerV9 implements IndexMerger
final IndexSpec indexSpec,
final SegmentWriteOutMedium segmentWriteOutMedium,
final ProgressIndicator progress,
final File segmentBaseDir,
final Closer closer,
final Map<String, DimensionMergerV9> parentMergers,
final Metadata segmentMetadata
@ -389,6 +403,7 @@ public class IndexMergerV9 implements IndexMerger
segmentWriteOutMedium,
dimensionFormat.toColumnCapabilities(),
progress,
segmentBaseDir,
closer
);
if (parentMergers.containsKey(dimension)) {

View File

@ -30,6 +30,7 @@ import org.apache.druid.segment.selector.settable.SettableColumnValueSelector;
import org.apache.druid.segment.selector.settable.SettableLongColumnValueSelector;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import java.io.File;
import java.util.Comparator;
public class LongDimensionHandler implements DimensionHandler<Long, Long, Long>
@ -82,6 +83,7 @@ public class LongDimensionHandler implements DimensionHandler<Long, Long, Long>
SegmentWriteOutMedium segmentWriteOutMedium,
ColumnCapabilities capabilities,
ProgressIndicator progress,
File segmentBaseDir,
Closer closer
)
{

View File

@ -31,6 +31,7 @@ import org.apache.druid.segment.selector.settable.SettableObjectColumnValueSelec
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import javax.annotation.Nullable;
import java.io.File;
import java.util.Comparator;
public class NestedCommonFormatColumnHandler implements DimensionHandler<StructuredData, StructuredData, StructuredData>
@ -82,10 +83,11 @@ public class NestedCommonFormatColumnHandler implements DimensionHandler<Structu
SegmentWriteOutMedium segmentWriteOutMedium,
ColumnCapabilities capabilities,
ProgressIndicator progress,
File segmentBaseDir,
Closer closer
)
{
return new AutoTypeColumnMerger(name, outputName, castTo, indexSpec, segmentWriteOutMedium, closer);
return new AutoTypeColumnMerger(name, outputName, castTo, indexSpec, segmentWriteOutMedium, segmentBaseDir, closer);
}
@Override

View File

@ -30,6 +30,7 @@ import org.apache.druid.segment.selector.settable.SettableColumnValueSelector;
import org.apache.druid.segment.selector.settable.SettableObjectColumnValueSelector;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import java.io.File;
import java.util.Comparator;
public class NestedDataColumnHandlerV4 implements DimensionHandler<StructuredData, StructuredData, StructuredData>
@ -78,6 +79,7 @@ public class NestedDataColumnHandlerV4 implements DimensionHandler<StructuredDat
SegmentWriteOutMedium segmentWriteOutMedium,
ColumnCapabilities capabilities,
ProgressIndicator progress,
File segmentBaseDir,
Closer closer
)
{

View File

@ -44,6 +44,7 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.stream.Collectors;
/**
*
@ -239,21 +240,36 @@ public abstract class SimpleQueryableIndex implements QueryableIndex
public QueryableIndex getProjectionQueryableIndex(String name)
{
final AggregateProjectionMetadata projectionSpec = projectionsMap.get(name);
final Metadata projectionMetadata = new Metadata(
null,
projectionSpec.getSchema().getAggregators(),
null,
null,
true,
projectionSpec.getSchema().getOrderingWithTimeColumnSubstitution(),
null
);
return new SimpleQueryableIndex(
dataInterval,
new ListIndexed<>(projectionSpec.getSchema().getGroupingColumns()),
new ListIndexed<>(
projectionSpec.getSchema()
.getGroupingColumns()
.stream()
.filter(x -> !x.equals(projectionSpec.getSchema().getTimeColumnName()))
.collect(Collectors.toList())
),
bitmapFactory,
projectionColumns.get(name),
fileMapper,
true,
null,
projectionMetadata,
null
)
{
@Override
public Metadata getMetadata()
{
return null;
return projectionMetadata;
}
@Override

View File

@ -32,6 +32,7 @@ import org.apache.druid.segment.selector.settable.SettableColumnValueSelector;
import org.apache.druid.segment.selector.settable.SettableDimensionValueSelector;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import java.io.File;
import java.util.Collections;
import java.util.Comparator;
@ -169,6 +170,7 @@ public class StringDimensionHandler implements DimensionHandler<Integer, int[],
SegmentWriteOutMedium segmentWriteOutMedium,
ColumnCapabilities capabilities,
ProgressIndicator progress,
File segmentBaseDir,
Closer closer
)
{
@ -188,6 +190,7 @@ public class StringDimensionHandler implements DimensionHandler<Integer, int[],
segmentWriteOutMedium,
capabilities,
progress,
segmentBaseDir,
closer
);
}

View File

@ -48,6 +48,7 @@ import org.apache.druid.segment.serde.DictionaryEncodedColumnPartSerde;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
@ -84,10 +85,11 @@ public class StringDimensionMergerV9 extends DictionaryEncodedColumnMerger<Strin
SegmentWriteOutMedium segmentWriteOutMedium,
ColumnCapabilities capabilities,
ProgressIndicator progress,
File segmentBaseDir,
Closer closer
)
{
super(dimensionName, outputName, indexSpec, segmentWriteOutMedium, capabilities, progress, closer);
super(dimensionName, outputName, indexSpec, segmentWriteOutMedium, capabilities, progress, segmentBaseDir, closer);
}
@Override

View File

@ -19,15 +19,11 @@
package org.apache.druid.segment.nested;
import com.google.common.primitives.Ints;
import org.apache.druid.annotations.SuppressFBWarnings;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.ByteBufferUtils;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
import org.apache.druid.java.util.common.io.smoosh.SmooshedWriter;
import org.apache.druid.segment.column.StringEncodingStrategies;
import org.apache.druid.segment.column.TypeStrategies;
import org.apache.druid.segment.data.DictionaryWriter;
@ -35,6 +31,7 @@ import org.apache.druid.segment.data.FixedIndexed;
import org.apache.druid.segment.data.FrontCodedIntArrayIndexed;
import org.apache.druid.segment.data.Indexed;
import org.apache.druid.segment.serde.ColumnSerializerUtils;
import org.apache.druid.utils.CloseableUtils;
import javax.annotation.Nullable;
import java.io.Closeable;
@ -42,13 +39,6 @@ import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.GatheringByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.EnumSet;
/**
* Value to dictionary id lookup, backed with memory mapped dictionaries populated lazily by the supplied
@ -56,36 +46,38 @@ import java.util.EnumSet;
*/
public final class DictionaryIdLookup implements Closeable
{
private final String name;
private final Path tempBasePath;
private final File tempBasePath;
@Nullable
private final DictionaryWriter<String> stringDictionaryWriter;
private Path stringDictionaryFile = null;
private File stringDictionaryFile = null;
private SmooshedFileMapper stringBufferMapper = null;
private Indexed<ByteBuffer> stringDictionary = null;
@Nullable
private final DictionaryWriter<Long> longDictionaryWriter;
private Path longDictionaryFile = null;
private MappedByteBuffer longBuffer = null;
private File longDictionaryFile = null;
private SmooshedFileMapper longBufferMapper = null;
private FixedIndexed<Long> longDictionary = null;
@Nullable
private final DictionaryWriter<Double> doubleDictionaryWriter;
private Path doubleDictionaryFile = null;
MappedByteBuffer doubleBuffer = null;
private File doubleDictionaryFile = null;
SmooshedFileMapper doubleBufferMapper = null;
FixedIndexed<Double> doubleDictionary = null;
@Nullable
private final DictionaryWriter<int[]> arrayDictionaryWriter;
private Path arrayDictionaryFile = null;
private MappedByteBuffer arrayBuffer = null;
private File arrayDictionaryFile = null;
private SmooshedFileMapper arrayBufferMapper = null;
private FrontCodedIntArrayIndexed arrayDictionary = null;
private final Closer closer = Closer.create();
public DictionaryIdLookup(
String name,
Path tempBasePath,
File tempBaseDir,
@Nullable DictionaryWriter<String> stringDictionaryWriter,
@Nullable DictionaryWriter<Long> longDictionaryWriter,
@Nullable DictionaryWriter<Double> doubleDictionaryWriter,
@ -93,7 +85,7 @@ public final class DictionaryIdLookup implements Closeable
)
{
this.name = name;
this.tempBasePath = tempBasePath;
this.tempBasePath = tempBaseDir;
this.stringDictionaryWriter = stringDictionaryWriter;
this.longDictionaryWriter = longDictionaryWriter;
this.doubleDictionaryWriter = doubleDictionaryWriter;
@ -172,42 +164,27 @@ public final class DictionaryIdLookup implements Closeable
}
@Nullable
public ByteBuffer getLongBuffer()
public SmooshedFileMapper getLongBufferMapper()
{
return longBuffer;
return longBufferMapper;
}
@Nullable
public ByteBuffer getDoubleBuffer()
public SmooshedFileMapper getDoubleBufferMapper()
{
return doubleBuffer;
return doubleBufferMapper;
}
@Nullable
public ByteBuffer getArrayBuffer()
public SmooshedFileMapper getArrayBufferMapper()
{
return arrayBuffer;
return arrayBufferMapper;
}
@Override
public void close()
{
if (stringBufferMapper != null) {
stringBufferMapper.close();
deleteTempFile(stringDictionaryFile);
}
if (longBuffer != null) {
ByteBufferUtils.unmap(longBuffer);
deleteTempFile(longDictionaryFile);
}
if (doubleBuffer != null) {
ByteBufferUtils.unmap(doubleBuffer);
deleteTempFile(doubleDictionaryFile);
}
if (arrayBuffer != null) {
ByteBufferUtils.unmap(arrayBuffer);
deleteTempFile(arrayDictionaryFile);
}
CloseableUtils.closeAndWrapExceptions(closer);
}
private int longOffset()
@ -228,28 +205,16 @@ public final class DictionaryIdLookup implements Closeable
private void ensureStringDictionaryLoaded()
{
if (stringDictionary == null) {
// GenericIndexed v2 can write to multiple files if the dictionary is larger than 2gb, so we use a smooshfile
// for strings because of this. if other type dictionary writers could potentially use multiple internal files
// in the future, we should transition them to using this approach as well (or build a combination smoosher and
// mapper so that we can have a mutable smoosh)
File stringSmoosh = FileUtils.createTempDirInLocation(tempBasePath, StringUtils.urlEncode(name) + "__stringTempSmoosh");
stringDictionaryFile = stringSmoosh.toPath();
final String fileName = ColumnSerializerUtils.getInternalFileName(
name,
ColumnSerializerUtils.STRING_DICTIONARY_FILE_NAME
);
stringDictionaryFile = makeTempDir(fileName);
stringBufferMapper = closer.register(
ColumnSerializerUtils.mapSerializer(stringDictionaryFile, stringDictionaryWriter, fileName)
);
try (
final FileSmoosher smoosher = new FileSmoosher(stringSmoosh);
final SmooshedWriter writer = smoosher.addWithSmooshedWriter(
fileName,
stringDictionaryWriter.getSerializedSize()
)
) {
stringDictionaryWriter.writeTo(writer, smoosher);
writer.close();
smoosher.close();
stringBufferMapper = SmooshedFileMapper.load(stringSmoosh);
try {
final ByteBuffer stringBuffer = stringBufferMapper.mapFile(fileName);
stringDictionary = StringEncodingStrategies.getStringDictionarySupplier(
stringBufferMapper,
@ -266,148 +231,79 @@ public final class DictionaryIdLookup implements Closeable
private void ensureLongDictionaryLoaded()
{
if (longDictionary == null) {
longDictionaryFile = makeTempFile(name + ColumnSerializerUtils.LONG_DICTIONARY_FILE_NAME);
longBuffer = mapWriter(longDictionaryFile, longDictionaryWriter);
longDictionary = FixedIndexed.read(longBuffer, TypeStrategies.LONG, ByteOrder.nativeOrder(), Long.BYTES).get();
// reset position
longBuffer.position(0);
final String fileName = ColumnSerializerUtils.getInternalFileName(
name,
ColumnSerializerUtils.LONG_DICTIONARY_FILE_NAME
);
longDictionaryFile = makeTempDir(fileName);
longBufferMapper = closer.register(
ColumnSerializerUtils.mapSerializer(longDictionaryFile, longDictionaryWriter, fileName)
);
try {
final ByteBuffer buffer = longBufferMapper.mapFile(fileName);
longDictionary = FixedIndexed.read(buffer, TypeStrategies.LONG, ByteOrder.nativeOrder(), Long.BYTES).get();
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
}
private void ensureDoubleDictionaryLoaded()
{
if (doubleDictionary == null) {
doubleDictionaryFile = makeTempFile(name + ColumnSerializerUtils.DOUBLE_DICTIONARY_FILE_NAME);
doubleBuffer = mapWriter(doubleDictionaryFile, doubleDictionaryWriter);
doubleDictionary = FixedIndexed.read(
doubleBuffer,
TypeStrategies.DOUBLE,
ByteOrder.nativeOrder(),
Double.BYTES
).get();
// reset position
doubleBuffer.position(0);
final String fileName = ColumnSerializerUtils.getInternalFileName(
name,
ColumnSerializerUtils.DOUBLE_DICTIONARY_FILE_NAME
);
doubleDictionaryFile = makeTempDir(fileName);
doubleBufferMapper = closer.register(
ColumnSerializerUtils.mapSerializer(doubleDictionaryFile, doubleDictionaryWriter, fileName)
);
try {
final ByteBuffer buffer = doubleBufferMapper.mapFile(fileName);
doubleDictionary = FixedIndexed.read(buffer, TypeStrategies.DOUBLE, ByteOrder.nativeOrder(), Long.BYTES).get();
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
}
private void ensureArrayDictionaryLoaded()
{
if (arrayDictionary == null && arrayDictionaryWriter != null) {
arrayDictionaryFile = makeTempFile(name + ColumnSerializerUtils.ARRAY_DICTIONARY_FILE_NAME);
arrayBuffer = mapWriter(arrayDictionaryFile, arrayDictionaryWriter);
arrayDictionary = FrontCodedIntArrayIndexed.read(arrayBuffer, ByteOrder.nativeOrder()).get();
// reset position
arrayBuffer.position(0);
final String fileName = ColumnSerializerUtils.getInternalFileName(
name,
ColumnSerializerUtils.ARRAY_DICTIONARY_FILE_NAME
);
arrayDictionaryFile = makeTempDir(fileName);
arrayBufferMapper = closer.register(
ColumnSerializerUtils.mapSerializer(arrayDictionaryFile, arrayDictionaryWriter, fileName)
);
try {
final ByteBuffer buffer = arrayBufferMapper.mapFile(fileName);
arrayDictionary = FrontCodedIntArrayIndexed.read(buffer, ByteOrder.nativeOrder()).get();
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
}
private Path makeTempFile(String name)
private File makeTempDir(String fileName)
{
try {
return Files.createTempFile(tempBasePath, StringUtils.urlEncode(name), null);
final File f = new File(tempBasePath, StringUtils.urlEncode(fileName));
FileUtils.mkdirp(f);
closer.register(() -> FileUtils.deleteDirectory(f));
return f;
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
private void deleteTempFile(Path path)
{
try {
final File file = path.toFile();
if (file.isDirectory()) {
FileUtils.deleteDirectory(file);
} else {
Files.delete(path);
}
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
@SuppressFBWarnings("NP_NONNULL_PARAM_VIOLATION")
private MappedByteBuffer mapWriter(Path path, DictionaryWriter<?> writer)
{
final EnumSet<StandardOpenOption> options = EnumSet.of(
StandardOpenOption.READ,
StandardOpenOption.WRITE,
StandardOpenOption.CREATE,
StandardOpenOption.TRUNCATE_EXISTING
);
try (FileChannel fileChannel = FileChannel.open(path, options);
GatheringByteChannel smooshChannel = makeWriter(fileChannel, writer.getSerializedSize())) {
//noinspection DataFlowIssue
writer.writeTo(smooshChannel, null);
return fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, writer.getSerializedSize());
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
private GatheringByteChannel makeWriter(FileChannel channel, long size)
{
// basically same code as smooshed writer, can't use channel directly because copying between channels
// doesn't handle size of source channel correctly
return new GatheringByteChannel()
{
private boolean isClosed = false;
private long currOffset = 0;
@Override
public boolean isOpen()
{
return !isClosed;
}
@Override
public void close() throws IOException
{
channel.close();
isClosed = true;
}
public int bytesLeft()
{
return (int) (size - currOffset);
}
@Override
public int write(ByteBuffer buffer) throws IOException
{
return addToOffset(channel.write(buffer));
}
@Override
public long write(ByteBuffer[] srcs, int offset, int length) throws IOException
{
return addToOffset(channel.write(srcs, offset, length));
}
@Override
public long write(ByteBuffer[] srcs) throws IOException
{
return addToOffset(channel.write(srcs));
}
public int addToOffset(long numBytesWritten)
{
if (numBytesWritten > bytesLeft()) {
throw DruidException.defensive(
"Wrote more bytes[%,d] than available[%,d]. Don't do that.",
numBytesWritten,
bytesLeft()
);
}
currOffset += numBytesWritten;
return Ints.checkedCast(numBytesWritten);
}
};
}
public int getStringCardinality()
{
ensureStringDictionaryLoaded();

View File

@ -21,11 +21,13 @@ package org.apache.druid.segment.nested;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
import org.apache.druid.segment.GenericColumnSerializer;
import org.apache.druid.segment.data.VByte;
import org.apache.druid.segment.serde.ColumnSerializerUtils;
import org.apache.druid.segment.serde.Serializer;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
@ -53,7 +55,7 @@ public abstract class NestedCommonFormatColumnSerializer implements GenericColum
public static final String RAW_FILE_NAME = "__raw";
public static final String NESTED_FIELD_PREFIX = "__field_";
public abstract void openDictionaryWriter() throws IOException;
public abstract void openDictionaryWriter(File segmentBaseDir) throws IOException;
public void serializeFields(SortedMap<String, FieldTypeInfo.MutableTypeSet> fields) throws IOException
{
@ -80,9 +82,11 @@ public abstract class NestedCommonFormatColumnSerializer implements GenericColum
ColumnSerializerUtils.writeInternal(smoosher, serializer, getColumnName(), fileName);
}
protected void writeInternal(FileSmoosher smoosher, ByteBuffer buffer, String fileName) throws IOException
protected void copyFromTempSmoosh(FileSmoosher smoosher, SmooshedFileMapper fileMapper) throws IOException
{
ColumnSerializerUtils.writeInternal(smoosher, buffer, getColumnName(), fileName);
for (String internalName : fileMapper.getInternalFilenames()) {
smoosher.add(internalName, fileMapper.mapFile(internalName));
}
}
protected void writeV0Header(WritableByteChannel channel, ByteBuffer columnNameBuffer) throws IOException

View File

@ -24,12 +24,10 @@ import com.google.common.collect.Maps;
import org.apache.druid.collections.bitmap.ImmutableBitmap;
import org.apache.druid.collections.bitmap.MutableBitmap;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
import org.apache.druid.java.util.common.io.smoosh.SmooshedWriter;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.math.expr.ExprEval;
@ -51,6 +49,7 @@ import org.apache.druid.segment.serde.ColumnSerializerUtils;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
@ -201,7 +200,7 @@ public class NestedDataColumnSerializer extends NestedCommonFormatColumnSerializ
}
@Override
public void openDictionaryWriter() throws IOException
public void openDictionaryWriter(File segmentBaseDir) throws IOException
{
fieldsWriter = new GenericIndexedWriter<>(segmentWriteOutMedium, name, GenericIndexed.STRING_STRATEGY);
fieldsWriter.open();
@ -243,7 +242,7 @@ public class NestedDataColumnSerializer extends NestedCommonFormatColumnSerializ
globalDictionaryIdLookup = closer.register(
new DictionaryIdLookup(
name,
FileUtils.getTempDir(),
segmentBaseDir,
dictionaryWriter,
longDictionaryWriter,
doubleDictionaryWriter,
@ -440,37 +439,22 @@ public class NestedDataColumnSerializer extends NestedCommonFormatColumnSerializ
if (writeDictionary) {
if (globalDictionaryIdLookup.getStringBufferMapper() != null) {
SmooshedFileMapper fileMapper = globalDictionaryIdLookup.getStringBufferMapper();
for (String internalName : fileMapper.getInternalFilenames()) {
smoosher.add(internalName, fileMapper.mapFile(internalName));
}
copyFromTempSmoosh(smoosher, globalDictionaryIdLookup.getStringBufferMapper());
} else {
writeInternal(smoosher, dictionaryWriter, ColumnSerializerUtils.STRING_DICTIONARY_FILE_NAME);
}
if (globalDictionaryIdLookup.getLongBuffer() != null) {
writeInternal(
smoosher,
globalDictionaryIdLookup.getLongBuffer(),
ColumnSerializerUtils.LONG_DICTIONARY_FILE_NAME
);
if (globalDictionaryIdLookup.getLongBufferMapper() != null) {
copyFromTempSmoosh(smoosher, globalDictionaryIdLookup.getLongBufferMapper());
} else {
writeInternal(smoosher, longDictionaryWriter, ColumnSerializerUtils.LONG_DICTIONARY_FILE_NAME);
}
if (globalDictionaryIdLookup.getDoubleBuffer() != null) {
writeInternal(
smoosher,
globalDictionaryIdLookup.getDoubleBuffer(),
ColumnSerializerUtils.DOUBLE_DICTIONARY_FILE_NAME
);
if (globalDictionaryIdLookup.getDoubleBufferMapper() != null) {
copyFromTempSmoosh(smoosher, globalDictionaryIdLookup.getDoubleBufferMapper());
} else {
writeInternal(smoosher, doubleDictionaryWriter, ColumnSerializerUtils.DOUBLE_DICTIONARY_FILE_NAME);
}
if (globalDictionaryIdLookup.getArrayBuffer() != null) {
writeInternal(
smoosher,
globalDictionaryIdLookup.getArrayBuffer(),
ColumnSerializerUtils.ARRAY_DICTIONARY_FILE_NAME
);
if (globalDictionaryIdLookup.getArrayBufferMapper() != null) {
copyFromTempSmoosh(smoosher, globalDictionaryIdLookup.getArrayBufferMapper());
} else {
writeInternal(smoosher, arrayDictionaryWriter, ColumnSerializerUtils.ARRAY_DICTIONARY_FILE_NAME);
}

View File

@ -200,7 +200,7 @@ public class NestedDataColumnSerializerV4 implements GenericColumnSerializer<Str
globalDictionaryIdLookup = closer.register(
new DictionaryIdLookup(
name,
FileUtils.getTempDir(),
FileUtils.getTempDir().toFile(),
dictionaryWriter,
longDictionaryWriter,
doubleDictionaryWriter,

View File

@ -19,7 +19,6 @@
package org.apache.druid.segment.nested;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
@ -35,6 +34,7 @@ import org.apache.druid.segment.serde.ColumnSerializerUtils;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.nio.ByteOrder;
@ -66,7 +66,7 @@ public class ScalarDoubleColumnSerializer extends ScalarNestedCommonFormatColumn
}
@Override
public void openDictionaryWriter() throws IOException
public void openDictionaryWriter(File segmentBaseDir) throws IOException
{
dictionaryWriter = new FixedIndexedWriter<>(
segmentWriteOutMedium,
@ -79,7 +79,7 @@ public class ScalarDoubleColumnSerializer extends ScalarNestedCommonFormatColumn
dictionaryIdLookup = closer.register(
new DictionaryIdLookup(
name,
FileUtils.getTempDir(),
segmentBaseDir,
null,
null,
dictionaryWriter,
@ -136,8 +136,8 @@ public class ScalarDoubleColumnSerializer extends ScalarNestedCommonFormatColumn
@Override
protected void writeDictionaryFile(FileSmoosher smoosher) throws IOException
{
if (dictionaryIdLookup.getDoubleBuffer() != null) {
writeInternal(smoosher, dictionaryIdLookup.getDoubleBuffer(), ColumnSerializerUtils.DOUBLE_DICTIONARY_FILE_NAME);
if (dictionaryIdLookup.getDoubleBufferMapper() != null) {
copyFromTempSmoosh(smoosher, dictionaryIdLookup.getDoubleBufferMapper());
} else {
writeInternal(smoosher, dictionaryWriter, ColumnSerializerUtils.DOUBLE_DICTIONARY_FILE_NAME);
}

View File

@ -19,7 +19,6 @@
package org.apache.druid.segment.nested;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
@ -35,6 +34,7 @@ import org.apache.druid.segment.serde.ColumnSerializerUtils;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.nio.ByteOrder;
@ -67,7 +67,7 @@ public class ScalarLongColumnSerializer extends ScalarNestedCommonFormatColumnSe
}
@Override
public void openDictionaryWriter() throws IOException
public void openDictionaryWriter(File segmentBaseDir) throws IOException
{
dictionaryWriter = new FixedIndexedWriter<>(
segmentWriteOutMedium,
@ -80,7 +80,7 @@ public class ScalarLongColumnSerializer extends ScalarNestedCommonFormatColumnSe
dictionaryIdLookup = closer.register(
new DictionaryIdLookup(
name,
FileUtils.getTempDir(),
segmentBaseDir,
null,
dictionaryWriter,
null,
@ -136,8 +136,8 @@ public class ScalarLongColumnSerializer extends ScalarNestedCommonFormatColumnSe
@Override
protected void writeDictionaryFile(FileSmoosher smoosher) throws IOException
{
if (dictionaryIdLookup.getLongBuffer() != null) {
writeInternal(smoosher, dictionaryIdLookup.getLongBuffer(), ColumnSerializerUtils.LONG_DICTIONARY_FILE_NAME);
if (dictionaryIdLookup.getLongBufferMapper() != null) {
copyFromTempSmoosh(smoosher, dictionaryIdLookup.getLongBufferMapper());
} else {
writeInternal(smoosher, dictionaryWriter, ColumnSerializerUtils.LONG_DICTIONARY_FILE_NAME);
}

View File

@ -20,7 +20,6 @@
package org.apache.druid.segment.nested;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
@ -34,6 +33,7 @@ import org.apache.druid.segment.serde.ColumnSerializerUtils;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
/**
@ -62,7 +62,7 @@ public class ScalarStringColumnSerializer extends ScalarNestedCommonFormatColumn
}
@Override
public void openDictionaryWriter() throws IOException
public void openDictionaryWriter(File segmentBaseDir) throws IOException
{
dictionaryWriter = StringEncodingStrategies.getStringDictionaryWriter(
indexSpec.getStringDictionaryEncoding(),
@ -73,7 +73,7 @@ public class ScalarStringColumnSerializer extends ScalarNestedCommonFormatColumn
dictionaryIdLookup = closer.register(
new DictionaryIdLookup(
name,
FileUtils.getTempDir(),
segmentBaseDir,
dictionaryWriter,
null,
null,

View File

@ -26,13 +26,11 @@ import it.unimi.dsi.fastutil.ints.IntIterator;
import org.apache.druid.collections.bitmap.ImmutableBitmap;
import org.apache.druid.collections.bitmap.MutableBitmap;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.math.expr.ExprEval;
import org.apache.druid.math.expr.ExpressionType;
@ -52,6 +50,7 @@ import org.apache.druid.segment.serde.ColumnSerializerUtils;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
@ -133,7 +132,7 @@ public class VariantColumnSerializer extends NestedCommonFormatColumnSerializer
}
@Override
public void openDictionaryWriter() throws IOException
public void openDictionaryWriter(File segmentBaseDir) throws IOException
{
dictionaryWriter = StringEncodingStrategies.getStringDictionaryWriter(
indexSpec.getStringDictionaryEncoding(),
@ -171,7 +170,7 @@ public class VariantColumnSerializer extends NestedCommonFormatColumnSerializer
dictionaryIdLookup = closer.register(
new DictionaryIdLookup(
name,
FileUtils.getTempDir(),
segmentBaseDir,
dictionaryWriter,
longDictionaryWriter,
doubleDictionaryWriter,
@ -423,29 +422,22 @@ public class VariantColumnSerializer extends NestedCommonFormatColumnSerializer
if (writeDictionary) {
if (dictionaryIdLookup.getStringBufferMapper() != null) {
SmooshedFileMapper fileMapper = dictionaryIdLookup.getStringBufferMapper();
for (String internalName : fileMapper.getInternalFilenames()) {
smoosher.add(internalName, fileMapper.mapFile(internalName));
}
copyFromTempSmoosh(smoosher, dictionaryIdLookup.getStringBufferMapper());
} else {
writeInternal(smoosher, dictionaryWriter, ColumnSerializerUtils.STRING_DICTIONARY_FILE_NAME);
}
if (dictionaryIdLookup.getLongBuffer() != null) {
writeInternal(smoosher, dictionaryIdLookup.getLongBuffer(), ColumnSerializerUtils.LONG_DICTIONARY_FILE_NAME);
if (dictionaryIdLookup.getLongBufferMapper() != null) {
copyFromTempSmoosh(smoosher, dictionaryIdLookup.getLongBufferMapper());
} else {
writeInternal(smoosher, longDictionaryWriter, ColumnSerializerUtils.LONG_DICTIONARY_FILE_NAME);
}
if (dictionaryIdLookup.getDoubleBuffer() != null) {
writeInternal(
smoosher,
dictionaryIdLookup.getDoubleBuffer(),
ColumnSerializerUtils.DOUBLE_DICTIONARY_FILE_NAME
);
if (dictionaryIdLookup.getDoubleBufferMapper() != null) {
copyFromTempSmoosh(smoosher, dictionaryIdLookup.getDoubleBufferMapper());
} else {
writeInternal(smoosher, doubleDictionaryWriter, ColumnSerializerUtils.DOUBLE_DICTIONARY_FILE_NAME);
}
if (dictionaryIdLookup.getArrayBuffer() != null) {
writeInternal(smoosher, dictionaryIdLookup.getArrayBuffer(), ColumnSerializerUtils.ARRAY_DICTIONARY_FILE_NAME);
if (dictionaryIdLookup.getArrayBufferMapper() != null) {
copyFromTempSmoosh(smoosher, dictionaryIdLookup.getArrayBufferMapper());
} else {
writeInternal(smoosher, arrayDictionaryWriter, ColumnSerializerUtils.ARRAY_DICTIONARY_FILE_NAME);
}

View File

@ -25,10 +25,11 @@ import com.fasterxml.jackson.dataformat.smile.SmileGenerator;
import org.apache.druid.guice.BuiltInTypesModule;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
import org.apache.druid.java.util.common.io.smoosh.SmooshedWriter;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
public class ColumnSerializerUtils
{
@ -65,17 +66,31 @@ public class ColumnSerializerUtils
}
}
public static void writeInternal(FileSmoosher smoosher, ByteBuffer buffer, String columnName, String fileName)
throws IOException
{
final String internalName = getInternalFileName(columnName, fileName);
try (SmooshedWriter smooshChannel = smoosher.addWithSmooshedWriter(internalName, buffer.capacity())) {
smooshChannel.write(buffer);
}
}
public static String getInternalFileName(String fileNameBase, String field)
{
return fileNameBase + "." + field;
}
/**
* Writes a {@link Serializer} to a 'smoosh file' containing only the contents of that single serializer, written to
* an internal file specified by the name argument, and returns a {@link SmooshedFileMapper} over the result
*/
public static SmooshedFileMapper mapSerializer(File smooshFile, Serializer writer, String name)
{
try (
final FileSmoosher smoosher = new FileSmoosher(smooshFile);
final SmooshedWriter smooshedWriter = smoosher.addWithSmooshedWriter(
name,
writer.getSerializedSize()
)
) {
writer.writeTo(smooshedWriter, smoosher);
smooshedWriter.close();
smoosher.close();
return SmooshedFileMapper.load(smooshFile);
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
}
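
A minimal usage sketch of the new mapSerializer helper, assuming the signatures shown in the hunk above; segmentBaseDir, serializer, and the file names are placeholders for whatever the caller supplies (within this change the helper is used from DictionaryIdLookup and PersistedIdConversion):

```java
// Hypothetical usage of ColumnSerializerUtils.mapSerializer based on the signatures above;
// 'segmentBaseDir' and 'serializer' are placeholders supplied by the caller.
final File tempDir = new File(segmentBaseDir, "tmp_myColumn_dict");
FileUtils.mkdirp(tempDir);

// writes the serializer into a single-entry smoosh under tempDir and maps it back
final SmooshedFileMapper mapper = ColumnSerializerUtils.mapSerializer(tempDir, serializer, "myColumn.dict");
final ByteBuffer mapped = mapper.mapFile("myColumn.dict");
mapped.order(ByteOrder.nativeOrder());

// ... use 'mapped' beyond the lifetime of the serializer's write-out medium ...

// cleanup, typically registered with the segment-level closer
mapper.close();
FileUtils.deleteDirectory(tempDir);
```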

View File

@ -36,8 +36,11 @@ public interface Serializer
long getSerializedSize() throws IOException;
/**
* Writes serialized form of this object to the given channel. If parallel data streams are needed, they could be
* created with the provided smoosher.
* Writes the serialized form of this object. The entire object may be written to the provided channel, or the object
* may be split over the provided channel and files added to the {@link FileSmoosher}, where additional channels can
* be created via {@link FileSmoosher#addWithSmooshedWriter(String, long)}. The latter approach is useful when the
* serialized form of the object is too large for a single smoosh container. At the time this javadoc was written,
* the max smoosh container size is limited to the max {@link java.nio.ByteBuffer} size.
*/
void writeTo(WritableByteChannel channel, FileSmoosher smoosher) throws IOException;
}
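
Not part of this change, but as an illustration of the split-over-smoosher contract described in the javadoc above, here is a hedged sketch of a Serializer that writes a small header to the primary channel and spills a large payload into its own internal smoosh file. The class, fields, and internal file name are made up; only Serializer's two methods and FileSmoosher#addWithSmooshedWriter are assumed.

```java
// Illustrative only (not from this change): a Serializer that spills a large payload into its own
// internal smoosh file via FileSmoosher#addWithSmooshedWriter, per the javadoc above.
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
import org.apache.druid.java.util.common.io.smoosh.SmooshedWriter;
import org.apache.druid.segment.serde.Serializer;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

class SpillingSerializer implements Serializer
{
  private final ByteBuffer header;          // small part, written directly to the channel
  private final ByteBuffer largePayload;    // large part, written as a separate internal smoosh file
  private final String internalFileName;    // hypothetical internal file name

  SpillingSerializer(ByteBuffer header, ByteBuffer largePayload, String internalFileName)
  {
    this.header = header;
    this.largePayload = largePayload;
    this.internalFileName = internalFileName;
  }

  @Override
  public long getSerializedSize()
  {
    // only the bytes destined for the primary channel count; the payload lives in its own internal file
    return header.remaining();
  }

  @Override
  public void writeTo(WritableByteChannel channel, FileSmoosher smoosher) throws IOException
  {
    channel.write(header.duplicate());
    try (SmooshedWriter writer = smoosher.addWithSmooshedWriter(internalFileName, largePayload.remaining())) {
      writer.write(largePayload.duplicate());
    }
  }
}
```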

View File

@ -30,7 +30,9 @@ import it.unimi.dsi.fastutil.ints.IntIterator;
import org.apache.druid.collections.bitmap.RoaringBitmapFactory;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.ListBasedInputRow;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.impl.AggregateProjectionSpec;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionSchema.MultiValueHandling;
import org.apache.druid.data.input.impl.DimensionsSpec;
@ -39,17 +41,23 @@ import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.frame.testutil.FrameTestUtil;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
import org.apache.druid.query.BitmapResultFactory;
import org.apache.druid.query.DefaultBitmapResultFactory;
import org.apache.druid.query.OrderBy;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnIndexSupplier;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.DictionaryEncodedColumn;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.StringUtf8DictionaryEncodedColumn;
import org.apache.druid.segment.data.BitmapSerdeFactory;
import org.apache.druid.segment.data.BitmapValues;
@ -63,9 +71,11 @@ import org.apache.druid.segment.incremental.IncrementalIndexAdapter;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.index.semantic.StringValueSetIndexes;
import org.apache.druid.segment.index.semantic.ValueIndexes;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.timeline.SegmentId;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Rule;
@ -143,6 +153,7 @@ public class IndexMergerTestBase extends InitializedNullHandlingTest
private final IndexSpec indexSpec;
private final IndexIO indexIO;
private final boolean useBitmapIndexes;
private final BitmapSerdeFactory serdeFactory;
@Rule
public final CloserRule closer = new CloserRule(false);
@ -163,6 +174,7 @@ public class IndexMergerTestBase extends InitializedNullHandlingTest
.withLongEncoding(longEncodingStrategy)
.build();
this.indexIO = TestHelper.getTestIndexIO();
this.serdeFactory = bitmapSerdeFactory;
this.useBitmapIndexes = bitmapSerdeFactory != null;
}
@ -3031,6 +3043,229 @@ public class IndexMergerTestBase extends InitializedNullHandlingTest
validateTestMaxColumnsToMergeOutputSegment(merged7);
}
@Test
public void testMergeProjections() throws IOException
{
File tmp = FileUtils.createTempDir();
closer.closeLater(tmp::delete);
final DateTime timestamp = Granularities.DAY.bucket(DateTimes.nowUtc()).getStart();
final RowSignature rowSignature = RowSignature.builder()
.add("a", ColumnType.STRING)
.add("b", ColumnType.STRING)
.add("c", ColumnType.LONG)
.build();
final List<InputRow> rows1 = Arrays.asList(
new ListBasedInputRow(
rowSignature,
timestamp,
rowSignature.getColumnNames(),
Arrays.asList("a", "x", 1L)
),
new ListBasedInputRow(
rowSignature,
timestamp.plusMinutes(1),
rowSignature.getColumnNames(),
Arrays.asList("b", "y", 2L)
),
new ListBasedInputRow(
rowSignature,
timestamp.plusHours(2),
rowSignature.getColumnNames(),
Arrays.asList("a", "z", 3L)
)
);
final List<InputRow> rows2 = Arrays.asList(
new ListBasedInputRow(
rowSignature,
timestamp,
rowSignature.getColumnNames(),
Arrays.asList("b", "y", 1L)
),
new ListBasedInputRow(
rowSignature,
timestamp.plusMinutes(1),
rowSignature.getColumnNames(),
Arrays.asList("d", "w", 2L)
),
new ListBasedInputRow(
rowSignature,
timestamp.plusHours(2),
rowSignature.getColumnNames(),
Arrays.asList("b", "z", 3L)
)
);
final DimensionsSpec.Builder dimensionsBuilder =
DimensionsSpec.builder()
.setDimensions(
Arrays.asList(
new StringDimensionSchema("a"),
new StringDimensionSchema("b")
)
);
List<AggregateProjectionSpec> projections = Arrays.asList(
new AggregateProjectionSpec(
"a_hourly_c_sum",
VirtualColumns.create(
Granularities.toVirtualColumn(Granularities.HOUR, "__gran")
),
Arrays.asList(
new StringDimensionSchema("a"),
new LongDimensionSchema("__gran")
),
new AggregatorFactory[]{
new LongSumAggregatorFactory("c_sum", "c")
}
),
new AggregateProjectionSpec(
"a_c_sum",
VirtualColumns.EMPTY,
Collections.singletonList(
new StringDimensionSchema("a")
),
new AggregatorFactory[]{
new LongSumAggregatorFactory("c_sum", "c")
}
)
);
IndexBuilder bob = IndexBuilder.create()
.tmpDir(tmp)
.schema(
IncrementalIndexSchema.builder()
.withDimensionsSpec(dimensionsBuilder.build())
.withRollup(false)
.withProjections(projections)
.build()
)
.rows(rows1);
IndexBuilder bob2 = IndexBuilder.create()
.tmpDir(tmp)
.schema(
IncrementalIndexSchema.builder()
.withDimensionsSpec(dimensionsBuilder.build())
.withRollup(false)
.withProjections(projections)
.build()
)
.rows(rows2);
QueryableIndex q1 = bob.buildMMappedIndex();
QueryableIndex q2 = bob2.buildMMappedIndex();
QueryableIndex merged = indexIO.loadIndex(
indexMerger.merge(
Arrays.asList(
new QueryableIndexIndexableAdapter(q1),
new QueryableIndexIndexableAdapter(q2)
),
true,
new AggregatorFactory[0],
temporaryFolder.newFolder(),
dimensionsBuilder.build(),
IndexSpec.DEFAULT,
-1
)
);
CursorBuildSpec p1Spec = CursorBuildSpec.builder()
.setQueryContext(
QueryContext.of(
ImmutableMap.of(QueryContexts.USE_PROJECTION, "a_hourly_c_sum")
)
)
.setVirtualColumns(
VirtualColumns.create(
Granularities.toVirtualColumn(Granularities.HOUR, "gran")
)
)
.setAggregators(
Collections.singletonList(
new LongSumAggregatorFactory("c", "c")
)
)
.setGroupingColumns(Collections.singletonList("a"))
.build();
CursorBuildSpec p2Spec = CursorBuildSpec.builder()
.setQueryContext(
QueryContext.of(
ImmutableMap.of(QueryContexts.USE_PROJECTION, "a_c_sum")
)
)
.setAggregators(
Collections.singletonList(
new LongSumAggregatorFactory("c", "c")
)
)
.setGroupingColumns(Collections.singletonList("a"))
.build();
QueryableIndexCursorFactory cursorFactory = new QueryableIndexCursorFactory(merged);
try (final CursorHolder cursorHolder = cursorFactory.makeCursorHolder(p1Spec)) {
final Cursor cursor = cursorHolder.asCursor();
int rowCount = 0;
while (!cursor.isDone()) {
rowCount++;
cursor.advance();
}
Assert.assertEquals(5, rowCount);
}
try (final CursorHolder cursorHolder = cursorFactory.makeCursorHolder(p2Spec)) {
final Cursor cursor = cursorHolder.asCursor();
int rowCount = 0;
while (!cursor.isDone()) {
rowCount++;
cursor.advance();
}
Assert.assertEquals(3, rowCount);
}
QueryableIndex p1Index = merged.getProjectionQueryableIndex("a_hourly_c_sum");
Assert.assertNotNull(p1Index);
ColumnHolder aHolder = p1Index.getColumnHolder("a");
DictionaryEncodedColumn aCol = (DictionaryEncodedColumn) aHolder.getColumn();
Assert.assertEquals(3, aCol.getCardinality());
QueryableIndex p2Index = merged.getProjectionQueryableIndex("a_c_sum");
Assert.assertNotNull(p2Index);
ColumnHolder aHolder2 = p2Index.getColumnHolder("a");
DictionaryEncodedColumn aCol2 = (DictionaryEncodedColumn) aHolder2.getColumn();
Assert.assertEquals(3, aCol2.getCardinality());
if (serdeFactory != null) {
BitmapResultFactory resultFactory = new DefaultBitmapResultFactory(serdeFactory.getBitmapFactory());
Assert.assertEquals(
2,
resultFactory.toImmutableBitmap(
aHolder.getIndexSupplier()
.as(ValueIndexes.class)
.forValue("a", ColumnType.STRING)
.computeBitmapResult(resultFactory, false)
).size()
);
Assert.assertEquals(
1,
resultFactory.toImmutableBitmap(
aHolder2.getIndexSupplier()
.as(ValueIndexes.class)
.forValue("a", ColumnType.STRING)
.computeBitmapResult(resultFactory, false)
).size()
);
}
}
private QueryableIndex persistAndLoad(List<DimensionSchema> schema, InputRow... rows) throws IOException
{

View File

@ -38,7 +38,6 @@ import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.nio.ByteOrder;
import java.nio.file.Path;
public class DictionaryIdLookupTest extends InitializedNullHandlingTest
{
@ -92,12 +91,12 @@ public class DictionaryIdLookupTest extends InitializedNullHandlingTest
4
);
Path dictTempPath = temp.newFolder().toPath();
File dictTempDir = temp.newFolder();
// make lookup with references to writers
DictionaryIdLookup idLookup = new DictionaryIdLookup(
"test",
dictTempPath,
dictTempDir,
stringWriter,
longWriter,
doubleWriter,
@ -110,7 +109,7 @@ public class DictionaryIdLookupTest extends InitializedNullHandlingTest
doubleWriter.open();
arrayWriter.open();
File tempDir = dictTempPath.toFile();
File tempDir = dictTempDir;
Assert.assertEquals(0, tempDir.listFiles().length);
for (String s : sortedValueDictionary.getSortedStrings()) {

View File

@ -197,7 +197,7 @@ public class NestedDataColumnSupplierTest extends InitializedNullHandlingTest
SortedValueDictionary globalDictionarySortedCollector = mergable.getValueDictionary();
mergable.mergeFieldsInto(sortedFields);
serializer.openDictionaryWriter();
serializer.openDictionaryWriter(tempFolder.newFolder());
serializer.serializeFields(sortedFields);
serializer.serializeDictionaries(
globalDictionarySortedCollector.getSortedStrings(),

View File

@ -150,7 +150,7 @@ public class ScalarDoubleColumnSupplierTest extends InitializedNullHandlingTest
SortedValueDictionary globalDictionarySortedCollector = mergable.getValueDictionary();
mergable.mergeFieldsInto(sortedFields);
serializer.openDictionaryWriter();
serializer.openDictionaryWriter(tempFolder.newFolder());
serializer.serializeDictionaries(
globalDictionarySortedCollector.getSortedStrings(),
globalDictionarySortedCollector.getSortedLongs(),

View File

@ -150,7 +150,7 @@ public class ScalarLongColumnSupplierTest extends InitializedNullHandlingTest
SortedValueDictionary globalDictionarySortedCollector = mergable.getValueDictionary();
mergable.mergeFieldsInto(sortedFields);
serializer.openDictionaryWriter();
serializer.openDictionaryWriter(tempFolder.newFolder());
serializer.serializeDictionaries(
globalDictionarySortedCollector.getSortedStrings(),
globalDictionarySortedCollector.getSortedLongs(),

View File

@ -150,7 +150,7 @@ public class ScalarStringColumnSupplierTest extends InitializedNullHandlingTest
SortedValueDictionary globalDictionarySortedCollector = mergable.getValueDictionary();
mergable.mergeFieldsInto(sortedFields);
serializer.openDictionaryWriter();
serializer.openDictionaryWriter(tempFolder.newFolder());
serializer.serializeDictionaries(
globalDictionarySortedCollector.getSortedStrings(),
globalDictionarySortedCollector.getSortedLongs(),

View File

@ -275,7 +275,7 @@ public class VariantColumnSupplierTest extends InitializedNullHandlingTest
closer
);
serializer.openDictionaryWriter();
serializer.openDictionaryWriter(tempFolder.newFolder());
serializer.serializeDictionaries(
globalDictionarySortedCollector.getSortedStrings(),
globalDictionarySortedCollector.getSortedLongs(),