HADOOP-11569. Provide Merge API for MapFile to merge multiple similar MapFiles to one MapFile. Contributed by Vinayakumar B.

This commit is contained in:
Tsuyoshi Ozawa 2015-02-27 17:46:07 +09:00
parent 8ca0d957c4
commit 48c7ee7553
3 changed files with 202 additions and 0 deletions

View File

@ -445,6 +445,9 @@ Release 2.7.0 - UNRELEASED
HADOOP-11510. Expose truncate API via FileContext. (yliu) HADOOP-11510. Expose truncate API via FileContext. (yliu)
HADOOP-11569. Provide Merge API for MapFile to merge multiple similar MapFiles
to one MapFile. (Vinayakumar B via ozawa)
IMPROVEMENTS IMPROVEMENTS
HADOOP-11483. HardLink.java should use the jdk7 createLink method (aajisaka) HADOOP-11483. HardLink.java should use the jdk7 createLink method (aajisaka)

View File

@ -25,6 +25,7 @@ import java.util.Arrays;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -824,6 +825,148 @@ public class MapFile {
return cnt; return cnt;
} }
/**
* Class to merge multiple MapFiles of same Key and Value types to one MapFile
*/
public static class Merger {
private Configuration conf;
private WritableComparator comparator = null;
private Reader[] inReaders;
private Writer outWriter;
private Class<Writable> valueClass = null;
private Class<WritableComparable> keyClass = null;
public Merger(Configuration conf) throws IOException {
this.conf = conf;
}
/**
* Merge multiple MapFiles to one Mapfile
*
* @param inMapFiles
* @param outMapFile
* @throws IOException
*/
public void merge(Path[] inMapFiles, boolean deleteInputs,
Path outMapFile) throws IOException {
try {
open(inMapFiles, outMapFile);
mergePass();
} finally {
close();
}
if (deleteInputs) {
for (int i = 0; i < inMapFiles.length; i++) {
Path path = inMapFiles[i];
delete(path.getFileSystem(conf), path.toString());
}
}
}
/*
* Open all input files for reading and verify the key and value types. And
* open Output file for writing
*/
@SuppressWarnings("unchecked")
private void open(Path[] inMapFiles, Path outMapFile) throws IOException {
inReaders = new Reader[inMapFiles.length];
for (int i = 0; i < inMapFiles.length; i++) {
Reader reader = new Reader(inMapFiles[i], conf);
if (keyClass == null || valueClass == null) {
keyClass = (Class<WritableComparable>) reader.getKeyClass();
valueClass = (Class<Writable>) reader.getValueClass();
} else if (keyClass != reader.getKeyClass()
|| valueClass != reader.getValueClass()) {
throw new HadoopIllegalArgumentException(
"Input files cannot be merged as they"
+ " have different Key and Value classes");
}
inReaders[i] = reader;
}
if (comparator == null) {
Class<? extends WritableComparable> cls;
cls = keyClass.asSubclass(WritableComparable.class);
this.comparator = WritableComparator.get(cls, conf);
} else if (comparator.getKeyClass() != keyClass) {
throw new HadoopIllegalArgumentException(
"Input files cannot be merged as they"
+ " have different Key class compared to"
+ " specified comparator");
}
outWriter = new MapFile.Writer(conf, outMapFile,
MapFile.Writer.keyClass(keyClass),
MapFile.Writer.valueClass(valueClass));
}
/**
* Merge all input files to output map file.<br>
* 1. Read first key/value from all input files to keys/values array. <br>
* 2. Select the least key and corresponding value. <br>
* 3. Write the selected key and value to output file. <br>
* 4. Replace the already written key/value in keys/values arrays with the
* next key/value from the selected input <br>
* 5. Repeat step 2-4 till all keys are read. <br>
*/
private void mergePass() throws IOException {
// re-usable array
WritableComparable[] keys = new WritableComparable[inReaders.length];
Writable[] values = new Writable[inReaders.length];
// Read first key/value from all inputs
for (int i = 0; i < inReaders.length; i++) {
keys[i] = ReflectionUtils.newInstance(keyClass, null);
values[i] = ReflectionUtils.newInstance(valueClass, null);
if (!inReaders[i].next(keys[i], values[i])) {
// Handle empty files
keys[i] = null;
values[i] = null;
}
}
do {
int currentEntry = -1;
WritableComparable currentKey = null;
Writable currentValue = null;
for (int i = 0; i < keys.length; i++) {
if (keys[i] == null) {
// Skip Readers reached EOF
continue;
}
if (currentKey == null || comparator.compare(currentKey, keys[i]) > 0) {
currentEntry = i;
currentKey = keys[i];
currentValue = values[i];
}
}
if (currentKey == null) {
// Merge Complete
break;
}
// Write the selected key/value to merge stream
outWriter.append(currentKey, currentValue);
// Replace the already written key/value in keys/values arrays with the
// next key/value from the selected input
if (!inReaders[currentEntry].next(keys[currentEntry],
values[currentEntry])) {
// EOF for this file
keys[currentEntry] = null;
values[currentEntry] = null;
}
} while (true);
}
private void close() throws IOException {
for (int i = 0; i < inReaders.length; i++) {
IOUtils.closeStream(inReaders[i]);
inReaders[i] = null;
}
if (outWriter != null) {
outWriter.close();
outWriter = null;
}
}
}
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
String usage = "Usage: MapFile inFile outFile"; String usage = "Usage: MapFile inFile outFile";

View File

@ -21,6 +21,10 @@ import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
@ -730,4 +734,56 @@ public class TestMapFile {
reader.close(); reader.close();
} }
} }
@Test
public void testMerge() throws Exception {
final String TEST_METHOD_KEY = "testMerge.mapfile";
int SIZE = 10;
int ITERATIONS = 5;
Path[] in = new Path[5];
List<Integer> expected = new ArrayList<Integer>();
for (int j = 0; j < 5; j++) {
try (MapFile.Writer writer = createWriter(TEST_METHOD_KEY + "." + j,
IntWritable.class, Text.class)) {
in[j] = new Path(TEST_DIR, TEST_METHOD_KEY + "." + j);
for (int i = 0; i < SIZE; i++) {
expected.add(i + j);
writer.append(new IntWritable(i + j), new Text("Value:" + (i + j)));
}
}
}
// Sort expected values
Collections.sort(expected);
// Merge all 5 files
MapFile.Merger merger = new MapFile.Merger(conf);
merger.merge(in, true, new Path(TEST_DIR, TEST_METHOD_KEY));
try (MapFile.Reader reader = createReader(TEST_METHOD_KEY,
IntWritable.class)) {
int start = 0;
// test iteration
Text startValue = new Text("Value:" + start);
int i = 0;
while (i++ < ITERATIONS) {
Iterator<Integer> expectedIterator = expected.iterator();
IntWritable key = new IntWritable(start);
Text value = startValue;
IntWritable prev = new IntWritable(start);
while (reader.next(key, value)) {
assertTrue("Next key should be always equal or more",
prev.get() <= key.get());
assertEquals(expectedIterator.next().intValue(), key.get());
prev.set(key.get());
}
reader.reset();
}
}
// inputs should be deleted
for (int j = 0; j < in.length; j++) {
Path path = in[j];
assertFalse("inputs should be deleted",
path.getFileSystem(conf).exists(path));
}
}
} }