From 1b3921faad1d336039898e8446c756907cb9db76 Mon Sep 17 00:00:00 2001 From: fjy Date: Thu, 23 Oct 2014 14:33:25 -0700 Subject: [PATCH] fix for long columns --- .../java/io/druid/segment/IndexMerger.java | 2 + .../segment/LongMetricColumnSerializer.java | 81 +++++++++++++++++++ .../java/io/druid/segment/MetricHolder.java | 11 +++ 3 files changed, 94 insertions(+) create mode 100644 processing/src/main/java/io/druid/segment/LongMetricColumnSerializer.java diff --git a/processing/src/main/java/io/druid/segment/IndexMerger.java b/processing/src/main/java/io/druid/segment/IndexMerger.java index fbfe01e976d..98828642036 100644 --- a/processing/src/main/java/io/druid/segment/IndexMerger.java +++ b/processing/src/main/java/io/druid/segment/IndexMerger.java @@ -635,6 +635,8 @@ public class IndexMerger for (String metric : mergedMetrics) { ValueType type = valueTypes.get(metric); switch (type) { + case LONG: + metWriters.add(new LongMetricColumnSerializer(metric, v8OutDir, ioPeon)); case FLOAT: metWriters.add(new FloatMetricColumnSerializer(metric, v8OutDir, ioPeon)); break; diff --git a/processing/src/main/java/io/druid/segment/LongMetricColumnSerializer.java b/processing/src/main/java/io/druid/segment/LongMetricColumnSerializer.java new file mode 100644 index 00000000000..bcb1a39d007 --- /dev/null +++ b/processing/src/main/java/io/druid/segment/LongMetricColumnSerializer.java @@ -0,0 +1,81 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.segment; + +import com.google.common.io.Files; +import io.druid.segment.data.CompressedLongsSupplierSerializer; +import io.druid.segment.data.CompressedObjectStrategy; +import io.druid.segment.data.IOPeon; + +import java.io.File; +import java.io.IOException; + +/** + */ +public class LongMetricColumnSerializer implements MetricColumnSerializer +{ + private final String metricName; + private final IOPeon ioPeon; + private final File outDir; + + private CompressedLongsSupplierSerializer writer; + + public LongMetricColumnSerializer( + String metricName, + File outDir, + IOPeon ioPeon + ) + { + this.metricName = metricName; + this.ioPeon = ioPeon; + this.outDir = outDir; + } + + @Override + public void open() throws IOException + { + writer = CompressedLongsSupplierSerializer.create( + ioPeon, String.format("%s_little", metricName), IndexIO.BYTE_ORDER, + CompressedObjectStrategy.DEFAULT_COMPRESSION_STRATEGY + ); + + writer.open(); + } + + @Override + public void serialize(Object obj) throws IOException + { + long val = (obj == null) ? 0 : ((Number) obj).longValue(); + writer.add(val); + } + + @Override + public void close() throws IOException + { + final File outFile = IndexIO.makeMetricFile(outDir, metricName, IndexIO.BYTE_ORDER); + outFile.delete(); + MetricHolder.writeLongMetric( + Files.newOutputStreamSupplier(outFile, true), metricName, writer + ); + IndexIO.checkFileSize(outFile); + + writer = null; + } +} diff --git a/processing/src/main/java/io/druid/segment/MetricHolder.java b/processing/src/main/java/io/druid/segment/MetricHolder.java index 2627444e758..c0608ffe151 100644 --- a/processing/src/main/java/io/druid/segment/MetricHolder.java +++ b/processing/src/main/java/io/druid/segment/MetricHolder.java @@ -28,6 +28,7 @@ import com.metamx.common.guava.CloseQuietly; import io.druid.common.utils.SerializerUtils; import io.druid.segment.data.CompressedFloatsIndexedSupplier; import io.druid.segment.data.CompressedFloatsSupplierSerializer; +import io.druid.segment.data.CompressedLongsSupplierSerializer; import io.druid.segment.data.GenericIndexed; import io.druid.segment.data.GenericIndexedWriter; import io.druid.segment.data.Indexed; @@ -99,6 +100,16 @@ public class MetricHolder column.closeAndConsolidate(outSupplier); } + public static void writeLongMetric( + OutputSupplier outSupplier, String name, CompressedLongsSupplierSerializer column + ) throws IOException + { + ByteStreams.write(version, outSupplier); + serializerUtils.writeString(outSupplier, name); + serializerUtils.writeString(outSupplier, "long"); + column.closeAndConsolidate(outSupplier); + } + public static void writeToChannel(MetricHolder holder, WritableByteChannel out) throws IOException { out.write(ByteBuffer.wrap(version));