From 8b31d8db9f93ec1a2e3441e77c367862c037720c Mon Sep 17 00:00:00 2001 From: Eric Tschetter Date: Wed, 16 Jan 2013 10:01:46 -0600 Subject: [PATCH] 1) Adjust IndexMerger to convert the indexes it creates from the old format to the new. This is done quite sub-optimally, but it will work for now... --- .../com/metamx/druid/index/v1/IndexIO.java | 2 +- .../metamx/druid/index/v1/IndexMerger.java | 44 +++++++++---------- 2 files changed, 23 insertions(+), 23 deletions(-) diff --git a/index-common/src/main/java/com/metamx/druid/index/v1/IndexIO.java b/index-common/src/main/java/com/metamx/druid/index/v1/IndexIO.java index 2afcbed6446..6836b9233c5 100644 --- a/index-common/src/main/java/com/metamx/druid/index/v1/IndexIO.java +++ b/index-common/src/main/java/com/metamx/druid/index/v1/IndexIO.java @@ -332,7 +332,7 @@ public class IndexIO throw new UnsupportedOperationException("Shouldn't ever happen in a cluster that is not owned by MMX."); } - public void convertV8toV9(File v8Dir, File v9Dir) throws IOException + public static void convertV8toV9(File v8Dir, File v9Dir) throws IOException { log.info("Converting v8[%s] to v9[%s]", v8Dir, v9Dir); diff --git a/server/src/main/java/com/metamx/druid/index/v1/IndexMerger.java b/server/src/main/java/com/metamx/druid/index/v1/IndexMerger.java index 10613561daa..6f9892ee805 100644 --- a/server/src/main/java/com/metamx/druid/index/v1/IndexMerger.java +++ b/server/src/main/java/com/metamx/druid/index/v1/IndexMerger.java @@ -384,7 +384,6 @@ public class IndexMerger final Function>, Iterable> rowMergerFn ) throws IOException { - // TODO: make v9 index, complain to Eric when you see this, cause he should be doing it. 
Map metricTypes = Maps.newTreeMap(Ordering.natural().nullsFirst()); for (IndexableAdapter adapter : indexes) { for (String metric : adapter.getAvailableMetrics()) { @@ -392,11 +391,12 @@ public class IndexMerger } } final Interval dataInterval; + File v8OutDir = new File(outDir, "v8-tmp"); /************* Main index.drd file **************/ progress.progress(); long startTime = System.currentTimeMillis(); - File indexFile = new File(outDir, "index.drd"); + File indexFile = new File(v8OutDir, "index.drd"); FileOutputStream fileOutputStream = null; FileChannel channel = null; @@ -426,7 +426,7 @@ public class IndexMerger fileOutputStream = null; } IndexIO.checkFileSize(indexFile); - log.info("outDir[%s] completed index.drd in %,d millis.", outDir, System.currentTimeMillis() - startTime); + log.info("outDir[%s] completed index.drd in %,d millis.", v8OutDir, System.currentTimeMillis() - startTime); /************* Setup Dim Conversions **************/ progress.progress(); @@ -499,7 +499,7 @@ public class IndexMerger } dimensionCardinalities.put(dimension, count); - FileOutputSupplier dimOut = new FileOutputSupplier(IndexIO.makeDimFile(outDir, dimension), true); + FileOutputSupplier dimOut = new FileOutputSupplier(IndexIO.makeDimFile(v8OutDir, dimension), true); dimOuts.add(dimOut); writer.close(); @@ -514,7 +514,7 @@ public class IndexMerger ioPeon.cleanup(); } - log.info("outDir[%s] completed dim conversions in %,d millis.", outDir, System.currentTimeMillis() - startTime); + log.info("outDir[%s] completed dim conversions in %,d millis.", v8OutDir, System.currentTimeMillis() - startTime); /************* Walk through data sets and merge them *************/ progress.progress(); @@ -595,7 +595,7 @@ public class IndexMerger String metric = entry.getKey(); String typeName = entry.getValue(); if ("float".equals(typeName)) { - metWriters.add(new FloatMetricColumnSerializer(metric, outDir, ioPeon)); + metWriters.add(new FloatMetricColumnSerializer(metric, v8OutDir, ioPeon)); } 
else { ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName); @@ -603,7 +603,7 @@ public class IndexMerger throw new ISE("Unknown type[%s]", typeName); } - metWriters.add(new ComplexMetricColumnSerializer(metric, outDir, ioPeon, serde)); + metWriters.add(new ComplexMetricColumnSerializer(metric, v8OutDir, ioPeon, serde)); } } for (MetricColumnSerializer metWriter : metWriters) { @@ -650,7 +650,7 @@ public class IndexMerger if ((++rowCount % 500000) == 0) { log.info( - "outDir[%s] walked 500,000/%,d rows in %,d millis.", outDir, rowCount, System.currentTimeMillis() - time + "outDir[%s] walked 500,000/%,d rows in %,d millis.", v8OutDir, rowCount, System.currentTimeMillis() - time ); time = System.currentTimeMillis(); } @@ -660,13 +660,13 @@ public class IndexMerger rowNumConversion.rewind(); } - final File littleEndianFile = IndexIO.makeTimeFile(outDir, ByteOrder.LITTLE_ENDIAN); + final File littleEndianFile = IndexIO.makeTimeFile(v8OutDir, ByteOrder.LITTLE_ENDIAN); littleEndianFile.delete(); OutputSupplier out = Files.newOutputStreamSupplier(littleEndianFile, true); littleEndianTimeWriter.closeAndConsolidate(out); IndexIO.checkFileSize(littleEndianFile); - final File bigEndianFile = IndexIO.makeTimeFile(outDir, ByteOrder.BIG_ENDIAN); + final File bigEndianFile = IndexIO.makeTimeFile(v8OutDir, ByteOrder.BIG_ENDIAN); bigEndianFile.delete(); out = Files.newOutputStreamSupplier(bigEndianFile, true); bigEndianTimeWriter.closeAndConsolidate(out); @@ -684,7 +684,7 @@ public class IndexMerger ioPeon.cleanup(); log.info( "outDir[%s] completed walk through of %,d rows in %,d millis.", - outDir, + v8OutDir, rowCount, System.currentTimeMillis() - startTime ); @@ -692,7 +692,7 @@ public class IndexMerger /************ Create Inverted Indexes *************/ startTime = System.currentTimeMillis(); - final File invertedFile = new File(outDir, "inverted.drd"); + final File invertedFile = new File(v8OutDir, "inverted.drd"); Files.touch(invertedFile); out = 
Files.newOutputStreamSupplier(invertedFile, true); for (int i = 0; i < mergedDimensions.size(); ++i) { @@ -725,10 +725,7 @@ public class IndexMerger } ConciseSet bitset = new ConciseSet(); - for (Integer row : CombiningIterable.createSplatted( - convertedInverteds, - Ordering.natural().nullsFirst() - )) { + for (Integer row : CombiningIterable.createSplatted(convertedInverteds, Ordering.natural().nullsFirst())) { if (row != INVALID_ROW) { bitset.add(row); } @@ -744,7 +741,7 @@ public class IndexMerger log.info("Completed dimension[%s] in %,d millis.", dimension, System.currentTimeMillis() - dimStartTime); } - log.info("outDir[%s] completed inverted.drd in %,d millis.", outDir, System.currentTimeMillis() - startTime); + log.info("outDir[%s] completed inverted.drd in %,d millis.", v8OutDir, System.currentTimeMillis() - startTime); final ArrayList expectedFiles = Lists.newArrayList( Iterables.concat( @@ -759,18 +756,18 @@ public class IndexMerger Map files = Maps.newLinkedHashMap(); for (String fileName : expectedFiles) { - files.put(fileName, new File(outDir, fileName)); + files.put(fileName, new File(v8OutDir, fileName)); } - File smooshDir = new File(outDir, "smoosher"); + File smooshDir = new File(v8OutDir, "smoosher"); smooshDir.mkdir(); - for (Map.Entry entry : Smoosh.smoosh(outDir, smooshDir, files).entrySet()) { + for (Map.Entry entry : Smoosh.smoosh(v8OutDir, smooshDir, files).entrySet()) { entry.getValue().delete(); } for (File file : smooshDir.listFiles()) { - Files.move(file, new File(outDir, file.getName())); + Files.move(file, new File(v8OutDir, file.getName())); } if (!smooshDir.delete()) { @@ -780,12 +777,15 @@ public class IndexMerger createIndexDrdFile( IndexIO.CURRENT_VERSION_ID, - outDir, + v8OutDir, GenericIndexed.fromIterable(mergedDimensions, GenericIndexed.stringStrategy), GenericIndexed.fromIterable(mergedMetrics, GenericIndexed.stringStrategy), dataInterval ); + IndexIO.DefaultIndexIOHandler.convertV8toV9(v8OutDir, outDir); + 
FileUtils.deleteDirectory(v8OutDir); + return outDir; }