HBASE-10660 MR over snapshots can OOM when alternative blockcache is enabled
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1575454 13f79535-47bb-0310-9956-ffa450edef68
parent 7f95fccd06
commit f032023ea5
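The "alternative blockcache" here is any BlockCache implementation backed by direct memory that a map task inherits from the cluster configuration: opening a snapshot region client-side then tries to allocate off-heap memory the task was never given, and the task OOMs. A hedged sketch of the kind of inherited setting that triggers the problem (the keys are the ones this patch zeroes out; the values are illustrative, not from this commit):

    // assumes org.apache.hadoop.conf.Configuration and
    // org.apache.hadoop.hbase.HBaseConfiguration imports
    Configuration conf = HBaseConfiguration.create();
    // SlabCache: a non-zero fraction sizes an off-heap slab cache
    conf.setFloat("hbase.offheapcache.percentage", 0.6f);
    // BucketCache: a non-zero size enables the off-heap bucket cache
    conf.setFloat("hbase.bucketcache.size", 0.4f);
    // A map task opening a snapshot region with this conf attempts the
    // corresponding direct-memory allocation and dies with an OOM.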
@@ -42,6 +42,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.catalog.MetaReader;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Scan;
@@ -301,6 +302,17 @@ public class TableMapReduceUtil {
     initTableMapperJob(snapshotName, scan, mapper, outputKeyClass,
         outputValueClass, job, addDependencyJars, false, TableSnapshotInputFormat.class);
 
+    /*
+     * Enable a basic on-heap cache for these jobs. Any BlockCache implementation based on
+     * direct memory will likely cause the map tasks to OOM when opening the region. This
+     * is done here instead of in TableSnapshotRegionRecordReader in case an advanced user
+     * wants to override this behavior in their job.
+     */
+    job.getConfiguration().setFloat(
+      HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT);
+    job.getConfiguration().setFloat("hbase.offheapcache.percentage", 0f);
+    job.getConfiguration().setFloat("hbase.bucketcache.size", 0f);
+
     // We would need even more libraries that hbase-server depends on
     TableMapReduceUtil.addDependencyJars(job.getConfiguration(), Counter.class);
   }
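The comment block above deliberately applies these defaults at job-setup time rather than inside TableSnapshotRegionRecordReader so that an advanced user can still override them. A minimal sketch of such an override, assuming a hypothetical MyMapper (a TableMapper subclass) and snapshot/restore-dir names that are not from this commit:

    // assumes the usual org.apache.hadoop.hbase and mapreduce imports
    Job job = new Job(HBaseConfiguration.create(), "scan-snapshot");
    TableMapReduceUtil.initTableSnapshotMapperJob("mySnapshot", new Scan(),
        MyMapper.class, ImmutableBytesWritable.class, Result.class, job,
        true, new Path("/tmp/snapshot-restore"));
    // The call above pinned the job to a default-sized on-heap LruBlockCache;
    // a memory-constrained job can shrink it further after the fact.
    job.getConfiguration().setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.05f);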
@@ -231,6 +231,8 @@ public class TableSnapshotInputFormat extends InputFormat<ImmutableBytesWritable
       // region is immutable, this should be fine,
       // otherwise we have to set the thread read point
       scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
+      // disable caching of data blocks
+      scan.setCacheBlocks(false);
 
       scanner = new ClientSideRegionScanner(conf, fs, tmpRootDir, htd, hri, scan, null);
       if (context != null) {
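For context, the record reader drives a ClientSideRegionScanner straight over the restored region files, so each block is read exactly once and caching it only wastes heap. A hedged sketch of the equivalent standalone usage, assuming conf, fs, rootDir, htd, and hri are already in scope:

    Scan scan = new Scan();
    // snapshot region files are immutable, so the relaxed level is safe
    scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
    scan.setCacheBlocks(false); // single pass; nothing worth caching
    ClientSideRegionScanner scanner =
        new ClientSideRegionScanner(conf, fs, rootDir, htd, hri, scan, null);
    try {
      for (Result result = scanner.next(); result != null; result = scanner.next()) {
        // process each row
      }
    } finally {
      scanner.close();
    }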
@@ -34,6 +34,11 @@ import org.junit.experimental.categories.Category;
 @Category(SmallTests.class)
 public class TestTableMapReduceUtil {
 
+  /*
+   * initTableSnapshotMapperJob is tested in {@link TestTableSnapshotInputFormat} because
+   * the method depends on an online cluster.
+   */
+
   @Test
   public void testInitTableMapperJob1() throws Exception {
     Configuration configuration = new Configuration();
@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
@@ -42,6 +43,9 @@ import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.hfile.BlockCache;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
 import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat.TableSnapshotRegionSplit;
 import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
 import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
@@ -55,6 +59,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -201,6 +206,40 @@ public class TestTableSnapshotInputFormat {
     table.close();
   }
 
+  @Test
+  public void testInitTableSnapshotMapperJobConfig() throws Exception {
+    setupCluster();
+    TableName tableName = TableName.valueOf("testInitTableSnapshotMapperJobConfig");
+    String snapshotName = "foo";
+
+    try {
+      createTableAndSnapshot(UTIL, tableName, snapshotName, 1);
+      Job job = new Job(UTIL.getConfiguration());
+      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
+
+      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
+        new Scan(), TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
+        NullWritable.class, job, false, tmpTableDir);
+
+      // TODO: would be better to examine directly the cache instance that results from this
+      // config. Currently this is not possible because BlockCache initialization is static.
+      Assert.assertEquals(
+        "Snapshot job should be configured for default LruBlockCache.",
+        HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT,
+        job.getConfiguration().getFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, -1), 0.01);
+      Assert.assertEquals(
+        "Snapshot job should not use SlabCache.",
+        0, job.getConfiguration().getFloat("hbase.offheapcache.percentage", -1), 0.01);
+      Assert.assertEquals(
+        "Snapshot job should not use BucketCache.",
+        0, job.getConfiguration().getFloat("hbase.bucketcache.size", -1), 0.01);
+    } finally {
+      UTIL.getHBaseAdmin().deleteSnapshot(snapshotName);
+      UTIL.deleteTable(tableName);
+      tearDownCluster();
+    }
+  }
+
   @Test
   public void testWithMockedMapReduceSingleRegion() throws Exception {
     testWithMockedMapReduce(UTIL, "testWithMockedMapReduceSingleRegion", 1, 1);