From 1162cbdf15acfc63b64835cb9e7ef29d5b9c6494 Mon Sep 17 00:00:00 2001
From: tedyu
Date: Tue, 7 Jul 2015 10:54:05 -0700
Subject: [PATCH] HBASE-13897 OOM may occur when Import imports a row with too
 many KeyValues (Liu Junhong)

---
 .../apache/hadoop/hbase/mapreduce/Import.java | 211 +++++++++++++++++-
 1 file changed, 200 insertions(+), 11 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
index 1fe5d56f2d7..d51d79adfd4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
@@ -18,12 +18,28 @@
  */
 package org.apache.hadoop.hbase.mapreduce;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.UUID;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.KeyValue;
@@ -47,25 +63,21 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.TaskCounter;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.zookeeper.KeeperException;
 
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.UUID;
-
 /**
  * Import data written by {@link Export}.
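
The switch this patch introduces is read later in createSubmittableJob as import.bulk.hasLargeResult (HAS_LARGE_RESULT). For orientation, a minimal driver sketch showing how the new mode would be turned on; the table name and paths are hypothetical, while BULK_OUTPUT_CONF_KEY and HAS_LARGE_RESULT are the real constants on Import:

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.Import;
    import org.apache.hadoop.util.ToolRunner;

    public class ImportDriver {
      public static void main(String[] args) throws Exception {
        // Import is a Tool, so ToolRunner parses the -D generic options.
        int exit = ToolRunner.run(HBaseConfiguration.create(), new Import(),
            new String[] {
                // write HFiles for bulk load instead of issuing Puts
                "-D" + Import.BULK_OUTPUT_CONF_KEY + "=/tmp/import-out",
                // the switch added by this patch: sort KeyValues in the shuffle
                "-D" + Import.HAS_LARGE_RESULT + "=true",
                "myTable", "/tmp/export-in" });
        System.exit(exit);
      }
    }
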
@@ -81,9 +93,159 @@ public class Import extends Configured implements Tool {
   public final static String FILTER_ARGS_CONF_KEY = "import.filter.args";
   public final static String TABLE_NAME = "import.table.name";
   public final static String WAL_DURABILITY = "import.wal.durability";
+  public final static String HAS_LARGE_RESULT = "import.bulk.hasLargeResult";
   private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
 
+  public static class KeyValueWritableComparablePartitioner
+      extends Partitioner<KeyValueWritableComparable, KeyValue> {
+    private static KeyValueWritableComparable[] START_KEYS = null;
+    @Override
+    public int getPartition(KeyValueWritableComparable key, KeyValue value,
+        int numPartitions) {
+      for (int i = 0; i < START_KEYS.length; ++i) {
+        if (key.compareTo(START_KEYS[i]) <= 0) {
+          return i;
+        }
+      }
+      return START_KEYS.length;
+    }
+
+  }
+
+  public static class KeyValueWritableComparable
+      implements WritableComparable<KeyValueWritableComparable> {
+
+    private KeyValue kv = null;
+
+    static {
+      // register this comparator
+      WritableComparator.define(KeyValueWritableComparable.class,
+          new KeyValueWritableComparator());
+    }
+
+    public KeyValueWritableComparable() {
+    }
+
+    public KeyValueWritableComparable(KeyValue kv) {
+      this.kv = kv;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      KeyValue.write(kv, out);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      kv = KeyValue.create(in);
+    }
+
+    @Override
+    public int compareTo(KeyValueWritableComparable o) {
+      return CellComparator.COMPARATOR.compare(this.kv, o.kv);
+    }
+
+    public static class KeyValueWritableComparator extends WritableComparator {
+
+      @Override
+      public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+        try {
+          KeyValueWritableComparable kv1 = new KeyValueWritableComparable();
+          kv1.readFields(new DataInputStream(new ByteArrayInputStream(b1, s1, l1)));
+          KeyValueWritableComparable kv2 = new KeyValueWritableComparable();
+          kv2.readFields(new DataInputStream(new ByteArrayInputStream(b2, s2, l2)));
+          return compare(kv1, kv2);
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+    }
+
+  }
+
+  public static class KeyValueReducer
+      extends
+      Reducer<KeyValueWritableComparable, KeyValue, ImmutableBytesWritable, KeyValue> {
+    protected void reduce(
+        KeyValueWritableComparable row,
+        Iterable<KeyValue> kvs,
+        Reducer<KeyValueWritableComparable, KeyValue,
+            ImmutableBytesWritable, KeyValue>.Context context)
+        throws java.io.IOException, InterruptedException {
+      int index = 0;
+      for (KeyValue kv : kvs) {
+        context.write(new ImmutableBytesWritable(kv.getRowArray(),
+            kv.getRowOffset(), kv.getRowLength()), kv);
+        if (++index % 100 == 0)
+          context.setStatus("Wrote " + index + " KeyValues, "
+              + "and the current row key is " + Bytes.toString(kv.getRowArray(),
+                  kv.getRowOffset(), kv.getRowLength()));
+      }
+    }
+  }
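
For R regions, setup() in the mapper below builds R-1 boundary keys (the empty start key of the first region is skipped), and getPartition() above scans them linearly. A standalone restatement of that rule, with hypothetical helper and variable names, to make the boundary-to-region mapping explicit:

    // Sketch of the same rule as KeyValueWritableComparablePartitioner.getPartition.
    // boundaries[i] wraps the first-on-row KeyValue of region i+1's start key,
    // in ascending order.
    static int partitionFor(KeyValueWritableComparable key,
        KeyValueWritableComparable[] boundaries) {
      for (int i = 0; i < boundaries.length; i++) {
        if (key.compareTo(boundaries[i]) <= 0) {
          return i;               // at or before the i-th boundary -> region i
        }
      }
      return boundaries.length;   // past every boundary -> the last region
    }

Since the boundaries are sorted, a binary search would also work; the linear scan keeps the patch simple, and region counts are small compared to the number of cells being partitioned.
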
+
+  public static class KeyValueSortImporter
+      extends TableMapper<KeyValueWritableComparable, KeyValue> {
+    private Map<byte[], byte[]> cfRenameMap;
+    private Filter filter;
+    private static final Log LOG = LogFactory.getLog(KeyValueSortImporter.class);
+
+    /**
+     * @param row The current table row key.
+     * @param value The columns.
+     * @param context The current context.
+     * @throws IOException When something is broken with the data.
+     */
+    @Override
+    public void map(ImmutableBytesWritable row, Result value,
+        Context context)
+        throws IOException {
+      try {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Considering the row: "
+              + Bytes.toString(row.get(), row.getOffset(), row.getLength()));
+        }
+        if (filter == null
+            || !filter.filterRowKey(KeyValueUtil.createFirstOnRow(row.get(), row.getOffset(),
+                (short) row.getLength()))) {
+          for (Cell kv : value.rawCells()) {
+            kv = filterKv(filter, kv);
+            // skip if we filtered it out
+            if (kv == null) continue;
+            // TODO get rid of ensureKeyValue
+            KeyValue ret = KeyValueUtil.ensureKeyValue(convertKv(kv, cfRenameMap));
+            context.write(new KeyValueWritableComparable(ret.createKeyOnly(false)), ret);
+          }
+        }
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+
+    @Override
+    public void setup(Context context) throws IOException {
+      cfRenameMap = createCfRenameMap(context.getConfiguration());
+      filter = instantiateFilter(context.getConfiguration());
+      int reduceNum = context.getNumReduceTasks();
+      Configuration conf = context.getConfiguration();
+      TableName tableName = TableName.valueOf(context.getConfiguration().get(TABLE_NAME));
+      try (Connection conn = ConnectionFactory.createConnection(conf);
+          RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
+        byte[][] startKeys = regionLocator.getStartKeys();
+        if (startKeys.length != reduceNum) {
+          throw new IOException("Region split after job initialization");
+        }
+        KeyValueWritableComparable[] startKeyWraps =
+            new KeyValueWritableComparable[startKeys.length - 1];
+        for (int i = 1; i < startKeys.length; ++i) {
+          startKeyWraps[i - 1] =
+              new KeyValueWritableComparable(KeyValueUtil.createFirstOnRow(startKeys[i]));
+        }
+        KeyValueWritableComparablePartitioner.START_KEYS = startKeyWraps;
+      }
+    }
+  }
+
   /**
    * A mapper that just writes out KeyValues.
    */
@@ -455,7 +617,31 @@ public class Import extends Configured implements Tool {
       throw new IOException(e);
     }
 
-    if (hfileOutPath != null) {
+    if (hfileOutPath != null && conf.getBoolean(HAS_LARGE_RESULT, false)) {
+      LOG.info("Using large-result mode: sorting KeyValues in the shuffle.");
+      try (Connection conn = ConnectionFactory.createConnection(conf);
+          Table table = conn.getTable(tableName);
+          RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
+        HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
+        job.setMapperClass(KeyValueSortImporter.class);
+        job.setReducerClass(KeyValueReducer.class);
+        Path outputDir = new Path(hfileOutPath);
+        FileOutputFormat.setOutputPath(job, outputDir);
+        job.setMapOutputKeyClass(KeyValueWritableComparable.class);
+        job.setMapOutputValueClass(KeyValue.class);
+        job.getConfiguration().setClass("mapreduce.job.output.key.comparator.class",
+            KeyValueWritableComparable.KeyValueWritableComparator.class,
+            RawComparator.class);
+        Path partitionsPath =
+            new Path(TotalOrderPartitioner.getPartitionFile(job.getConfiguration()));
+        FileSystem fs = FileSystem.get(job.getConfiguration());
+        fs.deleteOnExit(partitionsPath);
+        job.setPartitionerClass(KeyValueWritableComparablePartitioner.class);
+        job.setNumReduceTasks(regionLocator.getStartKeys().length);
+        TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
+            com.google.common.base.Preconditions.class);
+      }
+    } else if (hfileOutPath != null) {
       LOG.info("writing to hfiles for bulk load.");
       job.setMapperClass(KeyValueImporter.class);
       try (Connection conn = ConnectionFactory.createConnection(conf);
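
The hunk above installs KeyValueWritableComparator as the map output key comparator, so sorting happens in the MapReduce shuffle on serialized keys rather than in a per-row in-memory sort. A round-trip sketch of what the framework effectively does with that comparator; it assumes only the classes from this patch plus standard Hadoop/HBase APIs, and the row keys are made up:

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import org.apache.hadoop.hbase.KeyValueUtil;
    import org.apache.hadoop.hbase.mapreduce.Import.KeyValueWritableComparable;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.WritableComparator;

    public class ComparatorRoundTrip {
      // Serialize a key the same way the map output buffer would.
      static byte[] serialize(KeyValueWritableComparable key) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        key.write(new DataOutputStream(bytes));
        return bytes.toByteArray();
      }

      public static void main(String[] args) throws Exception {
        byte[] a = serialize(new KeyValueWritableComparable(
            KeyValueUtil.createFirstOnRow(Bytes.toBytes("row-a"))));
        byte[] b = serialize(new KeyValueWritableComparable(
            KeyValueUtil.createFirstOnRow(Bytes.toBytes("row-b"))));
        WritableComparator cmp =
            new KeyValueWritableComparable.KeyValueWritableComparator();
        // "row-a" sorts before "row-b", so the raw compare must be negative.
        System.out.println(cmp.compare(a, 0, a.length, b, 0, b.length) < 0);
      }
    }
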
To instead generate"); System.err.println("HFiles of data to prepare for a bulk data load, pass the option:"); System.err.println(" -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output"); + System.err.println("If there is a large result that includes too much KeyValue " + + "whitch can occur OOME caused by the memery sort in reducer, pass the option:"); + System.err.println(" -D" + HAS_LARGE_RESULT + "=true"); System.err .println(" To apply a generic org.apache.hadoop.hbase.filter.Filter to the input, use"); System.err.println(" -D" + FILTER_CLASS_CONF_KEY + "=");