Merge pull request #2675 from binlijin/clean_temp_file

Clean up temporary files when an index merge fails
This commit is contained in:
Fangjin Yang 2016-03-23 09:09:07 -07:00
commit bbf08fcc24
3 changed files with 515 additions and 444 deletions

View File

@ -34,6 +34,7 @@ import com.google.common.collect.Ordering;
import com.google.common.collect.PeekingIterator;
import com.google.common.collect.Sets;
import com.google.common.io.ByteStreams;
import com.google.common.io.Closer;
import com.google.common.io.Files;
import com.google.common.io.OutputSupplier;
import com.google.common.primitives.Ints;
@ -85,6 +86,7 @@ import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
@ -604,79 +606,95 @@ public class IndexMerger
}
}
Closer closer = Closer.create();
final Interval dataInterval;
File v8OutDir = new File(outDir, "v8-tmp");
final File v8OutDir = new File(outDir, "v8-tmp");
v8OutDir.mkdirs();
/************* Main index.drd file **************/
progress.progress();
long startTime = System.currentTimeMillis();
File indexFile = new File(v8OutDir, "index.drd");
try (FileOutputStream fileOutputStream = new FileOutputStream(indexFile);
FileChannel channel = fileOutputStream.getChannel()) {
channel.write(ByteBuffer.wrap(new byte[]{IndexIO.V8_VERSION}));
GenericIndexed.fromIterable(mergedDimensions, GenericIndexed.STRING_STRATEGY).writeToChannel(channel);
GenericIndexed.fromIterable(mergedMetrics, GenericIndexed.STRING_STRATEGY).writeToChannel(channel);
DateTime minTime = new DateTime(JodaUtils.MAX_INSTANT);
DateTime maxTime = new DateTime(JodaUtils.MIN_INSTANT);
for (IndexableAdapter index : indexes) {
minTime = JodaUtils.minDateTime(minTime, index.getDataInterval().getStart());
maxTime = JodaUtils.maxDateTime(maxTime, index.getDataInterval().getEnd());
// Register removal of the temporary "v8-tmp" working directory with the closer, so the
// directory is deleted when closer.close() runs in this method's finally block, whether
// the merge completed or threw part-way through.
closer.register(new Closeable()
{
@Override
public void close() throws IOException
{
// Deletes v8OutDir itself along with whatever intermediate files were written into it.
FileUtils.deleteDirectory(v8OutDir);
}
});
final IOPeon ioPeon = new TmpFileIOPeon();
// Register cleanup of the shared IOPeon with the closer. The method also calls
// ioPeon.cleanup() explicitly between phases; this registration covers the case where an
// exception is thrown before one of those explicit calls is reached.
// NOTE(review): this relies on ioPeon.cleanup() being safe to call more than once -- confirm
// against TmpFileIOPeon.
closer.register(new Closeable()
{
@Override
public void close() throws IOException
{
ioPeon.cleanup();
}
});
try {
/************* Main index.drd file **************/
progress.progress();
long startTime = System.currentTimeMillis();
File indexFile = new File(v8OutDir, "index.drd");
dataInterval = new Interval(minTime, maxTime);
serializerUtils.writeString(channel, String.format("%s/%s", minTime, maxTime));
serializerUtils.writeString(channel, mapper.writeValueAsString(indexSpec.getBitmapSerdeFactory()));
}
IndexIO.checkFileSize(indexFile);
log.info("outDir[%s] completed index.drd in %,d millis.", v8OutDir, System.currentTimeMillis() - startTime);
try (FileOutputStream fileOutputStream = new FileOutputStream(indexFile);
FileChannel channel = fileOutputStream.getChannel()) {
channel.write(ByteBuffer.wrap(new byte[]{IndexIO.V8_VERSION}));
/************* Setup Dim Conversions **************/
progress.progress();
startTime = System.currentTimeMillis();
GenericIndexed.fromIterable(mergedDimensions, GenericIndexed.STRING_STRATEGY).writeToChannel(channel);
GenericIndexed.fromIterable(mergedMetrics, GenericIndexed.STRING_STRATEGY).writeToChannel(channel);
IOPeon ioPeon = new TmpFileIOPeon();
ArrayList<FileOutputSupplier> dimOuts = Lists.newArrayListWithCapacity(mergedDimensions.size());
Map<String, Integer> dimensionCardinalities = Maps.newHashMap();
ArrayList<Map<String, IntBuffer>> dimConversions = Lists.newArrayListWithCapacity(indexes.size());
final ArrayList<Boolean> convertMissingDimsFlags = Lists.newArrayListWithCapacity(mergedDimensions.size());
DateTime minTime = new DateTime(JodaUtils.MAX_INSTANT);
DateTime maxTime = new DateTime(JodaUtils.MIN_INSTANT);
for (int i = 0; i < indexes.size(); ++i) {
dimConversions.add(Maps.<String, IntBuffer>newHashMap());
}
for (String dimension : mergedDimensions) {
final GenericIndexedWriter<String> writer = new GenericIndexedWriter<String>(
ioPeon, dimension, GenericIndexed.STRING_STRATEGY
);
writer.open();
boolean dimHasNull = false;
boolean dimHasValues = false;
boolean dimAbsentFromSomeIndex = false;
int numMergeIndex = 0;
Indexed<String> dimValueLookup = null;
Indexed<String>[] dimValueLookups = new Indexed[indexes.size() + 1];
for (int i = 0; i < indexes.size(); i++) {
Indexed<String> dimValues = indexes.get(i).getDimValueLookup(dimension);
if (!isNullColumn(dimValues)) {
dimHasValues = true;
dimHasNull |= dimValues.indexOf(null) >= 0;
dimValueLookups[i] = dimValueLookup = dimValues;
numMergeIndex++;
} else {
dimAbsentFromSomeIndex = true;
for (IndexableAdapter index : indexes) {
minTime = JodaUtils.minDateTime(minTime, index.getDataInterval().getStart());
maxTime = JodaUtils.maxDateTime(maxTime, index.getDataInterval().getEnd());
}
dataInterval = new Interval(minTime, maxTime);
serializerUtils.writeString(channel, String.format("%s/%s", minTime, maxTime));
serializerUtils.writeString(channel, mapper.writeValueAsString(indexSpec.getBitmapSerdeFactory()));
}
IndexIO.checkFileSize(indexFile);
log.info("outDir[%s] completed index.drd in %,d millis.", v8OutDir, System.currentTimeMillis() - startTime);
/************* Setup Dim Conversions **************/
progress.progress();
startTime = System.currentTimeMillis();
ArrayList<FileOutputSupplier> dimOuts = Lists.newArrayListWithCapacity(mergedDimensions.size());
Map<String, Integer> dimensionCardinalities = Maps.newHashMap();
ArrayList<Map<String, IntBuffer>> dimConversions = Lists.newArrayListWithCapacity(indexes.size());
final ArrayList<Boolean> convertMissingDimsFlags = Lists.newArrayListWithCapacity(mergedDimensions.size());
for (int i = 0; i < indexes.size(); ++i) {
dimConversions.add(Maps.<String, IntBuffer>newHashMap());
}
boolean convertMissingDims = dimHasValues && dimAbsentFromSomeIndex;
convertMissingDimsFlags.add(convertMissingDims);
for (String dimension : mergedDimensions) {
final GenericIndexedWriter<String> writer = new GenericIndexedWriter<String>(
ioPeon, dimension, GenericIndexed.STRING_STRATEGY
);
writer.open();
boolean dimHasNull = false;
boolean dimHasValues = false;
boolean dimAbsentFromSomeIndex = false;
int numMergeIndex = 0;
Indexed<String> dimValueLookup = null;
Indexed<String>[] dimValueLookups = new Indexed[indexes.size() + 1];
for (int i = 0; i < indexes.size(); i++) {
Indexed<String> dimValues = indexes.get(i).getDimValueLookup(dimension);
if (!isNullColumn(dimValues)) {
dimHasValues = true;
dimHasNull |= dimValues.indexOf(null) >= 0;
dimValueLookups[i] = dimValueLookup = dimValues;
numMergeIndex++;
} else {
dimAbsentFromSomeIndex = true;
}
}
boolean convertMissingDims = dimHasValues && dimAbsentFromSomeIndex;
convertMissingDimsFlags.add(convertMissingDims);
/*
* Ensure the empty str is always in the dictionary if the dimension was missing from one index but
@ -685,331 +703,333 @@ public class IndexMerger
* later on, to allow rows from indexes without a particular dimension to merge correctly with
* rows from indexes with null/empty str values for that dimension.
*/
if (convertMissingDims && !dimHasNull) {
dimValueLookups[indexes.size()] = dimValueLookup = EMPTY_STR_DIM_VAL;
numMergeIndex++;
}
int cardinality = 0;
if (numMergeIndex > 1) {
DictionaryMergeIterator iterator = new DictionaryMergeIterator(dimValueLookups, true);
while (iterator.hasNext()) {
writer.write(iterator.next());
if (convertMissingDims && !dimHasNull) {
dimValueLookups[indexes.size()] = dimValueLookup = EMPTY_STR_DIM_VAL;
numMergeIndex++;
}
for (int i = 0; i < indexes.size(); i++) {
if (dimValueLookups[i] != null && iterator.needConversion(i)) {
dimConversions.get(i).put(dimension, iterator.conversions[i]);
int cardinality = 0;
if (numMergeIndex > 1) {
DictionaryMergeIterator iterator = new DictionaryMergeIterator(dimValueLookups, true);
while (iterator.hasNext()) {
writer.write(iterator.next());
}
for (int i = 0; i < indexes.size(); i++) {
if (dimValueLookups[i] != null && iterator.needConversion(i)) {
dimConversions.get(i).put(dimension, iterator.conversions[i]);
}
}
cardinality = iterator.counter;
} else if (numMergeIndex == 1) {
for (String value : dimValueLookup) {
writer.write(value);
}
cardinality = dimValueLookup.size();
}
dimensionCardinalities.put(dimension, cardinality);
FileOutputSupplier dimOut = new FileOutputSupplier(IndexIO.makeDimFile(v8OutDir, dimension), true);
dimOuts.add(dimOut);
writer.close();
serializerUtils.writeString(dimOut, dimension);
ByteStreams.copy(writer.combineStreams(), dimOut);
ioPeon.cleanup();
}
log.info("outDir[%s] completed dim conversions in %,d millis.", v8OutDir, System.currentTimeMillis() - startTime);
/************* Walk through data sets and merge them *************/
progress.progress();
startTime = System.currentTimeMillis();
Iterable<Rowboat> theRows = makeRowIterable(
indexes,
mergedDimensions,
mergedMetrics,
dimConversions,
convertMissingDimsFlags,
rowMergerFn
);
CompressedLongsSupplierSerializer timeWriter = CompressedLongsSupplierSerializer.create(
ioPeon, "little_end_time", IndexIO.BYTE_ORDER, CompressedObjectStrategy.DEFAULT_COMPRESSION_STRATEGY
);
timeWriter.open();
ArrayList<VSizeIndexedWriter> forwardDimWriters = Lists.newArrayListWithCapacity(mergedDimensions.size());
for (String dimension : mergedDimensions) {
VSizeIndexedWriter writer = new VSizeIndexedWriter(ioPeon, dimension, dimensionCardinalities.get(dimension));
writer.open();
forwardDimWriters.add(writer);
}
ArrayList<MetricColumnSerializer> metWriters = Lists.newArrayListWithCapacity(mergedMetrics.size());
for (String metric : mergedMetrics) {
ValueType type = valueTypes.get(metric);
switch (type) {
case LONG:
metWriters.add(new LongMetricColumnSerializer(metric, v8OutDir, ioPeon));
break;
case FLOAT:
metWriters.add(new FloatMetricColumnSerializer(metric, v8OutDir, ioPeon));
break;
case COMPLEX:
final String typeName = metricTypeNames.get(metric);
ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName);
if (serde == null) {
throw new ISE("Unknown type[%s]", typeName);
}
metWriters.add(new ComplexMetricColumnSerializer(metric, v8OutDir, ioPeon, serde));
break;
default:
throw new ISE("Unknown type[%s]", type);
}
}
for (MetricColumnSerializer metWriter : metWriters) {
metWriter.open();
}
int rowCount = 0;
long time = System.currentTimeMillis();
List<IntBuffer> rowNumConversions = Lists.newArrayListWithCapacity(indexes.size());
for (IndexableAdapter index : indexes) {
int[] arr = new int[index.getNumRows()];
Arrays.fill(arr, INVALID_ROW);
rowNumConversions.add(IntBuffer.wrap(arr));
}
for (Rowboat theRow : theRows) {
progress.progress();
timeWriter.add(theRow.getTimestamp());
final Object[] metrics = theRow.getMetrics();
for (int i = 0; i < metrics.length; ++i) {
metWriters.get(i).serialize(metrics[i]);
}
int[][] dims = theRow.getDims();
for (int i = 0; i < dims.length; ++i) {
List<Integer> listToWrite = (i >= dims.length || dims[i] == null)
? null
: Ints.asList(dims[i]);
forwardDimWriters.get(i).write(listToWrite);
}
for (Map.Entry<Integer, TreeSet<Integer>> comprisedRow : theRow.getComprisedRows().entrySet()) {
final IntBuffer conversionBuffer = rowNumConversions.get(comprisedRow.getKey());
for (Integer rowNum : comprisedRow.getValue()) {
while (conversionBuffer.position() < rowNum) {
conversionBuffer.put(INVALID_ROW);
}
conversionBuffer.put(rowCount);
}
}
cardinality = iterator.counter;
} else if (numMergeIndex == 1) {
for (String value : dimValueLookup) {
writer.write(value);
if ((++rowCount % 500000) == 0) {
log.info(
"outDir[%s] walked 500,000/%,d rows in %,d millis.", v8OutDir, rowCount, System.currentTimeMillis() - time
);
time = System.currentTimeMillis();
}
cardinality = dimValueLookup.size();
}
dimensionCardinalities.put(dimension, cardinality);
for (IntBuffer rowNumConversion : rowNumConversions) {
rowNumConversion.rewind();
}
FileOutputSupplier dimOut = new FileOutputSupplier(IndexIO.makeDimFile(v8OutDir, dimension), true);
dimOuts.add(dimOut);
final File timeFile = IndexIO.makeTimeFile(v8OutDir, IndexIO.BYTE_ORDER);
timeFile.delete();
OutputSupplier<FileOutputStream> out = Files.newOutputStreamSupplier(timeFile, true);
timeWriter.closeAndConsolidate(out);
IndexIO.checkFileSize(timeFile);
writer.close();
serializerUtils.writeString(dimOut, dimension);
ByteStreams.copy(writer.combineStreams(), dimOut);
for (int i = 0; i < mergedDimensions.size(); ++i) {
forwardDimWriters.get(i).close();
ByteStreams.copy(forwardDimWriters.get(i).combineStreams(), dimOuts.get(i));
}
for (MetricColumnSerializer metWriter : metWriters) {
metWriter.close();
}
ioPeon.cleanup();
}
log.info("outDir[%s] completed dim conversions in %,d millis.", v8OutDir, System.currentTimeMillis() - startTime);
/************* Walk through data sets and merge them *************/
progress.progress();
startTime = System.currentTimeMillis();
Iterable<Rowboat> theRows = makeRowIterable(
indexes,
mergedDimensions,
mergedMetrics,
dimConversions,
convertMissingDimsFlags,
rowMergerFn
);
CompressedLongsSupplierSerializer timeWriter = CompressedLongsSupplierSerializer.create(
ioPeon, "little_end_time", IndexIO.BYTE_ORDER, CompressedObjectStrategy.DEFAULT_COMPRESSION_STRATEGY
);
timeWriter.open();
ArrayList<VSizeIndexedWriter> forwardDimWriters = Lists.newArrayListWithCapacity(mergedDimensions.size());
for (String dimension : mergedDimensions) {
VSizeIndexedWriter writer = new VSizeIndexedWriter(ioPeon, dimension, dimensionCardinalities.get(dimension));
writer.open();
forwardDimWriters.add(writer);
}
ArrayList<MetricColumnSerializer> metWriters = Lists.newArrayListWithCapacity(mergedMetrics.size());
for (String metric : mergedMetrics) {
ValueType type = valueTypes.get(metric);
switch (type) {
case LONG:
metWriters.add(new LongMetricColumnSerializer(metric, v8OutDir, ioPeon));
break;
case FLOAT:
metWriters.add(new FloatMetricColumnSerializer(metric, v8OutDir, ioPeon));
break;
case COMPLEX:
final String typeName = metricTypeNames.get(metric);
ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName);
if (serde == null) {
throw new ISE("Unknown type[%s]", typeName);
}
metWriters.add(new ComplexMetricColumnSerializer(metric, v8OutDir, ioPeon, serde));
break;
default:
throw new ISE("Unknown type[%s]", type);
}
}
for (MetricColumnSerializer metWriter : metWriters) {
metWriter.open();
}
int rowCount = 0;
long time = System.currentTimeMillis();
List<IntBuffer> rowNumConversions = Lists.newArrayListWithCapacity(indexes.size());
for (IndexableAdapter index : indexes) {
int[] arr = new int[index.getNumRows()];
Arrays.fill(arr, INVALID_ROW);
rowNumConversions.add(IntBuffer.wrap(arr));
}
for (Rowboat theRow : theRows) {
progress.progress();
timeWriter.add(theRow.getTimestamp());
final Object[] metrics = theRow.getMetrics();
for (int i = 0; i < metrics.length; ++i) {
metWriters.get(i).serialize(metrics[i]);
}
int[][] dims = theRow.getDims();
for (int i = 0; i < dims.length; ++i) {
List<Integer> listToWrite = (i >= dims.length || dims[i] == null)
? null
: Ints.asList(dims[i]);
forwardDimWriters.get(i).write(listToWrite);
}
for (Map.Entry<Integer, TreeSet<Integer>> comprisedRow : theRow.getComprisedRows().entrySet()) {
final IntBuffer conversionBuffer = rowNumConversions.get(comprisedRow.getKey());
for (Integer rowNum : comprisedRow.getValue()) {
while (conversionBuffer.position() < rowNum) {
conversionBuffer.put(INVALID_ROW);
}
conversionBuffer.put(rowCount);
}
}
if ((++rowCount % 500000) == 0) {
log.info(
"outDir[%s] walked 500,000/%,d rows in %,d millis.", v8OutDir, rowCount, System.currentTimeMillis() - time
);
time = System.currentTimeMillis();
}
}
for (IntBuffer rowNumConversion : rowNumConversions) {
rowNumConversion.rewind();
}
final File timeFile = IndexIO.makeTimeFile(v8OutDir, IndexIO.BYTE_ORDER);
timeFile.delete();
OutputSupplier<FileOutputStream> out = Files.newOutputStreamSupplier(timeFile, true);
timeWriter.closeAndConsolidate(out);
IndexIO.checkFileSize(timeFile);
for (int i = 0; i < mergedDimensions.size(); ++i) {
forwardDimWriters.get(i).close();
ByteStreams.copy(forwardDimWriters.get(i).combineStreams(), dimOuts.get(i));
}
for (MetricColumnSerializer metWriter : metWriters) {
metWriter.close();
}
ioPeon.cleanup();
log.info(
"outDir[%s] completed walk through of %,d rows in %,d millis.",
v8OutDir,
rowCount,
System.currentTimeMillis() - startTime
);
/************ Create Inverted Indexes *************/
startTime = System.currentTimeMillis();
final File invertedFile = new File(v8OutDir, "inverted.drd");
Files.touch(invertedFile);
out = Files.newOutputStreamSupplier(invertedFile, true);
final File geoFile = new File(v8OutDir, "spatial.drd");
Files.touch(geoFile);
OutputSupplier<FileOutputStream> spatialOut = Files.newOutputStreamSupplier(geoFile, true);
for (int i = 0; i < mergedDimensions.size(); ++i) {
long dimStartTime = System.currentTimeMillis();
String dimension = mergedDimensions.get(i);
File dimOutFile = dimOuts.get(i).getFile();
final MappedByteBuffer dimValsMapped = Files.map(dimOutFile);
if (!dimension.equals(serializerUtils.readString(dimValsMapped))) {
throw new ISE("dimensions[%s] didn't equate!? This is a major WTF moment.", dimension);
}
Indexed<String> dimVals = GenericIndexed.read(dimValsMapped, GenericIndexed.STRING_STRATEGY);
log.info("Starting dimension[%s] with cardinality[%,d]", dimension, dimVals.size());
final BitmapSerdeFactory bitmapSerdeFactory = indexSpec.getBitmapSerdeFactory();
GenericIndexedWriter<ImmutableBitmap> writer = new GenericIndexedWriter<>(
ioPeon, dimension, bitmapSerdeFactory.getObjectStrategy()
log.info(
"outDir[%s] completed walk through of %,d rows in %,d millis.",
v8OutDir,
rowCount,
System.currentTimeMillis() - startTime
);
writer.open();
boolean isSpatialDim = columnCapabilities.get(dimension).hasSpatialIndexes();
ByteBufferWriter<ImmutableRTree> spatialWriter = null;
RTree tree = null;
IOPeon spatialIoPeon = new TmpFileIOPeon();
if (isSpatialDim) {
BitmapFactory bitmapFactory = bitmapSerdeFactory.getBitmapFactory();
spatialWriter = new ByteBufferWriter<ImmutableRTree>(
spatialIoPeon, dimension, new IndexedRTree.ImmutableRTreeObjectStrategy(bitmapFactory)
/************ Create Inverted Indexes *************/
startTime = System.currentTimeMillis();
final File invertedFile = new File(v8OutDir, "inverted.drd");
Files.touch(invertedFile);
out = Files.newOutputStreamSupplier(invertedFile, true);
final File geoFile = new File(v8OutDir, "spatial.drd");
Files.touch(geoFile);
OutputSupplier<FileOutputStream> spatialOut = Files.newOutputStreamSupplier(geoFile, true);
for (int i = 0; i < mergedDimensions.size(); ++i) {
long dimStartTime = System.currentTimeMillis();
String dimension = mergedDimensions.get(i);
File dimOutFile = dimOuts.get(i).getFile();
final MappedByteBuffer dimValsMapped = Files.map(dimOutFile);
if (!dimension.equals(serializerUtils.readString(dimValsMapped))) {
throw new ISE("dimensions[%s] didn't equate!? This is a major WTF moment.", dimension);
}
Indexed<String> dimVals = GenericIndexed.read(dimValsMapped, GenericIndexed.STRING_STRATEGY);
log.info("Starting dimension[%s] with cardinality[%,d]", dimension, dimVals.size());
final BitmapSerdeFactory bitmapSerdeFactory = indexSpec.getBitmapSerdeFactory();
GenericIndexedWriter<ImmutableBitmap> writer = new GenericIndexedWriter<>(
ioPeon, dimension, bitmapSerdeFactory.getObjectStrategy()
);
spatialWriter.open();
tree = new RTree(2, new LinearGutmanSplitStrategy(0, 50, bitmapFactory), bitmapFactory);
}
writer.open();
IndexSeeker[] dictIdSeeker = toIndexSeekers(indexes, dimConversions, dimension);
//Iterate all dim values's dictionary id in ascending order which in line with dim values's compare result.
for (int dictId = 0; dictId < dimVals.size(); dictId++) {
progress.progress();
List<Iterable<Integer>> convertedInverteds = Lists.newArrayListWithCapacity(indexes.size());
for (int j = 0; j < indexes.size(); ++j) {
int seekedDictId = dictIdSeeker[j].seek(dictId);
if (seekedDictId != IndexSeeker.NOT_EXIST) {
convertedInverteds.add(
new ConvertingIndexedInts(
indexes.get(j).getBitmapIndex(dimension, seekedDictId), rowNumConversions.get(j)
)
);
}
boolean isSpatialDim = columnCapabilities.get(dimension).hasSpatialIndexes();
ByteBufferWriter<ImmutableRTree> spatialWriter = null;
RTree tree = null;
IOPeon spatialIoPeon = new TmpFileIOPeon();
if (isSpatialDim) {
BitmapFactory bitmapFactory = bitmapSerdeFactory.getBitmapFactory();
spatialWriter = new ByteBufferWriter<ImmutableRTree>(
spatialIoPeon, dimension, new IndexedRTree.ImmutableRTreeObjectStrategy(bitmapFactory)
);
spatialWriter.open();
tree = new RTree(2, new LinearGutmanSplitStrategy(0, 50, bitmapFactory), bitmapFactory);
}
MutableBitmap bitset = bitmapSerdeFactory.getBitmapFactory().makeEmptyMutableBitmap();
for (Integer row : CombiningIterable.createSplatted(
convertedInverteds,
Ordering.<Integer>natural().nullsFirst()
)) {
if (row != INVALID_ROW) {
bitset.add(row);
IndexSeeker[] dictIdSeeker = toIndexSeekers(indexes, dimConversions, dimension);
//Iterate all dim values's dictionary id in ascending order which in line with dim values's compare result.
for (int dictId = 0; dictId < dimVals.size(); dictId++) {
progress.progress();
List<Iterable<Integer>> convertedInverteds = Lists.newArrayListWithCapacity(indexes.size());
for (int j = 0; j < indexes.size(); ++j) {
int seekedDictId = dictIdSeeker[j].seek(dictId);
if (seekedDictId != IndexSeeker.NOT_EXIST) {
convertedInverteds.add(
new ConvertingIndexedInts(
indexes.get(j).getBitmapIndex(dimension, seekedDictId), rowNumConversions.get(j)
)
);
}
}
MutableBitmap bitset = bitmapSerdeFactory.getBitmapFactory().makeEmptyMutableBitmap();
for (Integer row : CombiningIterable.createSplatted(
convertedInverteds,
Ordering.<Integer>natural().nullsFirst()
)) {
if (row != INVALID_ROW) {
bitset.add(row);
}
}
writer.write(
bitmapSerdeFactory.getBitmapFactory().makeImmutableBitmap(bitset)
);
if (isSpatialDim) {
String dimVal = dimVals.get(dictId);
if (dimVal != null) {
List<String> stringCoords = Lists.newArrayList(SPLITTER.split(dimVal));
float[] coords = new float[stringCoords.size()];
for (int j = 0; j < coords.length; j++) {
coords[j] = Float.valueOf(stringCoords.get(j));
}
tree.insert(coords, bitset);
}
}
}
writer.close();
writer.write(
bitmapSerdeFactory.getBitmapFactory().makeImmutableBitmap(bitset)
);
serializerUtils.writeString(out, dimension);
ByteStreams.copy(writer.combineStreams(), out);
ioPeon.cleanup();
log.info("Completed dimension[%s] in %,d millis.", dimension, System.currentTimeMillis() - dimStartTime);
if (isSpatialDim) {
String dimVal = dimVals.get(dictId);
if (dimVal != null) {
List<String> stringCoords = Lists.newArrayList(SPLITTER.split(dimVal));
float[] coords = new float[stringCoords.size()];
for (int j = 0; j < coords.length; j++) {
coords[j] = Float.valueOf(stringCoords.get(j));
}
tree.insert(coords, bitset);
}
spatialWriter.write(ImmutableRTree.newImmutableFromMutable(tree));
spatialWriter.close();
serializerUtils.writeString(spatialOut, dimension);
ByteStreams.copy(spatialWriter.combineStreams(), spatialOut);
spatialIoPeon.cleanup();
}
}
writer.close();
serializerUtils.writeString(out, dimension);
ByteStreams.copy(writer.combineStreams(), out);
ioPeon.cleanup();
log.info("outDir[%s] completed inverted.drd in %,d millis.", v8OutDir, System.currentTimeMillis() - startTime);
log.info("Completed dimension[%s] in %,d millis.", dimension, System.currentTimeMillis() - dimStartTime);
final ArrayList<String> expectedFiles = Lists.newArrayList(
Iterables.concat(
Arrays.asList(
"index.drd", "inverted.drd", "spatial.drd", String.format("time_%s.drd", IndexIO.BYTE_ORDER)
),
Iterables.transform(mergedDimensions, GuavaUtils.formatFunction("dim_%s.drd")),
Iterables.transform(
mergedMetrics, GuavaUtils.formatFunction(String.format("met_%%s_%s.drd", IndexIO.BYTE_ORDER))
)
)
);
if (isSpatialDim) {
spatialWriter.write(ImmutableRTree.newImmutableFromMutable(tree));
spatialWriter.close();
if (segmentMetadata != null) {
writeMetadataToFile(new File(v8OutDir, "metadata.drd"), segmentMetadata);
log.info("wrote metadata.drd in outDir[%s].", v8OutDir);
serializerUtils.writeString(spatialOut, dimension);
ByteStreams.copy(spatialWriter.combineStreams(), spatialOut);
spatialIoPeon.cleanup();
expectedFiles.add("metadata.drd");
}
Map<String, File> files = Maps.newLinkedHashMap();
for (String fileName : expectedFiles) {
files.put(fileName, new File(v8OutDir, fileName));
}
File smooshDir = new File(v8OutDir, "smoosher");
smooshDir.mkdir();
for (Map.Entry<String, File> entry : Smoosh.smoosh(v8OutDir, smooshDir, files).entrySet()) {
entry.getValue().delete();
}
for (File file : smooshDir.listFiles()) {
Files.move(file, new File(v8OutDir, file.getName()));
}
if (!smooshDir.delete()) {
log.info("Unable to delete temporary dir[%s], contains[%s]", smooshDir, Arrays.asList(smooshDir.listFiles()));
throw new IOException(String.format("Unable to delete temporary dir[%s]", smooshDir));
}
createIndexDrdFile(
IndexIO.V8_VERSION,
v8OutDir,
GenericIndexed.fromIterable(mergedDimensions, GenericIndexed.STRING_STRATEGY),
GenericIndexed.fromIterable(mergedMetrics, GenericIndexed.STRING_STRATEGY),
dataInterval,
indexSpec.getBitmapSerdeFactory()
);
indexIO.getDefaultIndexIOHandler().convertV8toV9(v8OutDir, outDir, indexSpec);
return outDir;
}
log.info("outDir[%s] completed inverted.drd in %,d millis.", v8OutDir, System.currentTimeMillis() - startTime);
final ArrayList<String> expectedFiles = Lists.newArrayList(
Iterables.concat(
Arrays.asList(
"index.drd", "inverted.drd", "spatial.drd", String.format("time_%s.drd", IndexIO.BYTE_ORDER)
),
Iterables.transform(mergedDimensions, GuavaUtils.formatFunction("dim_%s.drd")),
Iterables.transform(
mergedMetrics, GuavaUtils.formatFunction(String.format("met_%%s_%s.drd", IndexIO.BYTE_ORDER))
)
)
);
if (segmentMetadata != null) {
writeMetadataToFile(new File(v8OutDir, "metadata.drd"), segmentMetadata);
log.info("wrote metadata.drd in outDir[%s].", v8OutDir);
expectedFiles.add("metadata.drd");
finally {
closer.close();
}
Map<String, File> files = Maps.newLinkedHashMap();
for (String fileName : expectedFiles) {
files.put(fileName, new File(v8OutDir, fileName));
}
File smooshDir = new File(v8OutDir, "smoosher");
smooshDir.mkdir();
for (Map.Entry<String, File> entry : Smoosh.smoosh(v8OutDir, smooshDir, files).entrySet()) {
entry.getValue().delete();
}
for (File file : smooshDir.listFiles()) {
Files.move(file, new File(v8OutDir, file.getName()));
}
if (!smooshDir.delete()) {
log.info("Unable to delete temporary dir[%s], contains[%s]", smooshDir, Arrays.asList(smooshDir.listFiles()));
throw new IOException(String.format("Unable to delete temporary dir[%s]", smooshDir));
}
createIndexDrdFile(
IndexIO.V8_VERSION,
v8OutDir,
GenericIndexed.fromIterable(mergedDimensions, GenericIndexed.STRING_STRATEGY),
GenericIndexed.fromIterable(mergedMetrics, GenericIndexed.STRING_STRATEGY),
dataInterval,
indexSpec.getBitmapSerdeFactory()
);
indexIO.getDefaultIndexIOHandler().convertV8toV9(v8OutDir, outDir, indexSpec);
FileUtils.deleteDirectory(v8OutDir);
return outDir;
}
protected Iterable<Rowboat> makeRowIterable(

View File

@ -27,6 +27,7 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.common.io.ByteStreams;
import com.google.common.io.Closer;
import com.google.common.io.Files;
import com.google.common.primitives.Ints;
import com.google.inject.Inject;
@ -74,6 +75,7 @@ import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
@ -144,102 +146,121 @@ public class IndexMergerV9 extends IndexMerger
);
}
Closer closer = Closer.create();
final IOPeon ioPeon = new TmpFileIOPeon(false);
// Register cleanup of the IOPeon with the closer so that it is cleaned up when the closer
// is closed.
// NOTE(review): the matching closer.close() call is outside this view -- presumably in a
// finally block paired with the try that follows; confirm.
closer.register(new Closeable()
{
@Override
public void close() throws IOException
{
ioPeon.cleanup();
}
});
final FileSmoosher v9Smoosher = new FileSmoosher(outDir);
final File v9TmpDir = new File(outDir, "v9-tmp");
v9TmpDir.mkdirs();
// Register removal of the "v9-tmp" working directory (created under outDir just above) with
// the closer, so that it is deleted when the closer is closed.
// NOTE(review): the matching closer.close() call is outside this view -- confirm it runs on
// both the success and the failure path.
closer.register(new Closeable()
{
@Override
public void close() throws IOException
{
// Deletes v9TmpDir itself along with any files written into it.
FileUtils.deleteDirectory(v9TmpDir);
}
});
log.info("Start making v9 index files, outDir:%s", outDir);
try {
long startTime = System.currentTimeMillis();
ByteStreams.write(
Ints.toByteArray(IndexIO.V9_VERSION),
Files.newOutputStreamSupplier(new File(outDir, "version.bin"))
);
log.info("Completed version.bin in %,d millis.", System.currentTimeMillis() - startTime);
long startTime = System.currentTimeMillis();
ByteStreams.write(
Ints.toByteArray(IndexIO.V9_VERSION),
Files.newOutputStreamSupplier(new File(outDir, "version.bin"))
);
log.info("Completed version.bin in %,d millis.", System.currentTimeMillis() - startTime);
progress.progress();
final Map<String, ValueType> metricsValueTypes = Maps.newTreeMap(Ordering.<String>natural().nullsFirst());
final Map<String, String> metricTypeNames = Maps.newTreeMap(Ordering.<String>natural().nullsFirst());
final List<ColumnCapabilitiesImpl> dimCapabilities = Lists.newArrayListWithCapacity(mergedDimensions.size());
mergeCapabilities(adapters, mergedDimensions, metricsValueTypes, metricTypeNames, dimCapabilities);
progress.progress();
final Map<String, ValueType> metricsValueTypes = Maps.newTreeMap(Ordering.<String>natural().nullsFirst());
final Map<String, String> metricTypeNames = Maps.newTreeMap(Ordering.<String>natural().nullsFirst());
final List<ColumnCapabilitiesImpl> dimCapabilities = Lists.newArrayListWithCapacity(mergedDimensions.size());
mergeCapabilities(adapters, mergedDimensions, metricsValueTypes, metricTypeNames, dimCapabilities);
/************* Setup Dim Conversions **************/
progress.progress();
startTime = System.currentTimeMillis();
final Map<String, Integer> dimCardinalities = Maps.newHashMap();
final ArrayList<GenericIndexedWriter<String>> dimValueWriters = setupDimValueWriters(ioPeon, mergedDimensions);
final ArrayList<Map<String, IntBuffer>> dimConversions = Lists.newArrayListWithCapacity(adapters.size());
final ArrayList<Boolean> dimensionSkipFlag = Lists.newArrayListWithCapacity(mergedDimensions.size());
final ArrayList<Boolean> dimHasNullFlags = Lists.newArrayListWithCapacity(mergedDimensions.size());
final ArrayList<Boolean> convertMissingDimsFlags = Lists.newArrayListWithCapacity(mergedDimensions.size());
writeDimValueAndSetupDimConversion(
adapters, progress, mergedDimensions, dimCardinalities, dimValueWriters, dimensionSkipFlag, dimConversions,
convertMissingDimsFlags, dimHasNullFlags
);
log.info("Completed dim conversions in %,d millis.", System.currentTimeMillis() - startTime);
/************* Setup Dim Conversions **************/
progress.progress();
startTime = System.currentTimeMillis();
final Map<String, Integer> dimCardinalities = Maps.newHashMap();
final ArrayList<GenericIndexedWriter<String>> dimValueWriters = setupDimValueWriters(ioPeon, mergedDimensions);
final ArrayList<Map<String, IntBuffer>> dimConversions = Lists.newArrayListWithCapacity(adapters.size());
final ArrayList<Boolean> dimensionSkipFlag = Lists.newArrayListWithCapacity(mergedDimensions.size());
final ArrayList<Boolean> dimHasNullFlags = Lists.newArrayListWithCapacity(mergedDimensions.size());
final ArrayList<Boolean> convertMissingDimsFlags = Lists.newArrayListWithCapacity(mergedDimensions.size());
writeDimValueAndSetupDimConversion(
adapters, progress, mergedDimensions, dimCardinalities, dimValueWriters, dimensionSkipFlag, dimConversions,
convertMissingDimsFlags, dimHasNullFlags
);
log.info("Completed dim conversions in %,d millis.", System.currentTimeMillis() - startTime);
/************* Walk through data sets, merge them, and write merged columns *************/
progress.progress();
final Iterable<Rowboat> theRows = makeRowIterable(
adapters,
mergedDimensions,
mergedMetrics,
dimConversions,
convertMissingDimsFlags,
rowMergerFn
);
final LongColumnSerializer timeWriter = setupTimeWriter(ioPeon);
final ArrayList<IndexedIntsWriter> dimWriters = setupDimensionWriters(
ioPeon, mergedDimensions, dimCapabilities, dimCardinalities, indexSpec
);
final ArrayList<GenericColumnSerializer> metWriters = setupMetricsWriters(
ioPeon, mergedMetrics, metricsValueTypes, metricTypeNames, indexSpec
);
final List<IntBuffer> rowNumConversions = Lists.newArrayListWithCapacity(adapters.size());
final ArrayList<MutableBitmap> nullRowsList = Lists.newArrayListWithCapacity(mergedDimensions.size());
for (int i = 0; i < mergedDimensions.size(); ++i) {
nullRowsList.add(indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap());
}
mergeIndexesAndWriteColumns(
adapters, progress, theRows, timeWriter, dimWriters, metWriters,
dimensionSkipFlag, rowNumConversions, nullRowsList, dimHasNullFlags
);
/************* Walk through data sets, merge them, and write merged columns *************/
progress.progress();
final Iterable<Rowboat> theRows = makeRowIterable(
adapters,
mergedDimensions,
mergedMetrics,
dimConversions,
convertMissingDimsFlags,
rowMergerFn
);
final LongColumnSerializer timeWriter = setupTimeWriter(ioPeon);
final ArrayList<IndexedIntsWriter> dimWriters = setupDimensionWriters(
ioPeon, mergedDimensions, dimCapabilities, dimCardinalities, indexSpec
);
final ArrayList<GenericColumnSerializer> metWriters = setupMetricsWriters(
ioPeon, mergedMetrics, metricsValueTypes, metricTypeNames, indexSpec
);
final List<IntBuffer> rowNumConversions = Lists.newArrayListWithCapacity(adapters.size());
final ArrayList<MutableBitmap> nullRowsList = Lists.newArrayListWithCapacity(mergedDimensions.size());
for (int i = 0; i < mergedDimensions.size(); ++i) {
nullRowsList.add(indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap());
/************ Create Inverted Indexes *************/
progress.progress();
final ArrayList<GenericIndexedWriter<ImmutableBitmap>> bitmapIndexWriters = setupBitmapIndexWriters(
ioPeon, mergedDimensions, indexSpec
);
final ArrayList<ByteBufferWriter<ImmutableRTree>> spatialIndexWriters = setupSpatialIndexWriters(
ioPeon, mergedDimensions, indexSpec, dimCapabilities
);
makeInvertedIndexes(
adapters, progress, mergedDimensions, indexSpec, v9TmpDir, rowNumConversions,
nullRowsList, dimValueWriters, bitmapIndexWriters, spatialIndexWriters, dimConversions
);
/************ Finalize Build Columns *************/
progress.progress();
makeTimeColumn(v9Smoosher, progress, timeWriter);
makeMetricsColumns(v9Smoosher, progress, mergedMetrics, metricsValueTypes, metricTypeNames, metWriters);
makeDimensionColumns(
v9Smoosher, progress, indexSpec, mergedDimensions, dimensionSkipFlag, dimCapabilities,
dimValueWriters, dimWriters, bitmapIndexWriters, spatialIndexWriters
);
/************* Make index.drd & metadata.drd files **************/
progress.progress();
makeIndexBinary(
v9Smoosher, adapters, outDir, mergedDimensions, dimensionSkipFlag, mergedMetrics, progress, indexSpec
);
makeMetadataBinary(v9Smoosher, progress, segmentMetadata);
v9Smoosher.close();
progress.stop();
return outDir;
}
finally {
closer.close();
}
mergeIndexesAndWriteColumns(
adapters, progress, theRows, timeWriter, dimWriters, metWriters,
dimensionSkipFlag, rowNumConversions, nullRowsList, dimHasNullFlags
);
/************ Create Inverted Indexes *************/
progress.progress();
final ArrayList<GenericIndexedWriter<ImmutableBitmap>> bitmapIndexWriters = setupBitmapIndexWriters(
ioPeon, mergedDimensions, indexSpec
);
final ArrayList<ByteBufferWriter<ImmutableRTree>> spatialIndexWriters = setupSpatialIndexWriters(
ioPeon, mergedDimensions, indexSpec, dimCapabilities
);
makeInvertedIndexes(
adapters, progress, mergedDimensions, indexSpec, v9TmpDir, rowNumConversions,
nullRowsList, dimValueWriters, bitmapIndexWriters, spatialIndexWriters, dimConversions
);
/************ Finalize Build Columns *************/
progress.progress();
makeTimeColumn(v9Smoosher, progress, timeWriter);
makeMetricsColumns(v9Smoosher, progress, mergedMetrics, metricsValueTypes, metricTypeNames, metWriters);
makeDimensionColumns(
v9Smoosher, progress, indexSpec, mergedDimensions, dimensionSkipFlag, dimCapabilities,
dimValueWriters, dimWriters, bitmapIndexWriters, spatialIndexWriters
);
/************* Make index.drd & metadata.drd files **************/
progress.progress();
makeIndexBinary(
v9Smoosher, adapters, outDir, mergedDimensions, dimensionSkipFlag, mergedMetrics, progress, indexSpec
);
makeMetadataBinary(v9Smoosher, progress, segmentMetadata);
v9Smoosher.close();
ioPeon.cleanup();
FileUtils.deleteDirectory(v9TmpDir);
progress.stop();
return outDir;
}
private void makeMetadataBinary(

View File

@ -37,6 +37,7 @@ import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.segment.column.Column;
import io.druid.segment.column.ColumnCapabilitiesImpl;
import io.druid.segment.column.SimpleDictionaryEncodedColumn;
import io.druid.segment.data.BitmapSerdeFactory;
import io.druid.segment.data.CompressedObjectStrategy;
@ -67,7 +68,6 @@ import java.nio.IntBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@ -1662,4 +1662,34 @@ public class IndexMergerTest
Assert.assertEquals(2, dictIdSeeker.seek(4));
Assert.assertEquals(-1, dictIdSeeker.seek(5));
}
/**
 * Checks that a failing persist leaves no temporary directories behind.
 *
 * The test flags the capabilities of "dim1" as having spatial indexes and then
 * persists the index; per the annotation, persist is expected to throw
 * IllegalArgumentException (presumably triggered by that flag -- confirm against
 * the merger implementation). Whether or not persist throws, the finally block
 * asserts that neither the "v8-tmp" nor the "v9-tmp" directory exists under the
 * output directory. A failed assertion there replaces the expected exception
 * with an AssertionError, so leftover directories fail the test.
 */
@Test(expected = IllegalArgumentException.class)
public void testCloser() throws Exception
{
  final long now = System.currentTimeMillis();
  final IncrementalIndex index = IncrementalIndexTest.createIndex(null);
  IncrementalIndexTest.populateIndex(now, index);
  // Mark dim1 as spatially indexed before persisting.
  ((ColumnCapabilitiesImpl) index.getCapabilities("dim1")).setHasSpatialIndexes(true);

  final File outDir = temporaryFolder.newFolder();
  final File leftoverV8 = new File(outDir, "v8-tmp");
  final File leftoverV9 = new File(outDir, "v9-tmp");

  try {
    INDEX_MERGER.persist(index, outDir, indexSpec);
  }
  finally {
    // Checked in finally so the cleanup is verified even when persist throws.
    Assert.assertFalse("v8-tmp dir not clean.", leftoverV8.exists());
    Assert.assertFalse("v9-tmp dir not clean.", leftoverV9.exists());
  }
}
}