HBASE-1861 : Multi-Family support for bulk upload tools

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1043318 13f79535-47bb-0310-9956-ffa450edef68
Author: Nicolas Spiegelberg
Date: 2010-12-08 06:37:06 +00:00
parent afac14fcf0
commit 504eeec96e
3 changed files with 141 additions and 61 deletions

HFileOutputFormat.java

@ -58,7 +58,8 @@ import org.apache.commons.logging.LogFactory;
* Currently, can only write files to a single column family at a
* time. Multiple column families require coordinating keys across families.
* Writes current time as the sequence id for the file. Sets the major compacted
* attribute on created hfiles.
* attribute on created hfiles. Calling write(null,null) will forcibly roll
* all HFiles being written.
* @see KeyValueSortReducer
*/
public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, KeyValue> {
@ -72,9 +73,10 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
Configuration conf = context.getConfiguration();
final FileSystem fs = outputdir.getFileSystem(conf);
// These configs. are from hbase-*.xml
final long maxsize = conf.getLong("hbase.hregion.max.filesize", 268435456);
final int blocksize =
conf.getInt("hbase.mapreduce.hfileoutputformat.blocksize", 65536);
final long maxsize = conf.getLong("hbase.hregion.max.filesize",
HConstants.DEFAULT_MAX_FILE_SIZE);
final int blocksize = conf.getInt("hfile.min.blocksize.size",
HFile.DEFAULT_BLOCKSIZE);
// Invented config. Add to hbase-*.xml if other than default compression.
final String compression = conf.get("hfile.compression",
Compression.Algorithm.NONE.getName());
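As a quick illustration of the knobs read above, a job driver could override the rolling size, block size, and compression before submitting the job. A minimal sketch, not part of this patch; the keys are the ones read in this hunk, but the values are examples only:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.io.hfile.Compression;

    // Example settings only; the defaults are HConstants.DEFAULT_MAX_FILE_SIZE,
    // HFile.DEFAULT_BLOCKSIZE, and no compression.
    Configuration conf = HBaseConfiguration.create();
    conf.setLong("hbase.hregion.max.filesize", 1024L * 1024 * 1024); // roll HFiles at ~1 GB
    conf.setInt("hfile.min.blocksize.size", 64 * 1024);              // 64 KB blocks
    conf.set("hfile.compression", Compression.Algorithm.GZ.getName());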
@ -85,48 +87,77 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
new TreeMap<byte [], WriterLength>(Bytes.BYTES_COMPARATOR);
private byte [] previousRow = HConstants.EMPTY_BYTE_ARRAY;
private final byte [] now = Bytes.toBytes(System.currentTimeMillis());
private boolean rollRequested = false;
public void write(ImmutableBytesWritable row, KeyValue kv)
throws IOException {
// null input == user explicitly wants to flush
if (row == null && kv == null) {
rollWriters();
return;
}
byte [] rowKey = kv.getRow();
long length = kv.getLength();
byte [] family = kv.getFamily();
WriterLength wl = this.writers.get(family);
if (wl == null || ((length + wl.written) >= maxsize) &&
Bytes.compareTo(this.previousRow, 0, this.previousRow.length,
kv.getBuffer(), kv.getRowOffset(), kv.getRowLength()) != 0) {
// Get a new writer.
Path basedir = new Path(outputdir, Bytes.toString(family));
if (wl == null) {
wl = new WriterLength();
this.writers.put(family, wl);
if (this.writers.size() > 1) throw new IOException("One family only");
// If wl == null, first file in family. Ensure family dir exists.
if (!fs.exists(basedir)) fs.mkdirs(basedir);
}
wl.writer = getNewWriter(wl.writer, basedir);
LOG.info("Writer=" + wl.writer.getPath() +
((wl.written == 0)? "": ", wrote=" + wl.written));
wl.written = 0;
// If this is a new column family, verify that the directory exists
if (wl == null) {
fs.mkdirs(new Path(outputdir, Bytes.toString(family)));
}
// If any of the HFiles for the column families has reached
// maxsize, we need to roll all the writers
if (wl != null && wl.written + length >= maxsize) {
this.rollRequested = true;
}
// This can only happen once a row is finished though
if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
rollWriters();
}
// create a new HFile writer, if necessary
if (wl == null || wl.writer == null) {
wl = getNewWriter(family);
}
// we now have the proper HFile writer. full steam ahead
kv.updateLatestStamp(this.now);
wl.writer.append(kv);
wl.written += length;
// Copy the row so we know when a row transition occurs.
this.previousRow = kv.getRow();
this.previousRow = rowKey;
}
/* Create a new HFile.Writer. Close current if there is one.
* @param writer
* @param familydir
* @return A new HFile.Writer.
private void rollWriters() throws IOException {
for (WriterLength wl : this.writers.values()) {
if (wl.writer != null) {
LOG.info("Writer=" + wl.writer.getPath() +
((wl.written == 0)? "": ", wrote=" + wl.written));
close(wl.writer);
}
wl.writer = null;
wl.written = 0;
}
this.rollRequested = false;
}
/* Create a new HFile.Writer.
* @param family
* @return A WriterLength, containing a new HFile.Writer.
* @throws IOException
*/
private HFile.Writer getNewWriter(final HFile.Writer writer,
final Path familydir)
throws IOException {
close(writer);
return new HFile.Writer(fs, StoreFile.getUniqueFile(fs, familydir),
blocksize, compression, KeyValue.KEY_COMPARATOR);
private WriterLength getNewWriter(byte[] family) throws IOException {
WriterLength wl = new WriterLength();
Path familydir = new Path(outputdir, Bytes.toString(family));
wl.writer = new HFile.Writer(fs,
StoreFile.getUniqueFile(fs, familydir), blocksize,
compression, KeyValue.KEY_COMPARATOR);
this.writers.put(family, wl);
return wl;
}
private void close(final HFile.Writer w) throws IOException {
@ -143,8 +174,8 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
public void close(TaskAttemptContext c)
throws IOException, InterruptedException {
for (Map.Entry<byte [], WriterLength> e: this.writers.entrySet()) {
close(e.getValue().writer);
for (WriterLength wl: this.writers.values()) {
close(wl.writer);
}
}
};
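To make the new flush contract concrete: any reducer feeding this output format can force every per-family HFile to roll by writing a (null, null) pair at a row boundary, which is what PutSortReducer (below) does when it cannot hold a whole batch in memory. A minimal, hypothetical reducer showing the call; the class name and byte threshold are invented for the sketch and are not part of this patch:

    import java.io.IOException;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.mapreduce.Reducer;

    // Hypothetical reducer: passes KeyValues through and periodically forces
    // HFileOutputFormat to roll all open HFiles via the new write(null, null) contract.
    public class FlushingPassThroughReducer extends
        Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue> {

      private static final long ROLL_EVERY_BYTES = 256L * 1024 * 1024; // example threshold
      private long bytesSinceRoll = 0;

      @Override
      protected void reduce(ImmutableBytesWritable row, Iterable<KeyValue> kvs,
          Context context) throws IOException, InterruptedException {
        for (KeyValue kv : kvs) {
          context.write(row, kv);
          bytesSinceRoll += kv.getLength();
        }
        // The row is complete at this point, so forcing a roll keeps whole rows
        // together in each family's HFile.
        if (bytesSinceRoll >= ROLL_EVERY_BYTES) {
          context.write(null, null); // null input == explicit flush
          bytesSinceRoll = 0;
        }
      }
    }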

PutSortReducer.java

@ -19,6 +19,7 @@
*/
package org.apache.hadoop.hbase.mapreduce;
import java.util.Iterator;
import java.util.List;
import java.util.TreeSet;
@ -26,6 +27,7 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.StringUtils;
/**
* Emits sorted Puts.
@ -46,21 +48,37 @@ public class PutSortReducer extends
ImmutableBytesWritable, KeyValue>.Context context)
throws java.io.IOException, InterruptedException
{
TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
for (Put p : puts) {
for (List<KeyValue> kvs : p.getFamilyMap().values()) {
for (KeyValue kv : kvs) {
map.add(kv.clone());
// although reduce() is called per row, handle the pathological case of very large rows
long threshold = context.getConfiguration().getLong(
"putsortreducer.row.threshold", 2L * (1<<30));
Iterator<Put> iter = puts.iterator();
while (iter.hasNext()) {
TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
long curSize = 0;
// stop at the end or the RAM threshold
while (iter.hasNext() && curSize < threshold) {
Put p = iter.next();
for (List<KeyValue> kvs : p.getFamilyMap().values()) {
for (KeyValue kv : kvs) {
map.add(kv);
curSize += kv.getValueLength();
}
}
}
}
context.setStatus("Read " + map.getClass());
int index = 0;
for (KeyValue kv : map) {
context.write(row, kv);
if (index > 0 && index % 100 == 0)
context.setStatus("Wrote " + index);
context.setStatus("Read " + map.size() + " entries of " + map.getClass()
+ "(" + StringUtils.humanReadableInt(curSize) + ")");
int index = 0;
for (KeyValue kv : map) {
context.write(row, kv);
if (index > 0 && index % 100 == 0)
context.setStatus("Wrote " + index);
}
// if we have more entries to process
if (iter.hasNext()) {
// force flush because we cannot guarantee intra-row sorted order
context.write(null, null);
}
}
}
}
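The threshold read above, 2L * (1 << 30), caps the buffered value bytes at roughly 2 GB before the reducer sorts, emits, and (if more input remains) forces a flush. For reducers with smaller heaps the cap can be lowered from the driver; a sketch with an example value, not taken from the patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    // Example only: buffer at most ~256 MB of values per sort/emit cycle
    // instead of the ~2 GB default read by PutSortReducer.
    Configuration conf = HBaseConfiguration.create();
    conf.setLong("putsortreducer.row.threshold", 256L * 1024 * 1024);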

TestHFileOutputFormat.java

@ -41,6 +41,9 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PerformanceEvaluation;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
@ -65,7 +68,9 @@ import org.mockito.Mockito;
public class TestHFileOutputFormat {
private final static int ROWSPERSPLIT = 1024;
private static final byte[] FAMILY_NAME = PerformanceEvaluation.FAMILY_NAME;
private static final byte[][] FAMILIES
= { Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-A"))
, Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B"))};
private static final byte[] TABLE_NAME = Bytes.toBytes("TestTable");
private HBaseTestingUtility util = new HBaseTestingUtility();
@ -119,9 +124,11 @@ public class TestHFileOutputFormat {
random.nextBytes(valBytes);
ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
KeyValue kv = new KeyValue(keyBytes, PerformanceEvaluation.FAMILY_NAME,
PerformanceEvaluation.QUALIFIER_NAME, valBytes);
context.write(key, kv);
for (byte[] family : TestHFileOutputFormat.FAMILIES) {
KeyValue kv = new KeyValue(keyBytes, family,
PerformanceEvaluation.QUALIFIER_NAME, valBytes);
context.write(key, kv);
}
}
}
}
@ -268,14 +275,12 @@ public class TestHFileOutputFormat {
try {
util.startMiniCluster();
HBaseAdmin admin = new HBaseAdmin(conf);
HTable table = util.createTable(TABLE_NAME, FAMILY_NAME);
int numRegions = util.createMultiRegions(
util.getConfiguration(), table, FAMILY_NAME,
startKeys);
assertEquals("Should make 5 regions",
numRegions, 5);
HTable table = util.createTable(TABLE_NAME, FAMILIES);
assertEquals("Should start with empty table",
0, util.countRows(table));
int numRegions = util.createMultiRegions(
util.getConfiguration(), table, FAMILIES[0], startKeys);
assertEquals("Should make 5 regions", numRegions, 5);
// Generate the bulk load files
util.startMiniMapReduceCluster();
@ -284,6 +289,19 @@ public class TestHFileOutputFormat {
assertEquals("HFOF should not touch actual table",
0, util.countRows(table));
// Make sure that a directory was created for every CF
int dir = 0;
for (FileStatus f : testDir.getFileSystem(conf).listStatus(testDir)) {
for (byte[] family : FAMILIES) {
if (Bytes.toString(family).equals(f.getPath().getName())) {
++dir;
}
}
}
assertEquals("Column family not found in FS.", FAMILIES.length, dir);
// handle the split case
if (shouldChangeRegions) {
LOG.info("Changing regions in table");
admin.disableTable(table.getTableName());
@ -293,8 +311,8 @@ public class TestHFileOutputFormat {
LOG.info("Waiting on table to finish disabling");
}
byte[][] newStartKeys = generateRandomStartKeys(15);
util.createMultiRegions(util.getConfiguration(),
table, FAMILY_NAME, newStartKeys);
util.createMultiRegions(
util.getConfiguration(), table, FAMILIES[0], newStartKeys);
admin.enableTable(table.getTableName());
while (table.getRegionsInfo().size() != 15 ||
!admin.isTableAvailable(table.getTableName())) {
@ -310,6 +328,19 @@ public class TestHFileOutputFormat {
int expectedRows = conf.getInt("mapred.map.tasks", 1) * ROWSPERSPLIT;
assertEquals("LoadIncrementalHFiles should put expected data in table",
expectedRows, util.countRows(table));
Scan scan = new Scan();
ResultScanner results = table.getScanner(scan);
int count = 0;
for (Result res : results) {
count++;
assertEquals(FAMILIES.length, res.raw().length);
KeyValue first = res.raw()[0];
for (KeyValue kv : res.raw()) {
assertTrue(KeyValue.COMPARATOR.matchingRows(first, kv));
assertTrue(Bytes.equals(first.getValue(), kv.getValue()));
}
}
results.close();
String tableDigestBefore = util.checksumRows(table);
// Cause regions to reopen
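For context, the output this test verifies (one subdirectory per column family under the job's output path) is what gets handed to the bulk loader named in the assertion above. A rough sketch of that step, assuming the LoadIncrementalHFiles tool the test exercises; the output path is a placeholder, and "TestTable" matches this test's TABLE_NAME:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

    // Rough sketch: load HFileOutputFormat output (one subdir per family)
    // into an existing table.
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "TestTable");
    new LoadIncrementalHFiles(conf).doBulkLoad(new Path("/tmp/hfof-output"), table);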
@ -351,11 +382,11 @@ public class TestHFileOutputFormat {
util = new HBaseTestingUtility(conf);
if ("newtable".equals(args[0])) {
byte[] tname = args[1].getBytes();
HTable table = util.createTable(tname, FAMILY_NAME);
HTable table = util.createTable(tname, FAMILIES);
HBaseAdmin admin = new HBaseAdmin(conf);
admin.disableTable(tname);
util.createMultiRegions(conf, table, FAMILY_NAME,
generateRandomStartKeys(5));
byte[][] startKeys = generateRandomStartKeys(5);
util.createMultiRegions(conf, table, FAMILIES[0], startKeys);
admin.enableTable(tname);
} else if ("incremental".equals(args[0])) {
byte[] tname = args[1].getBytes();