HBASE-12596 bulkload needs to follow locality (Victor Xu)

This commit is contained in:
tedyu 2015-07-09 07:24:18 -07:00
parent d07ff5ec5a
commit f8eaa98962
3 changed files with 156 additions and 31 deletions

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.ArrayList;
@ -41,11 +42,16 @@ import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@ -109,6 +115,15 @@ public class HFileOutputFormat2
* Keep locality while generating HFiles for bulkload. See HBASE-12596
public static final String LOCALITY_SENSITIVE_CONF_KEY =
private static final boolean DEFAULT_LOCALITY_SENSITIVE = true;
private static final String OUTPUT_TABLE_NAME_CONF_KEY =
public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter(
final TaskAttemptContext context) throws IOException, InterruptedException {
@ -192,7 +207,48 @@ public class HFileOutputFormat2
// create a new WAL writer, if necessary
if (wl == null || wl.writer == null) {
wl = getNewWriter(family, conf);
HRegionLocation loc = null;
String tableName = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
try (Connection connection = ConnectionFactory.createConnection(conf);
RegionLocator locator =
connection.getRegionLocator(TableName.valueOf(tableName))) {
loc = locator.getRegionLocation(rowKey);
} catch (Throwable e) {
LOG.warn("there's something wrong when locating rowkey: " +
Bytes.toString(rowKey), e);
loc = null;
if (null == loc) {
if (LOG.isTraceEnabled()) {
LOG.trace("failed to get region location, so use default writer: " +
wl = getNewWriter(family, conf, null);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("first rowkey: [" + Bytes.toString(rowKey) + "]");
InetSocketAddress initialIsa =
new InetSocketAddress(loc.getHostname(), loc.getPort());
if (initialIsa.isUnresolved()) {
if (LOG.isTraceEnabled()) {
LOG.trace("failed to resolve bind address: " + loc.getHostname() + ":"
+ loc.getPort() + ", so use default writer");
wl = getNewWriter(family, conf, null);
} else {
if(LOG.isDebugEnabled()) {
LOG.debug("use favored nodes writer: " + initialIsa.getHostString());
wl = getNewWriter(family, conf, new InetSocketAddress[] { initialIsa });
} else {
wl = getNewWriter(family, conf, null);
// we now have the proper WAL writer. full steam ahead
@ -224,8 +280,8 @@ public class HFileOutputFormat2
justification="Not important")
private WriterLength getNewWriter(byte[] family, Configuration conf)
throws IOException {
private WriterLength getNewWriter(byte[] family, Configuration conf,
InetSocketAddress[] favoredNodes) throws IOException {
WriterLength wl = new WriterLength();
Path familydir = new Path(outputdir, Bytes.toString(family));
Algorithm compression = compressionMap.get(family);
@ -247,10 +303,18 @@ public class HFileOutputFormat2
HFileContext hFileContext = contextBuilder.build();
wl.writer = new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs)
if (null == favoredNodes) {
wl.writer =
new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs)
} else {
wl.writer =
new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
this.writers.put(family, wl);
return wl;
@ -431,6 +495,12 @@ public class HFileOutputFormat2
MutationSerialization.class.getName(), ResultSerialization.class.getName(),
// record this table name for creating writer by favored nodes
LOG.info("bulkload locality sensitive enabled");
conf.set(OUTPUT_TABLE_NAME_CONF_KEY, regionLocator.getName().getNameAsString());
// Use table's region boundaries for TOP split points.
LOG.info("Looking up current regions for table " + regionLocator.getName());
List<ImmutableBytesWritable> startKeys = getRegionStartKeys(regionLocator);

View File

@ -342,6 +342,7 @@ public class TestHFileOutputFormat {
HTableDescriptor tableDescriptor = Mockito.mock(HTableDescriptor.class);
RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
HFileOutputFormat2.configureIncrementalLoad(job, tableDescriptor, regionLocator);
assertEquals(job.getNumReduceTasks(), 4);
@ -796,6 +797,11 @@ public class TestHFileOutputFormat {
private void setupMockTableName(RegionLocator table) throws IOException {
TableName mockTableName = TableName.valueOf("mock_table");
* Test that {@link HFileOutputFormat} RecordWriter uses compression and
* bloom filter settings from the column family descriptor
@ -825,6 +831,9 @@ public class TestHFileOutputFormat {
// pollutes the GZip codec pool with an incompatible compressor.
conf.set("io.seqfile.compression.type", "NONE");
conf.set("hbase.fs.tmp.dir", dir.toString());
// turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs
conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);
Job job = new Job(conf, "testLocalMRIncrementalLoad");

View File

@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HadoopShims;
import org.apache.hadoop.hbase.KeyValue;
@ -56,7 +57,6 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HRegionLocator;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
@ -72,6 +72,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.testclassification.LargeTests;
@ -345,6 +346,7 @@ public class TestHFileOutputFormat2 {
Table table = Mockito.mock(Table.class);
RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
assertEquals(job.getNumReduceTasks(), 4);
@ -374,41 +376,64 @@ public class TestHFileOutputFormat2 {
public void testMRIncrementalLoad() throws Exception {
LOG.info("\nStarting test testMRIncrementalLoad\n");
doIncrementalLoadTest(false, false);
public void testMRIncrementalLoadWithSplit() throws Exception {
LOG.info("\nStarting test testMRIncrementalLoadWithSplit\n");
doIncrementalLoadTest(true, false);
private void doIncrementalLoadTest(
boolean shouldChangeRegions) throws Exception {
* Test for HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY = true.
* This test can only verify that the original logic still works when
* LOCALITY_SENSITIVE_CONF_KEY is set to true. Because MiniHBaseCluster always runs with a single
* hostname (and different ports), it is not possible to check region locality by comparing
* region locations with DN hostnames. Once MiniHBaseCluster supports an explicit hostnames
* parameter (as MiniDFSCluster does), region locality can be tested more easily.
public void testMRIncrementalLoadWithLocality() throws Exception {
LOG.info("\nStarting test testMRIncrementalLoadWithLocality\n");
doIncrementalLoadTest(false, true);
doIncrementalLoadTest(true, true);
private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality)
throws Exception {
util = new HBaseTestingUtility();
Configuration conf = util.getConfiguration();
byte[][] splitKeys = generateRandomSplitKeys(4);
conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, shouldKeepLocality);
int hostCount = 1;
int regionNum = 5;
if(shouldKeepLocality) {
// The host count should be raised above the HDFS replica count once MiniHBaseCluster
// supports an explicit hostnames parameter, just like MiniDFSCluster does.
hostCount = 3;
regionNum = 20;
byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1);
String[] hostnames = new String[hostCount];
for(int i = 0; i < hostCount; ++i) {
hostnames[i] = "datanode_" + i;
util.startMiniCluster(1, hostCount, hostnames);
Table table = util.createTable(TABLE_NAME, FAMILIES, splitKeys);
try (RegionLocator r = util.getConnection().getRegionLocator(TABLE_NAME)) {
Admin admin = util.getConnection().getAdmin();
Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
assertEquals("Should start with empty table",
0, util.countRows(table));
int numRegions;
numRegions = r.getStartKeys().length;
assertEquals("Should make 5 regions", numRegions, 5);
try (RegionLocator r = util.getConnection().getRegionLocator(TABLE_NAME); Admin admin =
util.getConnection().getAdmin();) {
assertEquals("Should start with empty table", 0, util.countRows(table));
int numRegions = r.getStartKeys().length;
assertEquals("Should make " + regionNum + " regions", numRegions, regionNum);
// Generate the bulk load files
runIncrementalPELoad(conf, table.getTableDescriptor(), r, testDir);
// This doesn't write into the table, just makes files
assertEquals("HFOF should not touch actual table",
0, util.countRows(table));
assertEquals("HFOF should not touch actual table", 0, util.countRows(table));
// Make sure that a directory was created for every CF
int dir = 0;
@ -462,6 +487,17 @@ public class TestHFileOutputFormat2 {
String tableDigestBefore = util.checksumRows(table);
// Check region locality
HDFSBlocksDistribution hbd = new HDFSBlocksDistribution();
for (HRegion region : util.getHBaseCluster().getRegions(TABLE_NAME)) {
for (String hostname : hostnames) {
float locality = hbd.getBlockLocalityIndex(hostname);
LOG.info("locality of [" + hostname + "]: " + locality);
assertEquals(100, (int) (locality * 100));
// Cause regions to reopen
while (!admin.isTableDisabled(TABLE_NAME)) {
@ -473,6 +509,8 @@ public class TestHFileOutputFormat2 {
assertEquals("Data should remain after reopening of regions",
tableDigestBefore, util.checksumRows(table));
} finally {
testDir.getFileSystem(conf).delete(testDir, true);
@ -800,6 +838,11 @@ public class TestHFileOutputFormat2 {
private void setupMockTableName(RegionLocator table) throws IOException {
TableName mockTableName = TableName.valueOf("mock_table");
* Test that {@link HFileOutputFormat2} RecordWriter uses compression and
* bloom filter settings from the column family descriptor
@ -829,6 +872,9 @@ public class TestHFileOutputFormat2 {
// pollutes the GZip codec pool with an incompatible compressor.
conf.set("io.seqfile.compression.type", "NONE");
conf.set("hbase.fs.tmp.dir", dir.toString());
// turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs
conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);
Job job = new Job(conf, "testLocalMRIncrementalLoad");