mirror of https://github.com/apache/druid.git
1) Adjust IndexMerger to convert the indexes it creates from the old format to the new. This is done quite sub-optimally, but it will work for now...
This commit is contained in:
parent
998f0bf3c8
commit
8b31d8db9f
|
@ -332,7 +332,7 @@ public class IndexIO
|
||||||
throw new UnsupportedOperationException("Shouldn't ever happen in a cluster that is not owned by MMX.");
|
throw new UnsupportedOperationException("Shouldn't ever happen in a cluster that is not owned by MMX.");
|
||||||
}
|
}
|
||||||
|
|
||||||
public void convertV8toV9(File v8Dir, File v9Dir) throws IOException
|
public static void convertV8toV9(File v8Dir, File v9Dir) throws IOException
|
||||||
{
|
{
|
||||||
log.info("Converting v8[%s] to v9[%s]", v8Dir, v9Dir);
|
log.info("Converting v8[%s] to v9[%s]", v8Dir, v9Dir);
|
||||||
|
|
||||||
|
|
|
@ -384,7 +384,6 @@ public class IndexMerger
|
||||||
final Function<ArrayList<Iterable<Rowboat>>, Iterable<Rowboat>> rowMergerFn
|
final Function<ArrayList<Iterable<Rowboat>>, Iterable<Rowboat>> rowMergerFn
|
||||||
) throws IOException
|
) throws IOException
|
||||||
{
|
{
|
||||||
// TODO: make v9 index, complain to Eric when you see this, cause he should be doing it.
|
|
||||||
Map<String, String> metricTypes = Maps.newTreeMap(Ordering.<String>natural().nullsFirst());
|
Map<String, String> metricTypes = Maps.newTreeMap(Ordering.<String>natural().nullsFirst());
|
||||||
for (IndexableAdapter adapter : indexes) {
|
for (IndexableAdapter adapter : indexes) {
|
||||||
for (String metric : adapter.getAvailableMetrics()) {
|
for (String metric : adapter.getAvailableMetrics()) {
|
||||||
|
@ -392,11 +391,12 @@ public class IndexMerger
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
final Interval dataInterval;
|
final Interval dataInterval;
|
||||||
|
File v8OutDir = new File(outDir, "v8-tmp");
|
||||||
|
|
||||||
/************* Main index.drd file **************/
|
/************* Main index.drd file **************/
|
||||||
progress.progress();
|
progress.progress();
|
||||||
long startTime = System.currentTimeMillis();
|
long startTime = System.currentTimeMillis();
|
||||||
File indexFile = new File(outDir, "index.drd");
|
File indexFile = new File(v8OutDir, "index.drd");
|
||||||
|
|
||||||
FileOutputStream fileOutputStream = null;
|
FileOutputStream fileOutputStream = null;
|
||||||
FileChannel channel = null;
|
FileChannel channel = null;
|
||||||
|
@ -426,7 +426,7 @@ public class IndexMerger
|
||||||
fileOutputStream = null;
|
fileOutputStream = null;
|
||||||
}
|
}
|
||||||
IndexIO.checkFileSize(indexFile);
|
IndexIO.checkFileSize(indexFile);
|
||||||
log.info("outDir[%s] completed index.drd in %,d millis.", outDir, System.currentTimeMillis() - startTime);
|
log.info("outDir[%s] completed index.drd in %,d millis.", v8OutDir, System.currentTimeMillis() - startTime);
|
||||||
|
|
||||||
/************* Setup Dim Conversions **************/
|
/************* Setup Dim Conversions **************/
|
||||||
progress.progress();
|
progress.progress();
|
||||||
|
@ -499,7 +499,7 @@ public class IndexMerger
|
||||||
}
|
}
|
||||||
dimensionCardinalities.put(dimension, count);
|
dimensionCardinalities.put(dimension, count);
|
||||||
|
|
||||||
FileOutputSupplier dimOut = new FileOutputSupplier(IndexIO.makeDimFile(outDir, dimension), true);
|
FileOutputSupplier dimOut = new FileOutputSupplier(IndexIO.makeDimFile(v8OutDir, dimension), true);
|
||||||
dimOuts.add(dimOut);
|
dimOuts.add(dimOut);
|
||||||
|
|
||||||
writer.close();
|
writer.close();
|
||||||
|
@ -514,7 +514,7 @@ public class IndexMerger
|
||||||
|
|
||||||
ioPeon.cleanup();
|
ioPeon.cleanup();
|
||||||
}
|
}
|
||||||
log.info("outDir[%s] completed dim conversions in %,d millis.", outDir, System.currentTimeMillis() - startTime);
|
log.info("outDir[%s] completed dim conversions in %,d millis.", v8OutDir, System.currentTimeMillis() - startTime);
|
||||||
|
|
||||||
/************* Walk through data sets and merge them *************/
|
/************* Walk through data sets and merge them *************/
|
||||||
progress.progress();
|
progress.progress();
|
||||||
|
@ -595,7 +595,7 @@ public class IndexMerger
|
||||||
String metric = entry.getKey();
|
String metric = entry.getKey();
|
||||||
String typeName = entry.getValue();
|
String typeName = entry.getValue();
|
||||||
if ("float".equals(typeName)) {
|
if ("float".equals(typeName)) {
|
||||||
metWriters.add(new FloatMetricColumnSerializer(metric, outDir, ioPeon));
|
metWriters.add(new FloatMetricColumnSerializer(metric, v8OutDir, ioPeon));
|
||||||
} else {
|
} else {
|
||||||
ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName);
|
ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName);
|
||||||
|
|
||||||
|
@ -603,7 +603,7 @@ public class IndexMerger
|
||||||
throw new ISE("Unknown type[%s]", typeName);
|
throw new ISE("Unknown type[%s]", typeName);
|
||||||
}
|
}
|
||||||
|
|
||||||
metWriters.add(new ComplexMetricColumnSerializer(metric, outDir, ioPeon, serde));
|
metWriters.add(new ComplexMetricColumnSerializer(metric, v8OutDir, ioPeon, serde));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for (MetricColumnSerializer metWriter : metWriters) {
|
for (MetricColumnSerializer metWriter : metWriters) {
|
||||||
|
@ -650,7 +650,7 @@ public class IndexMerger
|
||||||
|
|
||||||
if ((++rowCount % 500000) == 0) {
|
if ((++rowCount % 500000) == 0) {
|
||||||
log.info(
|
log.info(
|
||||||
"outDir[%s] walked 500,000/%,d rows in %,d millis.", outDir, rowCount, System.currentTimeMillis() - time
|
"outDir[%s] walked 500,000/%,d rows in %,d millis.", v8OutDir, rowCount, System.currentTimeMillis() - time
|
||||||
);
|
);
|
||||||
time = System.currentTimeMillis();
|
time = System.currentTimeMillis();
|
||||||
}
|
}
|
||||||
|
@ -660,13 +660,13 @@ public class IndexMerger
|
||||||
rowNumConversion.rewind();
|
rowNumConversion.rewind();
|
||||||
}
|
}
|
||||||
|
|
||||||
final File littleEndianFile = IndexIO.makeTimeFile(outDir, ByteOrder.LITTLE_ENDIAN);
|
final File littleEndianFile = IndexIO.makeTimeFile(v8OutDir, ByteOrder.LITTLE_ENDIAN);
|
||||||
littleEndianFile.delete();
|
littleEndianFile.delete();
|
||||||
OutputSupplier<FileOutputStream> out = Files.newOutputStreamSupplier(littleEndianFile, true);
|
OutputSupplier<FileOutputStream> out = Files.newOutputStreamSupplier(littleEndianFile, true);
|
||||||
littleEndianTimeWriter.closeAndConsolidate(out);
|
littleEndianTimeWriter.closeAndConsolidate(out);
|
||||||
IndexIO.checkFileSize(littleEndianFile);
|
IndexIO.checkFileSize(littleEndianFile);
|
||||||
|
|
||||||
final File bigEndianFile = IndexIO.makeTimeFile(outDir, ByteOrder.BIG_ENDIAN);
|
final File bigEndianFile = IndexIO.makeTimeFile(v8OutDir, ByteOrder.BIG_ENDIAN);
|
||||||
bigEndianFile.delete();
|
bigEndianFile.delete();
|
||||||
out = Files.newOutputStreamSupplier(bigEndianFile, true);
|
out = Files.newOutputStreamSupplier(bigEndianFile, true);
|
||||||
bigEndianTimeWriter.closeAndConsolidate(out);
|
bigEndianTimeWriter.closeAndConsolidate(out);
|
||||||
|
@ -684,7 +684,7 @@ public class IndexMerger
|
||||||
ioPeon.cleanup();
|
ioPeon.cleanup();
|
||||||
log.info(
|
log.info(
|
||||||
"outDir[%s] completed walk through of %,d rows in %,d millis.",
|
"outDir[%s] completed walk through of %,d rows in %,d millis.",
|
||||||
outDir,
|
v8OutDir,
|
||||||
rowCount,
|
rowCount,
|
||||||
System.currentTimeMillis() - startTime
|
System.currentTimeMillis() - startTime
|
||||||
);
|
);
|
||||||
|
@ -692,7 +692,7 @@ public class IndexMerger
|
||||||
/************ Create Inverted Indexes *************/
|
/************ Create Inverted Indexes *************/
|
||||||
startTime = System.currentTimeMillis();
|
startTime = System.currentTimeMillis();
|
||||||
|
|
||||||
final File invertedFile = new File(outDir, "inverted.drd");
|
final File invertedFile = new File(v8OutDir, "inverted.drd");
|
||||||
Files.touch(invertedFile);
|
Files.touch(invertedFile);
|
||||||
out = Files.newOutputStreamSupplier(invertedFile, true);
|
out = Files.newOutputStreamSupplier(invertedFile, true);
|
||||||
for (int i = 0; i < mergedDimensions.size(); ++i) {
|
for (int i = 0; i < mergedDimensions.size(); ++i) {
|
||||||
|
@ -725,10 +725,7 @@ public class IndexMerger
|
||||||
}
|
}
|
||||||
|
|
||||||
ConciseSet bitset = new ConciseSet();
|
ConciseSet bitset = new ConciseSet();
|
||||||
for (Integer row : CombiningIterable.createSplatted(
|
for (Integer row : CombiningIterable.createSplatted(convertedInverteds, Ordering.<Integer>natural().nullsFirst())) {
|
||||||
convertedInverteds,
|
|
||||||
Ordering.<Integer>natural().nullsFirst()
|
|
||||||
)) {
|
|
||||||
if (row != INVALID_ROW) {
|
if (row != INVALID_ROW) {
|
||||||
bitset.add(row);
|
bitset.add(row);
|
||||||
}
|
}
|
||||||
|
@ -744,7 +741,7 @@ public class IndexMerger
|
||||||
|
|
||||||
log.info("Completed dimension[%s] in %,d millis.", dimension, System.currentTimeMillis() - dimStartTime);
|
log.info("Completed dimension[%s] in %,d millis.", dimension, System.currentTimeMillis() - dimStartTime);
|
||||||
}
|
}
|
||||||
log.info("outDir[%s] completed inverted.drd in %,d millis.", outDir, System.currentTimeMillis() - startTime);
|
log.info("outDir[%s] completed inverted.drd in %,d millis.", v8OutDir, System.currentTimeMillis() - startTime);
|
||||||
|
|
||||||
final ArrayList<String> expectedFiles = Lists.newArrayList(
|
final ArrayList<String> expectedFiles = Lists.newArrayList(
|
||||||
Iterables.concat(
|
Iterables.concat(
|
||||||
|
@ -759,18 +756,18 @@ public class IndexMerger
|
||||||
|
|
||||||
Map<String, File> files = Maps.newLinkedHashMap();
|
Map<String, File> files = Maps.newLinkedHashMap();
|
||||||
for (String fileName : expectedFiles) {
|
for (String fileName : expectedFiles) {
|
||||||
files.put(fileName, new File(outDir, fileName));
|
files.put(fileName, new File(v8OutDir, fileName));
|
||||||
}
|
}
|
||||||
|
|
||||||
File smooshDir = new File(outDir, "smoosher");
|
File smooshDir = new File(v8OutDir, "smoosher");
|
||||||
smooshDir.mkdir();
|
smooshDir.mkdir();
|
||||||
|
|
||||||
for (Map.Entry<String, File> entry : Smoosh.smoosh(outDir, smooshDir, files).entrySet()) {
|
for (Map.Entry<String, File> entry : Smoosh.smoosh(v8OutDir, smooshDir, files).entrySet()) {
|
||||||
entry.getValue().delete();
|
entry.getValue().delete();
|
||||||
}
|
}
|
||||||
|
|
||||||
for (File file : smooshDir.listFiles()) {
|
for (File file : smooshDir.listFiles()) {
|
||||||
Files.move(file, new File(outDir, file.getName()));
|
Files.move(file, new File(v8OutDir, file.getName()));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!smooshDir.delete()) {
|
if (!smooshDir.delete()) {
|
||||||
|
@ -780,12 +777,15 @@ public class IndexMerger
|
||||||
|
|
||||||
createIndexDrdFile(
|
createIndexDrdFile(
|
||||||
IndexIO.CURRENT_VERSION_ID,
|
IndexIO.CURRENT_VERSION_ID,
|
||||||
outDir,
|
v8OutDir,
|
||||||
GenericIndexed.fromIterable(mergedDimensions, GenericIndexed.stringStrategy),
|
GenericIndexed.fromIterable(mergedDimensions, GenericIndexed.stringStrategy),
|
||||||
GenericIndexed.fromIterable(mergedMetrics, GenericIndexed.stringStrategy),
|
GenericIndexed.fromIterable(mergedMetrics, GenericIndexed.stringStrategy),
|
||||||
dataInterval
|
dataInterval
|
||||||
);
|
);
|
||||||
|
|
||||||
|
IndexIO.DefaultIndexIOHandler.convertV8toV9(v8OutDir, outDir);
|
||||||
|
FileUtils.deleteDirectory(v8OutDir);
|
||||||
|
|
||||||
return outDir;
|
return outDir;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue