HBASE-4285 partitions file created in user's home directory by importtsv

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1457078 13f79535-47bb-0310-9956-ffa450edef68
Michael Stack 2013-03-15 19:45:09 +00:00
parent afa31acc64
commit 167766c262
2 changed files with 47 additions and 28 deletions
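In effect the change is small: the partitions file for TotalOrderPartitioner used to be created relative to Job#getWorkingDirectory(), which on HDFS defaults to the submitting user's home directory, and nothing ever deleted it. The fix writes the file under /tmp and registers it for deletion when the client exits. A minimal sketch of the before/after path behavior, not part of the commit (class and file names here are illustrative only):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PartitionsPathDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);

    // Before: a relative name resolves against the working directory, i.e. the
    // user's home directory (e.g. hdfs://nn/user/alice/partitions_<uuid>).
    Path before = new Path(fs.getWorkingDirectory(), "partitions_example");
    System.out.println("before: " + before);

    // After: an explicit /tmp location, deleted when the client's FileSystem
    // is closed (normally by the JVM shutdown hook).
    Path after = fs.makeQualified(new Path("/tmp", "partitions_example"));
    fs.deleteOnExit(after);
    System.out.println("after: " + after);
  }
}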

IntegrationTestImportTsv.java

@@ -5,6 +5,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.Set;
@@ -15,6 +16,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.IntegrationTestingUtility;
@@ -25,6 +27,7 @@ import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -137,6 +140,18 @@ public class IntegrationTestImportTsv implements Configurable, Tool {
     }
   }
 
+  /**
+   * Confirm the absence of the {@link TotalOrderPartitioner} partitions file.
+   */
+  protected static void validateDeletedPartitionsFile(Configuration conf) throws IOException {
+    if (!conf.getBoolean(IntegrationTestingUtility.IS_DISTRIBUTED_CLUSTER, false))
+      return;
+
+    FileSystem fs = FileSystem.get(conf);
+    Path partitionsFile = new Path(TotalOrderPartitioner.getPartitionFile(conf));
+    assertFalse("Failed to clean up partitions file.", fs.exists(partitionsFile));
+  }
+
   @Test
   public void testGenerateAndLoad() throws Exception {
     String table = NAME + "-" + UUID.randomUUID();
@@ -155,8 +170,13 @@ public class IntegrationTestImportTsv implements Configurable, Tool {
 
     // run the job, complete the load.
     util.createTable(table, cf);
-    TestImportTsv.doMROnTableTest(util, cf, simple_tsv, args);
+    Tool t = TestImportTsv.doMROnTableTest(util, cf, simple_tsv, args);
     doLoadIncrementalHFiles(hfiles, table);
+
+    // validate post-conditions
+    validateDeletedPartitionsFile(t.getConf());
+
+    // clean up after ourselves.
     util.deleteTable(table);
     util.cleanupDataTestDirOnTestFS(table);
   }
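The test locates the partitions file through the same configuration key the job setup writes. A small sketch of that round-trip, assuming Hadoop's org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner API (the path below is hypothetical):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

public class PartitionFileRoundTrip {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    Path partitions = new Path("/tmp/partitions_demo"); // hypothetical path

    // configurePartitioner() (below) records the path in the configuration...
    TotalOrderPartitioner.setPartitionFile(conf, partitions);

    // ...and validateDeletedPartitionsFile() (above) reads it back, so the
    // test asserts on exactly the file the job setup created.
    String recorded = TotalOrderPartitioner.getPartitionFile(conf);
    System.out.println(recorded.equals(partitions.toString())); // prints true
  }
}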

HFileOutputFormat.java

@@ -20,8 +20,6 @@ package org.apache.hadoop.hbase.mapreduce;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
-import java.net.URI;
-import java.net.URISyntaxException;
 import java.net.URLDecoder;
 import java.net.URLEncoder;
 import java.util.ArrayList;
@@ -37,7 +35,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -54,9 +51,9 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
 import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
+import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
-import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
@@ -267,13 +264,12 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
   }
 
   /**
-   * Write out a SequenceFile that can be read by TotalOrderPartitioner
-   * that contains the split points in startKeys.
-   * @param partitionsPath output path for SequenceFile
-   * @param startKeys the region start keys
+   * Write out a {@link SequenceFile} that can be read by
+   * {@link TotalOrderPartitioner} that contains the split points in startKeys.
    */
   private static void writePartitions(Configuration conf, Path partitionsPath,
       List<ImmutableBytesWritable> startKeys) throws IOException {
+    LOG.info("Writing partition information to " + partitionsPath);
     if (startKeys.isEmpty()) {
       throw new IllegalArgumentException("No regions passed");
     }
@@ -325,7 +321,6 @@
       throws IOException {
     Configuration conf = job.getConfiguration();
 
-    job.setPartitionerClass(TotalOrderPartitioner.class);
     job.setOutputKeyClass(ImmutableBytesWritable.class);
     job.setOutputValueClass(KeyValue.class);
     job.setOutputFormatClass(HFileOutputFormat.class);
@@ -341,29 +336,14 @@
       LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
     }
+    // Use table's region boundaries for TOP split points.
     LOG.info("Looking up current regions for table " + table);
     List<ImmutableBytesWritable> startKeys = getRegionStartKeys(table);
     LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
         "to match current region count");
     job.setNumReduceTasks(startKeys.size());
 
-    Path partitionsPath = new Path(job.getWorkingDirectory(),
-        "partitions_" + UUID.randomUUID());
-    LOG.info("Writing partition information to " + partitionsPath);
-
-    FileSystem fs = partitionsPath.getFileSystem(conf);
-    writePartitions(conf, partitionsPath, startKeys);
-    partitionsPath.makeQualified(fs);
-
-    URI cacheUri;
-    try {
-      cacheUri = new URI(partitionsPath.toString() + "#" +
-          TotalOrderPartitioner.DEFAULT_PATH);
-    } catch (URISyntaxException e) {
-      throw new IOException(e);
-    }
-    DistributedCache.addCacheFile(cacheUri, conf);
-    DistributedCache.createSymlink(conf);
+    configurePartitioner(job, startKeys);
 
     // Set compression algorithms based on column families
     configureCompression(table, conf);
     configureBloomType(table, conf);
@ -415,7 +395,26 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
} }
return confValMap; return confValMap;
} }
/**
* Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
* <code>splitPoints</code>. Cleans up the partitions file after job exists.
*/
static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints)
throws IOException {
// create the partitions file
FileSystem fs = FileSystem.get(job.getConfiguration());
Path partitionsPath = new Path("/tmp", "partitions_" + UUID.randomUUID());
fs.makeQualified(partitionsPath);
fs.deleteOnExit(partitionsPath);
writePartitions(job.getConfiguration(), partitionsPath, splitPoints);
// configure job to use it
job.setPartitionerClass(TotalOrderPartitioner.class);
TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), partitionsPath);
}
/** /**
* Serialize column family to compression algorithm map to configuration. * Serialize column family to compression algorithm map to configuration.
* Invoked while configuring the MR job for incremental load. * Invoked while configuring the MR job for incremental load.