HBASE-1215 migration; fixes for jgray -- make HStoreFileToStoreFile implement Tool so can pass params
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@797230 13f79535-47bb-0310-9956-ffa450edef68
commit a190929981
parent cb6db82e31
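For context on the change below: Hadoop's Tool/ToolRunner contract is what lets command-line parameters reach a job, which is the point of this commit. A minimal sketch of the pattern HStoreFileToStoreFile is moved onto (the class and property names here are illustrative, not part of the commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    // ToolRunner strips generic options (-D key=value, -conf <file>, etc.)
    // out of args and applies them to the Configuration before run() is
    // called, so run() receives only the tool's own positional arguments.
    public class ExampleTool extends Configured implements Tool {
      public int run(String[] args) throws Exception {
        Configuration conf = getConf();   // already carries any -D overrides
        System.out.println("example.key = " + conf.get("example.key", "unset"));
        return 0;
      }

      public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new ExampleTool(), args));
      }
    }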
--- a/.../HStoreFileToStoreFile.java
+++ b/.../HStoreFileToStoreFile.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -37,29 +38,42 @@ import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
 
 /**
  * Mapper that rewrites hbase 0.19 HStoreFiles as 0.20 StoreFiles.
- * Creates passed directories as input and output. On startup, it checks
- * filesystem is 0.19 generation. It then crawls the filesystem to find the
+ * Creates passed directories as input and output. On startup, it does not
+ * check filesystem is 0.19 generation just in case it fails part way so it
+ * should be possible to rerun the MR job. It'll just fix the 0.19 regions
+ * found.
+ * If the input dir does not exist, it first crawls the filesystem to find the
  * files to migrate writing a file into the input directory. Next it starts up
  * the MR job to rewrite the 0.19 HStoreFiles as 0.20 StoreFiles deleting the
  * old as it goes. Presumption is that only
  * one file per family Store else stuff breaks; i.e. the 0.19 install
- * was major compacted before migration began.
+ * was major compacted before migration began. If this job fails, fix why,
+ * then it should be possible to rerun the job. You may want to edit the
+ * generated file in the input dir first.
  */
-public class HStoreFileToStoreFile {
+public class HStoreFileToStoreFile extends Configured implements Tool {
   static final Log LOG = LogFactory.getLog(HStoreFileToStoreFile.class);
   public static final String JOBNAME = "hsf2sf";
 
+  HStoreFileToStoreFile() {
+    super();
+  }
+
   public static class Map extends Mapper<LongWritable, Text, LongWritable, LongWritable> {
-    protected void map(LongWritable key, Text value, Mapper<LongWritable,Text, LongWritable, LongWritable>.Context context)
+    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, LongWritable, LongWritable>.Context context)
     throws java.io.IOException, InterruptedException {
       HBaseConfiguration c = new HBaseConfiguration(context.getConfiguration());
-      Migrate.rewrite(c, FileSystem.get(c), new Path(value.toString()));
+      Path p = new Path(value.toString());
+      context.setStatus(key.toString() + " " + p.toString());
+      Migrate.rewrite(c, FileSystem.get(c), p);
     }
   }
 
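To make the flow in the javadoc above concrete: the crawl phase writes one fully-qualified 0.19 mapfile path per line into the file under the input dir, and each invocation of Map.map() above rewrites the single file named on its line. A hypothetical generated input file (cluster name, table names and file ids are illustrative only) would look like:

    hdfs://example-nn:9000/hbase/TestTable/1028785192/info/mapfiles/6120160496451512066
    hdfs://example-nn:9000/hbase/TestTable/70236052/info/mapfiles/2191834888093989653

Because each line is an independent rewrite, TextInputFormat splits the file across map tasks, and the job is safe to rerun after a partial failure, as the javadoc notes.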
@@ -67,108 +81,107 @@ public class HStoreFileToStoreFile {
       final FileSystem fs, final Path dir)
   throws IOException {
     if (fs.exists(dir)) {
-      throw new IOException("Input exists -- please specify empty input dir");
+      LOG.warn("Input directory already exists. Using content for this MR job.");
+      return;
     }
-    gathermapfiles(conf, fs, dir);
+    FSDataOutputStream out = fs.create(new Path(dir, "mapfiles"));
+    try {
+      gathermapfiles(conf, fs, out);
+    } finally {
+      if (out != null) out.close();
+    }
   }
 
   private static void gathermapfiles(final HBaseConfiguration conf,
-      final FileSystem fs, final Path dir)
+      final FileSystem fs, final FSDataOutputStream out)
   throws IOException {
-    int index = 0;
-    FSDataOutputStream out = getOut(fs, dir, index, null);
-    try {
-      // Presumes any directory under hbase.rootdir is a table.
-      FileStatus [] tableDirs =
-        fs.listStatus(FSUtils.getRootDir(conf), new DirFilter(fs));
-      for (int i = 0; i < tableDirs.length; i++) {
-        // Inside a table, there are compaction.dir directories to skip.
-        // Otherwise, all else should be regions. Then in each region, should
-        // only be family directories. Under each of these, should be a mapfile
-        // and info directory and in these only one file.
-        Path d = tableDirs[i].getPath();
-        if (d.getName().equals(HConstants.HREGION_LOGDIR_NAME)) continue;
-        FileStatus [] regionDirs = fs.listStatus(d, new DirFilter(fs));
-        for (int j = 0; j < regionDirs.length; j++) {
-          Path dd = regionDirs[j].getPath();
-          if (dd.equals(HConstants.HREGION_COMPACTIONDIR_NAME)) continue;
-          // Else its a region name. Now look in region for families.
-          FileStatus [] familyDirs = fs.listStatus(dd, new DirFilter(fs));
-          for (int k = 0; k < familyDirs.length; k++) {
-            Path family = familyDirs[k].getPath();
-            FileStatus [] infoAndMapfile = fs.listStatus(family);
-            // Assert that only info and mapfile in family dir.
-            if (infoAndMapfile.length != 0 && infoAndMapfile.length != 2) {
-              LOG.warn(family.toString() +
-                " has more than just info and mapfile: " + infoAndMapfile.length + ". Continuing...");
-              continue;
-            }
-            // Make sure directory named info or mapfile.
-            for (int ll = 0; ll < 2; ll++) {
-              if (infoAndMapfile[ll].getPath().getName().equals("info") ||
-                  infoAndMapfile[ll].getPath().getName().equals("mapfiles"))
-                continue;
-              LOG.warn("Unexpected directory name: " +
-                infoAndMapfile[ll].getPath() + ". Continuing...");
-              continue;
-            }
-            // Now in family, there are 'mapfile' and 'info' subdirs. Just
-            // look in the 'mapfile' subdir.
-            FileStatus [] familyStatus =
-              fs.listStatus(new Path(family, "mapfiles"));
-            if (familyStatus.length > 1) {
-              LOG.warn(family.toString() + " has " + familyStatus.length +
-                " files. Continuing...");
-              continue;
-            }
-            if (familyStatus.length == 1) {
-              // If we got here, then this is good. Add the mapfile to out
-              String str = familyStatus[0].getPath().makeQualified(fs).toString();
-              LOG.info(str);
-              out.write(Bytes.toBytes(str + "\n"));
-              if (index++ % 100 == 0) {
-                if (index != 0) {
-                  out = getOut(fs, dir, index, out);
-                }
-              }
-            } else {
-              LOG.warn("Empty store " + family.toString());
-            }
-          }
-        }
-      }
-    } finally {
-      out.close();
-    }
-  }
-
-  private static FSDataOutputStream getOut(final FileSystem fs, final Path dir,
-      final int index, FSDataOutputStream out)
-  throws IOException {
-    if (out != null) out.close();
-    return fs.create(new Path(dir, "mapfiles-" + index));
-  }
-
-  public static void main(String[] args) throws Exception {
-    HBaseConfiguration conf = new HBaseConfiguration();
-    String [] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
-    if (otherArgs.length < 2) {
-      System.err.println("ERROR: Wrong number of parameters: " + args.length);
-      System.err.println("Usage: " + HStoreFileToStoreFile.class.getName() +
-        " <inputdir> <output>");
-      System.exit(-1);
+    // Presumes any directory under hbase.rootdir is a table.
+    FileStatus [] tableDirs =
+      fs.listStatus(FSUtils.getRootDir(conf), new DirFilter(fs));
+    for (int i = 0; i < tableDirs.length; i++) {
+      // Inside a table, there are compaction.dir directories to skip.
+      // Otherwise, all else should be regions. Then in each region, should
+      // only be family directories. Under each of these, should be a mapfile
+      // and info directory and in these only one file.
+      Path d = tableDirs[i].getPath();
+      if (d.getName().equals(HConstants.HREGION_LOGDIR_NAME)) continue;
+      FileStatus [] regionDirs = fs.listStatus(d, new DirFilter(fs));
+      for (int j = 0; j < regionDirs.length; j++) {
+        Path dd = regionDirs[j].getPath();
+        if (dd.equals(HConstants.HREGION_COMPACTIONDIR_NAME)) continue;
+        // Else its a region name. Now look in region for families.
+        FileStatus [] familyDirs = fs.listStatus(dd, new DirFilter(fs));
+        for (int k = 0; k < familyDirs.length; k++) {
+          Path family = familyDirs[k].getPath();
+          FileStatus [] infoAndMapfile = fs.listStatus(family);
+          // Assert that only info and mapfile in family dir.
+          if (infoAndMapfile.length != 0 && infoAndMapfile.length != 2) {
+            LOG.warn(family.toString() + " has more than just info and mapfile: " +
+              infoAndMapfile.length + ". Continuing...");
+            continue;
+          }
+          // Make sure directory named info or mapfile.
+          for (int ll = 0; ll < 2; ll++) {
+            if (infoAndMapfile[ll].getPath().getName().equals("info") ||
+                infoAndMapfile[ll].getPath().getName().equals("mapfiles"))
+              continue;
+            LOG.warn("Unexpected directory name: " +
+              infoAndMapfile[ll].getPath() + ". Continuing...");
+            continue;
+          }
+          // Now in family, there are 'mapfile' and 'info' subdirs. Just
+          // look in the 'mapfile' subdir.
+          Path mfsdir = new Path(family, "mapfiles");
+          FileStatus [] familyStatus = fs.listStatus(mfsdir);
+          if (familyStatus.length > 1) {
+            LOG.warn(family.toString() + " has " + familyStatus.length +
+              " files. Continuing...");
+            continue;
+          }
+          if (familyStatus.length == 1) {
+            // If we got here, then this is good. Add the mapfile to out
+            String str = familyStatus[0].getPath().makeQualified(fs).toString();
+            LOG.info(str);
+            out.write(Bytes.toBytes(str + "\n"));
+          } else {
+            // Special case. Empty region. Remove the mapfiles and info dirs.
+            Path infodir = new Path(family, "info");
+            LOG.info("Removing " + mfsdir + " and " + infodir + " because empty");
+            fs.delete(mfsdir, true);
+            fs.delete(infodir, true);
+          }
+        }
+      }
+    }
+  }
+
+  public int run(final String[] args) throws Exception {
+    if (args.length < 2) {
+      System.err.println("ERROR: Wrong number of arguments: " + args.length);
+      System.err.println("Usage: " + getClass().getSimpleName() +
+        " <inputdir> <outputdir>");
+      ToolRunner.printGenericCommandUsage(System.err);
+      return -1;
     }
     Path input = new Path(args[0]);
+    HBaseConfiguration conf = (HBaseConfiguration)getConf();
     FileSystem fs = FileSystem.get(conf);
     writeInputFiles(conf, fs, input);
     Job job = new Job(conf);
     job.setJarByClass(HStoreFileToStoreFile.class);
     job.setJobName(JOBNAME);
     job.setInputFormatClass(TextInputFormat.class);
     job.setMapperClass(Map.class);
     job.setNumReduceTasks(0);
-    TextInputFormat.setInputPaths(job, input);
+    FileInputFormat.addInputPath(job, input);
     Path output = new Path(args[1]);
     FileOutputFormat.setOutputPath(job, output);
-    System.exit(job.waitForCompletion(true) ? 0 : 1);
+    return job.waitForCompletion(true) ? 0 : 1;
   }
+
+  public static void main(String[] args) throws Exception {
+    int exitCode = ToolRunner.run(new HBaseConfiguration(),
+      new HStoreFileToStoreFile(), args);
+    System.exit(exitCode);
+  }
 }
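A hedged usage sketch of what the Tool change buys (this driver is hypothetical, not part of the commit; it assumes it lives in the same package as HStoreFileToStoreFile, whose constructor above is package-private, and the paths and task count are illustrative):

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.util.ToolRunner;

    // Generic options such as -D are now parsed out of the argument list
    // before run() sees it, so job parameters can be passed at launch time.
    public class Hsf2SfDriver {
      public static void main(String[] args) throws Exception {
        String[] jobArgs = {
          "-D", "mapred.map.tasks=4",         // consumed by the generic parser
          "/migrate/input", "/migrate/output" // left for run(): <inputdir> <outputdir>
        };
        System.exit(ToolRunner.run(new HBaseConfiguration(),
          new HStoreFileToStoreFile(), jobArgs));
      }
    }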
--- a/.../Migrate.java
+++ b/.../Migrate.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.hfile.Compression;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
 import org.apache.hadoop.hbase.migration.nineteen.io.BloomFilterMapFile;
@@ -86,7 +87,6 @@ import org.apache.hadoop.util.ToolRunner;
  */
 public class Migrate extends Configured implements Tool {
   private static final Log LOG = LogFactory.getLog(Migrate.class);
-  private final HBaseConfiguration conf;
   private FileSystem fs;
   boolean migrationNeeded = false;
   boolean check = false;
@@ -100,26 +100,24 @@
   private static final String MIGRATION_LINK =
     " See http://wiki.apache.org/hadoop/Hbase/HowToMigrate for more information.";
 
-  /** default constructor */
-  public Migrate() {
-    this(new HBaseConfiguration());
-  }
-
-  /**
-   * @param conf
-   */
-  public Migrate(HBaseConfiguration conf) {
-    super(conf);
-    this.conf = conf;
+  public Migrate() {
+    super();
+  }
+
+  public Migrate(final HBaseConfiguration c) {
+    super(c);
   }
 
   /*
    * Sets the hbase rootdir as fs.default.name.
    * @return True if succeeded.
    */
   private boolean setFsDefaultName() {
-    // Validate root directory path
-    Path rd = new Path(conf.get(HConstants.HBASE_DIR));
+    Path rd = new Path(getConf().get(HConstants.HBASE_DIR));
     try {
+      // Validate root directory path
       FSUtils.validateRootPath(rd);
@@ -129,7 +127,7 @@
         " configuration parameter '" + HConstants.HBASE_DIR + "'", e);
       return false;
     }
-    this.conf.set("fs.default.name", rd.toString());
+    getConf().set("fs.default.name", rd.toString());
     return true;
   }
 
@@ -139,7 +137,7 @@
   private boolean verifyFilesystem() {
     try {
       // Verify file system is up.
-      fs = FileSystem.get(conf); // get DFS handle
+      fs = FileSystem.get(getConf()); // get DFS handle
       LOG.info("Verifying that file system is available..");
       FSUtils.checkFileSystemAvailable(fs);
       return true;
@@ -154,7 +152,7 @@
     LOG.info("Verifying that HBase is not running...." +
       "Tries ten times to connect to running master");
     try {
-      HBaseAdmin.checkHBaseAvailable(conf);
+      HBaseAdmin.checkHBaseAvailable((HBaseConfiguration)getConf());
       LOG.fatal("HBase cluster must be off-line.");
       return false;
     } catch (MasterNotRunningException e) {
@@ -177,7 +175,8 @@
     LOG.info("Starting upgrade" + (check ? " check" : ""));
 
     // See if there is a file system version file
-    String versionStr = FSUtils.getVersion(fs, FSUtils.getRootDir(this.conf));
+    String versionStr = FSUtils.getVersion(fs,
+      FSUtils.getRootDir((HBaseConfiguration)getConf()));
     if (versionStr == null) {
       throw new IOException("File system version file " +
         HConstants.VERSION_FILE_NAME +
@@ -202,7 +201,7 @@
     if (!check) {
       // Set file system version
       LOG.info("Setting file system version.");
-      FSUtils.setVersion(fs, FSUtils.getRootDir(this.conf));
+      FSUtils.setVersion(fs, FSUtils.getRootDir((HBaseConfiguration)getConf()));
       LOG.info("Upgrade successful.");
     } else if (this.migrationNeeded) {
       LOG.info("Upgrade needed.");
@@ -220,7 +219,7 @@
       return;
     }
     // Before we start, make sure all is major compacted.
-    Path hbaseRootDir = new Path(conf.get(HConstants.HBASE_DIR));
+    Path hbaseRootDir = new Path(getConf().get(HConstants.HBASE_DIR));
     boolean pre020 = FSUtils.isPre020FileLayout(fs, hbaseRootDir);
     if (pre020) {
       LOG.info("Checking pre020 filesystem is major compacted");
@@ -244,7 +243,7 @@
         throw new IOException(msg);
       }
     }
-    final MetaUtils utils = new MetaUtils(this.conf);
+    final MetaUtils utils = new MetaUtils((HBaseConfiguration)getConf());
     final List<HRegionInfo> metas = new ArrayList<HRegionInfo>();
     try {
       // Rewrite root.
@@ -334,12 +333,20 @@
         if (mfs.length > 1) {
           throw new IOException("Should only be one directory in: " + mfdir);
         }
-        rewrite(this.conf, this.fs, mfs[0].getPath());
+        if (mfs.length == 0) {
+          // Special case. Empty region. Remove the mapfiles and info dirs.
+          Path infodir = new Path(family, "info");
+          LOG.info("Removing " + mfdir + " and " + infodir + " because empty");
+          fs.delete(mfdir, true);
+          fs.delete(infodir, true);
+        } else {
+          rewrite((HBaseConfiguration)getConf(), this.fs, mfs[0].getPath());
+        }
       }
     }
   }
 
   /**
    * Rewrite the passed 0.19 mapfile as a 0.20 file.
    * @param fs
@@ -359,11 +366,12 @@
       Integer.parseInt(regiondir.getName()),
       Bytes.toBytes(familydir.getName()), Long.parseLong(mf.getName()), null);
     BloomFilterMapFile.Reader src = hsf.getReader(fs, false, false);
-    HFile.Writer tgt = StoreFile.getWriter(fs, familydir);
+    HFile.Writer tgt = StoreFile.getWriter(fs, familydir,
+      conf.getInt("hfile.min.blocksize.size", 64*1024),
+      Compression.Algorithm.NONE, getComparator(basedir));
     // From old 0.19 HLogEdit.
     ImmutableBytesWritable deleteBytes =
       new ImmutableBytesWritable("HBASE::DELETEVAL".getBytes("UTF-8"));
-
     try {
       while (true) {
         HStoreKey key = new HStoreKey();
@@ -398,6 +406,13 @@
     }
   }
 
+  private static KeyValue.KeyComparator getComparator(final Path tabledir) {
+    String tablename = tabledir.getName();
+    return tablename.equals("-ROOT-")? KeyValue.META_KEY_COMPARATOR:
+      tablename.equals(".META.")? KeyValue.META_KEY_COMPARATOR:
+        KeyValue.KEY_COMPARATOR;
+  }
+
   /*
    * Enable blockcaching on catalog tables.
    * @param mr
@@ -523,11 +538,11 @@
   public static void main(String[] args) {
     int status = 0;
     try {
-      status = ToolRunner.run(new Migrate(), args);
+      status = ToolRunner.run(new HBaseConfiguration(), new Migrate(), args);
     } catch (Exception e) {
       LOG.error(e);
      status = -1;
     }
     System.exit(status);
   }
 }
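One note on the main() change just above: ToolRunner.run(tool, args) without an explicit configuration falls back to the tool's own conf, or a plain new Configuration(), which would not pull in hbase-default.xml and hbase-site.xml; passing new HBaseConfiguration() makes sure keys such as hbase.rootdir are present by the time run() executes. A minimal sketch of the same launch wiring (Migrate's package is assumed here to be org.apache.hadoop.hbase.util):

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.util.Migrate;
    import org.apache.hadoop.util.ToolRunner;

    public class LaunchMigrate {
      public static void main(String[] args) throws Exception {
        // ToolRunner applies generic options to the passed HBaseConfiguration
        // and hands it to Migrate.setConf() before invoking run(args).
        System.exit(ToolRunner.run(new HBaseConfiguration(), new Migrate(), args));
      }
    }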