From 504eeec96e9a9cdc137f7b50ff16d2c11e59a1a9 Mon Sep 17 00:00:00 2001
From: Nicolas Spiegelberg
Date: Wed, 8 Dec 2010 06:37:06 +0000
Subject: [PATCH] HBASE-1861 : Multi-Family support for bulk upload tools

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1043318 13f79535-47bb-0310-9956-ffa450edef68
---
 .../hbase/mapreduce/HFileOutputFormat.java    | 97 ++++++++++++-------
 .../hbase/mapreduce/PutSortReducer.java       | 44 ++++++---
 .../mapreduce/TestHFileOutputFormat.java      | 61 +++++++++---
 3 files changed, 141 insertions(+), 61 deletions(-)

diff --git a/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java b/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
index 8ccdf4d7fb7..a0a2f867258 100644
--- a/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
+++ b/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
@@ -58,7 +58,8 @@ import org.apache.commons.logging.LogFactory;
  * Currently, can only write files to a single column family at a
  * time. Multiple column families requires coordinating keys cross family.
  * Writes current time as the sequence id for the file. Sets the major compacted
- * attribute on created hfiles.
+ * attribute on created hfiles. Calling write(null,null) will forceably roll
+ * all HFiles being written.
  * @see KeyValueSortReducer
  */
 public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, KeyValue> {
@@ -72,9 +73,10 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
     return new RecordWriter<ImmutableBytesWritable, KeyValue>() {
       // Map of families to writers and how much has been output on the writer.
       private final Map<byte [], WriterLength> writers =
         new TreeMap<byte [], WriterLength>(Bytes.BYTES_COMPARATOR);
       private byte [] previousRow = HConstants.EMPTY_BYTE_ARRAY;
       private final byte [] now = Bytes.toBytes(System.currentTimeMillis());
+      private boolean rollRequested = false;
 
       public void write(ImmutableBytesWritable row, KeyValue kv)
       throws IOException {
+        // null input == user explicitly wants to flush
+        if (row == null && kv == null) {
+          rollWriters();
+          return;
+        }
+
+        byte [] rowKey = kv.getRow();
         long length = kv.getLength();
         byte [] family = kv.getFamily();
         WriterLength wl = this.writers.get(family);
-        if (wl == null || ((length + wl.written) >= maxsize) &&
-          Bytes.compareTo(this.previousRow, 0, this.previousRow.length,
-            kv.getBuffer(), kv.getRowOffset(), kv.getRowLength()) != 0) {
-          // Get a new writer.
-          Path basedir = new Path(outputdir, Bytes.toString(family));
-          if (wl == null) {
-            wl = new WriterLength();
-            this.writers.put(family, wl);
-            if (this.writers.size() > 1) throw new IOException("One family only");
-            // If wl == null, first file in family. Ensure family dir exits.
-            if (!fs.exists(basedir)) fs.mkdirs(basedir);
-          }
-          wl.writer = getNewWriter(wl.writer, basedir);
-          LOG.info("Writer=" + wl.writer.getPath() +
-            ((wl.written == 0)? "": ", wrote=" + wl.written));
-          wl.written = 0;
+
+        // If this is a new column family, verify that the directory exists
+        if (wl == null) {
+          fs.mkdirs(new Path(outputdir, Bytes.toString(family)));
         }
+
+        // If any of the HFiles for the column families has reached
+        // maxsize, we need to roll all the writers
+        if (wl != null && wl.written + length >= maxsize) {
+          this.rollRequested = true;
+        }
+
+        // This can only happen once a row is finished though
+        if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
+          rollWriters();
+        }
+
+        // create a new HLog writer, if necessary
+        if (wl == null || wl.writer == null) {
+          wl = getNewWriter(family);
+        }
+
+        // we now have the proper HLog writer. full steam ahead
         kv.updateLatestStamp(this.now);
         wl.writer.append(kv);
         wl.written += length;
+
         // Copy the row so we know when a row transition.
-        this.previousRow = kv.getRow();
+        this.previousRow = rowKey;
       }
 
-      /* Create a new HFile.Writer. Close current if there is one.
-       * @param writer
-       * @param familydir
-       * @return A new HFile.Writer.
+      private void rollWriters() throws IOException {
+        for (WriterLength wl : this.writers.values()) {
+          if (wl.writer != null) {
+            LOG.info("Writer=" + wl.writer.getPath() +
+                ((wl.written == 0)? "": ", wrote=" + wl.written));
+            close(wl.writer);
+          }
+          wl.writer = null;
+          wl.written = 0;
+        }
+        this.rollRequested = false;
+      }
+
+      /* Create a new HFile.Writer.
+       * @param family
+       * @return A WriterLength, containing a new HFile.Writer.
        * @throws IOException
        */
-      private HFile.Writer getNewWriter(final HFile.Writer writer,
-        final Path familydir)
-      throws IOException {
-        close(writer);
-        return new HFile.Writer(fs, StoreFile.getUniqueFile(fs, familydir),
-          blocksize, compression, KeyValue.KEY_COMPARATOR);
+      private WriterLength getNewWriter(byte[] family) throws IOException {
+        WriterLength wl = new WriterLength();
+        Path familydir = new Path(outputdir, Bytes.toString(family));
+        wl.writer = new HFile.Writer(fs,
+          StoreFile.getUniqueFile(fs, familydir), blocksize,
+          compression, KeyValue.KEY_COMPARATOR);
+        this.writers.put(family, wl);
+        return wl;
       }
 
       private void close(final HFile.Writer w) throws IOException {
@@ -143,8 +174,8 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
 
       public void close(TaskAttemptContext c)
       throws IOException, InterruptedException {
-        for (Map.Entry<byte [], WriterLength> e: this.writers.entrySet()) {
-          close(e.getValue().writer);
+        for (WriterLength wl: this.writers.values()) {
+          close(wl.writer);
         }
       }
     };
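The core behavioral change in write() above is the deferred roll: once any family's HFile crosses maxsize, the writer only records a roll request, and all families actually roll together at the next row boundary, so a row is never split across HFiles of one family. Below is a minimal, self-contained sketch of that policy in plain Java; it has no HBase dependencies, and names such as RollPolicySketch and MAXSIZE are illustrative stand-ins, not part of the patch.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    // Models the patched write(): crossing maxsize only requests a roll;
    // the roll itself is deferred until the row key changes.
    public class RollPolicySketch {
      static final long MAXSIZE = 100; // stand-in for the real maxsize limit
      static final Map<String, Long> written = new TreeMap<String, Long>();
      static final List<String> log = new ArrayList<String>();
      static boolean rollRequested = false;
      static String previousRow = "";

      static void write(String row, String family, long length) {
        Long w = written.get(family);
        if (w != null && w + length >= MAXSIZE) {
          rollRequested = true;              // request only; never roll mid-row
        }
        if (rollRequested && !previousRow.equals(row)) {
          rollWriters();                     // row boundary: roll every family
        }
        if (!written.containsKey(family)) {
          log.add("open " + family);         // fresh writer for this family
          written.put(family, 0L);
        }
        written.put(family, written.get(family) + length);
        previousRow = row;
      }

      static void rollWriters() {
        for (Map.Entry<String, Long> e : written.entrySet()) {
          log.add("roll " + e.getKey() + " after " + e.getValue() + " bytes");
        }
        written.clear();
        rollRequested = false;
      }

      public static void main(String[] args) {
        write("row1", "cf-A", 60);
        write("row1", "cf-B", 60);
        write("row1", "cf-A", 60); // crosses MAXSIZE, but row1 is unfinished
        write("row2", "cf-A", 10); // row changed: cf-A and cf-B roll together
        for (String line : log) System.out.println(line);
      }
    }

Running the sketch shows both families rolling at the row2 boundary, which keeps each row intact within a single file per family.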
diff --git a/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java b/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java
index 5fb3e83bd2a..0cba4bfad68 100644
--- a/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java
+++ b/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java
@@ -19,6 +19,7 @@
  */
 package org.apache.hadoop.hbase.mapreduce;
 
+import java.util.Iterator;
 import java.util.List;
 import java.util.TreeSet;
 
@@ -26,6 +27,7 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.util.StringUtils;
 
 /**
  * Emits sorted Puts.
@@ -46,21 +48,37 @@ public class PutSortReducer extends
               ImmutableBytesWritable, KeyValue>.Context context)
       throws java.io.IOException, InterruptedException
   {
-    TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
-
-    for (Put p : puts) {
-      for (List<KeyValue> kvs : p.getFamilyMap().values()) {
-        for (KeyValue kv : kvs) {
-          map.add(kv.clone());
+    // although reduce() is called per-row, handle pathological case
+    long threshold = context.getConfiguration().getLong(
+        "putsortreducer.row.threshold", 2L * (1<<30));
+    Iterator<Put> iter = puts.iterator();
+    while (iter.hasNext()) {
+      TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
+      long curSize = 0;
+      // stop at the end or the RAM threshold
+      while (iter.hasNext() && curSize < threshold) {
+        Put p = iter.next();
+        for (List<KeyValue> kvs : p.getFamilyMap().values()) {
+          for (KeyValue kv : kvs) {
+            map.add(kv);
+            curSize += kv.getValueLength();
+          }
         }
       }
-    }
-    context.setStatus("Read " + map.getClass());
-    int index = 0;
-    for (KeyValue kv : map) {
-      context.write(row, kv);
-      if (index > 0 && index % 100 == 0)
-        context.setStatus("Wrote " + index);
+      context.setStatus("Read " + map.size() + " entries of " + map.getClass()
+          + "(" + StringUtils.humanReadableInt(curSize) + ")");
+      int index = 0;
+      for (KeyValue kv : map) {
+        context.write(row, kv);
+        if (index > 0 && index % 100 == 0)
+          context.setStatus("Wrote " + index);
+      }
+
+      // if we have more entries to process
+      if (iter.hasNext()) {
+        // force flush because we cannot guarantee intra-row sorted order
+        context.write(null, null);
+      }
     }
   }
 }
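The rewritten reduce() above drains its input in RAM-bounded batches instead of materializing every KeyValue at once, and it leans on the new write(null,null) contract to force a flush between batches, because sorted order is only guaranteed within one batch. A stripped-down sketch of the same pattern, using strings in place of KeyValues and println in place of context.write (the threshold here is deliberately tiny):

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    import java.util.TreeSet;

    // Mirrors the chunking in the patched reduce(): sort a bounded batch,
    // emit it, then signal a flush if more input remains.
    public class ChunkedSortSketch {
      public static void main(String[] args) {
        List<String> values = Arrays.asList("d", "b", "e", "a", "c");
        long threshold = 2; // stand-in for putsortreducer.row.threshold
        Iterator<String> iter = values.iterator();
        while (iter.hasNext()) {
          TreeSet<String> batch = new TreeSet<String>();
          long curSize = 0;
          // stop at the end of the input or at the RAM threshold
          while (iter.hasNext() && curSize < threshold) {
            batch.add(iter.next());
            curSize += 1; // the real code sums kv.getValueLength()
          }
          for (String v : batch) {
            System.out.println("write " + v);
          }
          if (iter.hasNext()) {
            // stands in for context.write(null, null): roll all HFiles,
            // since the next batch may sort before what was just written
            System.out.println("flush");
          }
        }
      }
    }

Each flush costs extra HFiles, so the 2 GB default threshold makes the fallback essentially free for normal rows while still bounding reducer memory for pathological ones.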
diff --git a/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java b/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
index c5d56cc4e25..5760d486c65 100644
--- a/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
+++ b/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
@@ -41,6 +41,9 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.PerformanceEvaluation;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Threads;
@@ -65,7 +68,9 @@ import org.mockito.Mockito;
 public class TestHFileOutputFormat {
   private final static int ROWSPERSPLIT = 1024;
 
-  private static final byte[] FAMILY_NAME = PerformanceEvaluation.FAMILY_NAME;
+  private static final byte[][] FAMILIES
+    = { Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-A"))
+    , Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B"))};
   private static final byte[] TABLE_NAME = Bytes.toBytes("TestTable");
 
   private HBaseTestingUtility util = new HBaseTestingUtility();
@@ -119,9 +124,11 @@ public class TestHFileOutputFormat {
       random.nextBytes(valBytes);
 
       ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
-      KeyValue kv = new KeyValue(keyBytes, PerformanceEvaluation.FAMILY_NAME,
-          PerformanceEvaluation.QUALIFIER_NAME, valBytes);
-      context.write(key, kv);
+      for (byte[] family : TestHFileOutputFormat.FAMILIES) {
+        KeyValue kv = new KeyValue(keyBytes, family,
+            PerformanceEvaluation.QUALIFIER_NAME, valBytes);
+        context.write(key, kv);
+      }
     }
   }
 }
@@ -268,14 +275,12 @@ public class TestHFileOutputFormat {
     try {
       util.startMiniCluster();
       HBaseAdmin admin = new HBaseAdmin(conf);
-      HTable table = util.createTable(TABLE_NAME, FAMILY_NAME);
-      int numRegions = util.createMultiRegions(
-          util.getConfiguration(), table, FAMILY_NAME,
-          startKeys);
-      assertEquals("Should make 5 regions",
-          numRegions, 5);
+      HTable table = util.createTable(TABLE_NAME, FAMILIES);
       assertEquals("Should start with empty table",
           0, util.countRows(table));
+      int numRegions = util.createMultiRegions(
+          util.getConfiguration(), table, FAMILIES[0], startKeys);
+      assertEquals("Should make 5 regions", numRegions, 5);
 
       // Generate the bulk load files
       util.startMiniMapReduceCluster();
@@ -284,6 +289,19 @@ public class TestHFileOutputFormat {
 
       assertEquals("HFOF should not touch actual table",
           0, util.countRows(table));
+
+      // Make sure that a directory was created for every CF
+      int dir = 0;
+      for (FileStatus f : testDir.getFileSystem(conf).listStatus(testDir)) {
+        for (byte[] family : FAMILIES) {
+          if (Bytes.toString(family).equals(f.getPath().getName())) {
+            ++dir;
+          }
+        }
+      }
+      assertEquals("Column family not found in FS.", FAMILIES.length, dir);
+
+      // handle the split case
       if (shouldChangeRegions) {
         LOG.info("Changing regions in table");
         admin.disableTable(table.getTableName());
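The directory assertion just above pins down the on-disk contract this patch establishes: HFileOutputFormat now writes one subdirectory per column family under the job output directory, the layout that LoadIncrementalHFiles consumes. The following sketch repeats the same check against a local path; the output directory and family names are invented for illustration.

    import java.io.File;

    // Checks that every expected family has a matching subdirectory,
    // mirroring the FileStatus loop in the test above.
    public class FamilyDirCheckSketch {
      public static void main(String[] args) {
        String[] families = { "info-A", "info-B" };      // example names
        File outputDir = new File("/tmp/hfof-output");   // example job output
        int found = 0;
        File[] entries = outputDir.listFiles();
        if (entries != null) {
          for (File f : entries) {
            for (String family : families) {
              if (family.equals(f.getName())) {
                found++;
              }
            }
          }
        }
        System.out.println(found == families.length
            ? "all family dirs present"
            : "missing family dirs: found " + found);
      }
    }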
@@ -293,8 +311,8 @@ public class TestHFileOutputFormat {
         LOG.info("Waiting on table to finish disabling");
       }
       byte[][] newStartKeys = generateRandomStartKeys(15);
-      util.createMultiRegions(util.getConfiguration(),
-          table, FAMILY_NAME, newStartKeys);
+      util.createMultiRegions(
+          util.getConfiguration(), table, FAMILIES[0], newStartKeys);
       admin.enableTable(table.getTableName());
       while (table.getRegionsInfo().size() != 15 ||
           !admin.isTableAvailable(table.getTableName())) {
@@ -310,6 +328,19 @@ public class TestHFileOutputFormat {
       int expectedRows = conf.getInt("mapred.map.tasks", 1) * ROWSPERSPLIT;
       assertEquals("LoadIncrementalHFiles should put expected data in table",
           expectedRows, util.countRows(table));
+      Scan scan = new Scan();
+      ResultScanner results = table.getScanner(scan);
+      int count = 0;
+      for (Result res : results) {
+        count++;
+        assertEquals(FAMILIES.length, res.raw().length);
+        KeyValue first = res.raw()[0];
+        for (KeyValue kv : res.raw()) {
+          assertTrue(KeyValue.COMPARATOR.matchingRows(first, kv));
+          assertTrue(Bytes.equals(first.getValue(), kv.getValue()));
+        }
+      }
+      results.close();
       String tableDigestBefore = util.checksumRows(table);
 
       // Cause regions to reopen
@@ -351,11 +382,11 @@ public class TestHFileOutputFormat {
     util = new HBaseTestingUtility(conf);
     if ("newtable".equals(args[0])) {
       byte[] tname = args[1].getBytes();
-      HTable table = util.createTable(tname, FAMILY_NAME);
+      HTable table = util.createTable(tname, FAMILIES);
       HBaseAdmin admin = new HBaseAdmin(conf);
       admin.disableTable(tname);
-      util.createMultiRegions(conf, table, FAMILY_NAME,
-          generateRandomStartKeys(5));
+      byte[][] startKeys = generateRandomStartKeys(5);
+      util.createMultiRegions(conf, table, FAMILIES[0], startKeys);
       admin.enableTable(tname);
     } else if ("incremental".equals(args[0])) {
       byte[] tname = args[1].getBytes();
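For context, a driver for the multi-family bulk load this patch enables might look like the sketch below. The input format, paths, and mapper are hypothetical; HFileOutputFormat.configureIncrementalLoad() and LoadIncrementalHFiles.doBulkLoad() are the existing bulk-load entry points, and with this patch the mapper is free to emit KeyValues for every family of the target table.

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class BulkLoadDriverSketch {

      // Hypothetical mapper: parses "row,family,qualifier,value" lines and
      // may emit KeyValues for any column family of the target table.
      static class CsvToKeyValueMapper
          extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
        @Override
        protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
          String[] f = line.toString().split(",");
          byte[] row = Bytes.toBytes(f[0]);
          context.write(new ImmutableBytesWritable(row),
              new KeyValue(row, Bytes.toBytes(f[1]), Bytes.toBytes(f[2]),
                  Bytes.toBytes(f[3])));
        }
      }

      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "TestTable");    // example table

        Job job = new Job(conf, "multi-family bulk load");
        job.setJarByClass(BulkLoadDriverSketch.class);
        job.setMapperClass(CsvToKeyValueMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(KeyValue.class);
        FileInputFormat.addInputPath(job, new Path("/bulk/input"));  // example
        Path hfofDir = new Path("/bulk/output");                     // example
        FileOutputFormat.setOutputPath(job, hfofDir);

        // Installs HFileOutputFormat, a total-order partitioner, and the
        // sort reducer matching the map output value class.
        HFileOutputFormat.configureIncrementalLoad(job, table);

        if (!job.waitForCompletion(true)) {
          System.exit(1);
        }
        // Moves the generated per-family HFiles into the table's regions.
        new LoadIncrementalHFiles(conf).doBulkLoad(hfofDir, table);
      }
    }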