fix for long columns

This commit is contained in:
fjy 2014-10-23 14:33:25 -07:00
parent 1ebc8a2a72
commit 1b3921faad
3 changed files with 94 additions and 0 deletions

View File

@ -635,6 +635,8 @@ public class IndexMerger
for (String metric : mergedMetrics) { for (String metric : mergedMetrics) {
ValueType type = valueTypes.get(metric); ValueType type = valueTypes.get(metric);
switch (type) { switch (type) {
case LONG:
metWriters.add(new LongMetricColumnSerializer(metric, v8OutDir, ioPeon));
case FLOAT: case FLOAT:
metWriters.add(new FloatMetricColumnSerializer(metric, v8OutDir, ioPeon)); metWriters.add(new FloatMetricColumnSerializer(metric, v8OutDir, ioPeon));
break; break;

View File

@ -0,0 +1,81 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.segment;
import com.google.common.io.Files;
import io.druid.segment.data.CompressedLongsSupplierSerializer;
import io.druid.segment.data.CompressedObjectStrategy;
import io.druid.segment.data.IOPeon;
import java.io.File;
import java.io.IOException;
/**
*/
public class LongMetricColumnSerializer implements MetricColumnSerializer
{
private final String metricName;
private final IOPeon ioPeon;
private final File outDir;
private CompressedLongsSupplierSerializer writer;
public LongMetricColumnSerializer(
String metricName,
File outDir,
IOPeon ioPeon
)
{
this.metricName = metricName;
this.ioPeon = ioPeon;
this.outDir = outDir;
}
@Override
public void open() throws IOException
{
writer = CompressedLongsSupplierSerializer.create(
ioPeon, String.format("%s_little", metricName), IndexIO.BYTE_ORDER,
CompressedObjectStrategy.DEFAULT_COMPRESSION_STRATEGY
);
writer.open();
}
@Override
public void serialize(Object obj) throws IOException
{
long val = (obj == null) ? 0 : ((Number) obj).longValue();
writer.add(val);
}
@Override
public void close() throws IOException
{
final File outFile = IndexIO.makeMetricFile(outDir, metricName, IndexIO.BYTE_ORDER);
outFile.delete();
MetricHolder.writeLongMetric(
Files.newOutputStreamSupplier(outFile, true), metricName, writer
);
IndexIO.checkFileSize(outFile);
writer = null;
}
}

View File

@ -28,6 +28,7 @@ import com.metamx.common.guava.CloseQuietly;
import io.druid.common.utils.SerializerUtils; import io.druid.common.utils.SerializerUtils;
import io.druid.segment.data.CompressedFloatsIndexedSupplier; import io.druid.segment.data.CompressedFloatsIndexedSupplier;
import io.druid.segment.data.CompressedFloatsSupplierSerializer; import io.druid.segment.data.CompressedFloatsSupplierSerializer;
import io.druid.segment.data.CompressedLongsSupplierSerializer;
import io.druid.segment.data.GenericIndexed; import io.druid.segment.data.GenericIndexed;
import io.druid.segment.data.GenericIndexedWriter; import io.druid.segment.data.GenericIndexedWriter;
import io.druid.segment.data.Indexed; import io.druid.segment.data.Indexed;
@ -99,6 +100,16 @@ public class MetricHolder
column.closeAndConsolidate(outSupplier); column.closeAndConsolidate(outSupplier);
} }
public static void writeLongMetric(
OutputSupplier<? extends OutputStream> outSupplier, String name, CompressedLongsSupplierSerializer column
) throws IOException
{
ByteStreams.write(version, outSupplier);
serializerUtils.writeString(outSupplier, name);
serializerUtils.writeString(outSupplier, "long");
column.closeAndConsolidate(outSupplier);
}
public static void writeToChannel(MetricHolder holder, WritableByteChannel out) throws IOException public static void writeToChannel(MetricHolder holder, WritableByteChannel out) throws IOException
{ {
out.write(ByteBuffer.wrap(version)); out.write(ByteBuffer.wrap(version));