HBASE-13625 Use HDFS for HFileOutputFormat2 partitioner's path (Stephen Yuan Jiang)

This commit is contained in:
tedyu 2015-05-06 07:29:57 -07:00
parent ad8f1d076f
commit ec3d7189ad
5 changed files with 24 additions and 4 deletions

View File

@@ -37,6 +37,6 @@ public class SecureBulkLoadUtil {
   }
   public static Path getBaseStagingDir(Configuration conf) {
-    return new Path(conf.get(BULKLOAD_STAGING_DIR, "/tmp/hbase-staging"));
+    return new Path(conf.get(BULKLOAD_STAGING_DIR));
   }
 }

View File

@@ -62,6 +62,20 @@ possible configurations would overwhelm and obscure the important.
     so change this configuration or else all data will be lost on
     machine restart.</description>
   </property>
+  <property >
+    <name>hbase.fs.tmp.dir</name>
+    <value>/user/${user.name}/hbase-staging</value>
+    <description>A staging directory in default file system (HDFS)
+    for keeping temporary data.
+    </description>
+  </property>
+  <property >
+    <name>hbase.bulkload.staging.dir</name>
+    <value>${hbase.fs.tmp.dir}</value>
+    <description>A staging directory in default file system (HDFS)
+    for bulk loading.
+    </description>
+  </property>
   <property >
     <name>hbase.cluster.distributed</name>
     <value>false</value>

View File

@@ -588,7 +588,7 @@ public class HFileOutputFormat2
     Configuration conf = job.getConfiguration();
     // create the partitions file
     FileSystem fs = FileSystem.get(conf);
-    Path partitionsPath = new Path(conf.get("hadoop.tmp.dir"), "partitions_" + UUID.randomUUID());
+    Path partitionsPath = new Path(conf.get("hbase.fs.tmp.dir"), "partitions_" + UUID.randomUUID());
     fs.makeQualified(partitionsPath);
     writePartitions(conf, partitionsPath, splitPoints);
     fs.deleteOnExit(partitionsPath);

View File

@@ -333,7 +333,9 @@ public class TestHFileOutputFormat {
   @Test
   public void testJobConfiguration() throws Exception {
-    Job job = new Job(util.getConfiguration());
+    Configuration conf = new Configuration(this.util.getConfiguration());
+    conf.set("hbase.fs.tmp.dir", util.getDataTestDir("testJobConfiguration").toString());
+    Job job = new Job(conf);
     job.setWorkingDirectory(util.getDataTestDir("testJobConfiguration"));
     HTableDescriptor tableDescriptor = Mockito.mock(HTableDescriptor.class);
     RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
@@ -820,6 +822,7 @@ public class TestHFileOutputFormat {
     // We turn off the sequence file compression, because DefaultCodec
     // pollutes the GZip codec pool with an incompatible compressor.
     conf.set("io.seqfile.compression.type", "NONE");
+    conf.set("hbase.fs.tmp.dir", dir.toString());
     Job job = new Job(conf, "testLocalMRIncrementalLoad");
     job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings"));
     setupRandomGeneratorMapper(job);

View File

@@ -337,7 +337,9 @@ public class TestHFileOutputFormat2 {
   @Test
   public void testJobConfiguration() throws Exception {
-    Job job = new Job(util.getConfiguration());
+    Configuration conf = new Configuration(this.util.getConfiguration());
+    conf.set("hbase.fs.tmp.dir", util.getDataTestDir("testJobConfiguration").toString());
+    Job job = new Job(conf);
     job.setWorkingDirectory(util.getDataTestDir("testJobConfiguration"));
     RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
     setupMockStartKeys(regionLocator);
@@ -822,6 +824,7 @@ public class TestHFileOutputFormat2 {
     // We turn off the sequence file compression, because DefaultCodec
     // pollutes the GZip codec pool with an incompatible compressor.
     conf.set("io.seqfile.compression.type", "NONE");
+    conf.set("hbase.fs.tmp.dir", dir.toString());
     Job job = new Job(conf, "testLocalMRIncrementalLoad");
     job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings"));
     setupRandomGeneratorMapper(job);