HBASE-9311 Create a migration script that will move data from 0.94.x to 0.96

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1516648 13f79535-47bb-0310-9956-ffa450edef68
Michael Stack 2013-08-22 22:46:14 +00:00
parent 40f22a959b
commit 08247bfe76
8 changed files with 862 additions and 82 deletions
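In brief: the commit adds an "upgrade" subcommand to bin/hbase, backed by the new UpgradeTo96 tool. A sketch of the invocation, taken from the usage text added below:

  $ bin/hbase upgrade -check [-dir DIR]   # look for HFileV1s under ${hbase.rootdir} (or under DIR)
  $ bin/hbase upgrade -execute            # migrate namespace and znodes (ZooKeeper and HDFS up, HBase down)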

View File: bin/hbase

@@ -77,6 +77,7 @@ if [ $# = 0 ]; then
echo " hlog write-ahead-log analyzer"
echo " hfile store file analyzer"
echo " zkcli run the ZooKeeper shell"
echo " upgrade upgrade hbase"
echo ""
echo "PROCESS MANAGEMENT"
echo " master run an HBase HMaster node"
@@ -271,6 +272,8 @@ elif [ "$COMMAND" = "hfile" ] ; then
CLASS='org.apache.hadoop.hbase.io.hfile.HFile'
elif [ "$COMMAND" = "zkcli" ] ; then
CLASS="org.apache.hadoop.hbase.zookeeper.ZooKeeperMainServer"
elif [ "$COMMAND" = "upgrade" ] ; then
CLASS="org.apache.hadoop.hbase.migration.UpgradeTo96"
elif [ "$COMMAND" = "master" ] ; then
CLASS='org.apache.hadoop.hbase.master.HMaster'
if [ "$1" != "stop" ] ; then

View File: ReplicationStateZKBase.java

@@ -54,7 +54,7 @@ public abstract class ReplicationStateZKBase {
// Public for testing
public static final byte[] ENABLED_ZNODE_BYTES =
toByteArray(ZooKeeperProtos.ReplicationState.State.ENABLED);
protected static final byte[] DISABLED_ZNODE_BYTES =
public static final byte[] DISABLED_ZNODE_BYTES =
toByteArray(ZooKeeperProtos.ReplicationState.State.DISABLED);
public ReplicationStateZKBase(ZooKeeperWatcher zookeeper, Configuration conf,

View File: NamespaceUpgrade.java

@@ -105,6 +105,7 @@ public class NamespaceUpgrade implements Tool {
public void init() throws IOException {
this.rootDir = FSUtils.getRootDir(conf);
FSUtils.setFsDefault(getConf(), rootDir);
this.fs = FileSystem.get(conf);
Path tmpDataDir = new Path(rootDir, TMP_DATA_DIR);
sysNsDir = new Path(tmpDataDir, NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR);

View File: UpgradeTo96.java (new file)

@@ -0,0 +1,233 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.migration;
import java.io.IOException;
import java.util.List;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.HFileV1Detector;
import org.apache.hadoop.hbase.util.ZKDataMigrator;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class UpgradeTo96 extends Configured implements Tool {
private static final Log LOG = LogFactory.getLog(UpgradeTo96.class);
private Options options = new Options();
/**
* whether to do overall upgrade (namespace and znodes)
*/
private boolean upgrade;
/**
* whether to check for HFileV1
*/
private boolean checkForHFileV1;
/**
* Path of directory to check for HFileV1
*/
private String dirToCheckForHFileV1;
UpgradeTo96() {
setOptions();
}
private void setOptions() {
options.addOption("h", "help", false, "Help");
options.addOption(new Option("check", false, "Run upgrade check; looks for HFileV1 "
+ " under ${hbase.rootdir} or provided 'dir' directory."));
options.addOption(new Option("execute", false, "Run upgrade; zk and hdfs must be up, hbase down"));
Option pathOption = new Option("dir", true,
"Relative path of dir to check for HFileV1s.");
pathOption.setRequired(false);
options.addOption(pathOption);
}
private boolean parseOption(String[] args) throws ParseException {
if (args.length == 0) return false; // no args shows help.
CommandLineParser parser = new GnuParser();
CommandLine cmd = parser.parse(options, args);
if (cmd.hasOption("h")) {
printUsage();
return false;
}
if (cmd.hasOption("execute")) upgrade = true;
if (cmd.hasOption("check")) checkForHFileV1 = true;
if (checkForHFileV1 && cmd.hasOption("dir")) {
this.dirToCheckForHFileV1 = cmd.getOptionValue("dir");
}
return true;
}
private void printUsage() {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("$bin/hbase upgrade -check [-dir DIR]|-execute", options);
System.out.println("Read http://hbase.apache.org/book.html#upgrade0.96 before attempting upgrade");
System.out.println();
System.out.println("Example usage:");
System.out.println();
System.out.println("Run upgrade check; looks for HFileV1s under ${hbase.rootdir}:");
System.out.println(" $ bin/hbase upgrade -check");
System.out.println();
System.out.println("Run the upgrade: ");
System.out.println(" $ bin/hbase upgrade -execute");
}
@Override
public int run(String[] args) throws Exception {
if (!parseOption(args)) {
printUsage();
return -1;
}
if (checkForHFileV1) {
int res = doHFileV1Check();
if (res == 0) LOG.info("No HFileV1 found.");
else {
LOG.warn("There are some HFileV1, or corrupt files (files with incorrect major version).");
return -1;
}
}
// if the user wants to upgrade, check for any HBase live process.
// If yes, prompt the user to stop them
if (upgrade) {
if (isAnyHBaseProcessAlive()) {
LOG.error("Some HBase processes are still alive, or znodes not expired yet. "
+ "Please stop them before upgrade or try after some time.");
throw new IOException("Some HBase processes are still alive, or znodes not expired yet");
}
return upgradeNamespaceAndZnodes();
}
return -1;
}
private boolean isAnyHBaseProcessAlive() throws IOException {
ZooKeeperWatcher zkw = null;
try {
zkw = new ZooKeeperWatcher(getConf(), "Check Live Processes.", new Abortable() {
private boolean aborted = false;
@Override
public void abort(String why, Throwable e) {
LOG.warn("Got aborted with reason: " + why + ", and error: " + e);
this.aborted = true;
}
@Override
public boolean isAborted() {
return this.aborted;
}
});
boolean liveProcessesExists = false;
if (ZKUtil.checkExists(zkw, zkw.baseZNode) == -1) {
return false;
}
if (ZKUtil.checkExists(zkw, zkw.backupMasterAddressesZNode) != -1) {
List<String> backupMasters = ZKUtil
.listChildrenNoWatch(zkw, zkw.backupMasterAddressesZNode);
if (!backupMasters.isEmpty()) {
LOG.warn("Backup master(s) " + backupMasters
+ " are alive or backup-master znodes not expired.");
liveProcessesExists = true;
}
}
if (ZKUtil.checkExists(zkw, zkw.rsZNode) != -1) {
List<String> regionServers = ZKUtil.listChildrenNoWatch(zkw, zkw.rsZNode);
if (!regionServers.isEmpty()) {
LOG.warn("Region server(s) " + regionServers + " are alive or rs znodes not expired.");
liveProcessesExists = true;
}
}
if (ZKUtil.checkExists(zkw, zkw.getMasterAddressZNode()) != -1) {
byte[] data = ZKUtil.getData(zkw, zkw.getMasterAddressZNode());
if (data != null && !Bytes.equals(data, HConstants.EMPTY_BYTE_ARRAY)) {
LOG.warn("Active master at address " + Bytes.toString(data)
+ " is still alive or master znode not expired.");
liveProcessesExists = true;
}
}
return liveProcessesExists;
} catch (Exception e) {
LOG.error("Got exception while checking live hbase processes", e);
throw new IOException(e);
} finally {
if (zkw != null) {
zkw.close();
}
}
}
private int doHFileV1Check() throws Exception {
String[] args = null;
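// The dir value is passed glued to the flag ("-p<dir>"), a form commons-cli short options accept.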
if (dirToCheckForHFileV1 != null) args = new String[] { "-p" + dirToCheckForHFileV1 };
return ToolRunner.run(getConf(), new HFileV1Detector(), args);
}
private int upgradeNamespaceAndZnodes() throws Exception {
int res = upgradeNamespace();
if (res == 0) return upgradeZnodes(); // upgrade znodes only if the first step succeeded.
else {
LOG.warn("Namespace upgrade returned: " + res + ", expected 0. Aborting the upgrade.");
throw new Exception("Unexpected return code from namespace upgrade");
}
}
private int upgradeNamespace() throws Exception {
LOG.info("Upgrading Namespace");
try {
int res = ToolRunner.run(getConf(), new NamespaceUpgrade(), new String[] { "--upgrade" });
LOG.info("Successfully Upgraded NameSpace.");
return res;
} catch (Exception e) {
LOG.error("Got exception while upgrading Namespace", e);
throw e;
}
}
private int upgradeZnodes() throws Exception {
LOG.info("Upgrading Znodes");
try {
int res = ToolRunner.run(getConf(), new ZKDataMigrator(), null);
LOG.info("Succesfully upgraded znodes.");
return res;
} catch (Exception e) {
LOG.error("Got exception while upgrading Znodes", e);
throw e;
}
}
public static void main(String[] args) throws Exception {
System.exit(ToolRunner.run(HBaseConfiguration.create(), new UpgradeTo96(), args));
}
}

View File: HFileV1Detector.java

@@ -46,7 +46,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.hfile.FixedFileTrailer;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.util.Tool;
@@ -69,9 +68,27 @@ public class HFileV1Detector extends Configured implements Tool {
private static final Log LOG = LogFactory.getLog(HFileV1Detector.class);
private static final int DEFAULT_NUM_OF_THREADS = 10;
private int numOfThreads;
private Path dirToProcess;
/**
* directory to start the processing.
*/
private Path targetDirPath;
/**
* executor for processing regions.
*/
private ExecutorService exec;
/**
* Keeps record of processed tables.
*/
private final Set<Path> processedTables = new HashSet<Path>();
/**
* set of corrupted HFiles (with undetermined major version)
*/
private final Set<Path> corruptedHFiles = Collections
.newSetFromMap(new ConcurrentHashMap<Path, Boolean>());
/**
* set of HFileV1 files.
*/
private final Set<Path> hFileV1Set = Collections
.newSetFromMap(new ConcurrentHashMap<Path, Boolean>());
@@ -107,75 +124,101 @@ public class HFileV1Detector extends Configured implements Tool {
}
if (cmd.hasOption("p")) {
dirToProcess = new Path(cmd.getOptionValue("p"));
this.targetDirPath = new Path(FSUtils.getRootDir(getConf()), cmd.getOptionValue("p"));
}
try {
if (cmd.hasOption("n")) {
int n = Integer.parseInt(cmd.getOptionValue("n"));
if (n < 0 || n > 100) {
System.out.println("Please use a positive number <= 100 for number of threads."
LOG.warn("Please use a positive number <= 100 for number of threads."
+ " Continuing with default value " + DEFAULT_NUM_OF_THREADS);
return true;
}
numOfThreads = n;
this.numOfThreads = n;
}
} catch (NumberFormatException nfe) {
System.err.println("Please select a valid number for threads");
LOG.error("Please select a valid number for threads");
return false;
}
return true;
}
/**
* Checks for HFileV1.
* @return 0 when no HFileV1 is present.
*         1 when an HFileV1 is present, or when there is a file with a corrupt major version
*         (neither V1 nor V2).
*         -1 in case of any error/exception.
*/
@Override
public int run(String args[]) throws IOException, ParseException {
FSUtils.setFsDefault(getConf(), new Path(FSUtils.getRootDir(getConf()).toUri()));
fs = FileSystem.get(getConf());
numOfThreads = DEFAULT_NUM_OF_THREADS;
dirToProcess = FSUtils.getRootDir(getConf());
targetDirPath = FSUtils.getRootDir(getConf());
if (!parseOption(args)) {
System.exit(1);
System.exit(-1);
}
ExecutorService exec = Executors.newFixedThreadPool(numOfThreads);
Set<Path> regionsWithHFileV1;
this.exec = Executors.newFixedThreadPool(numOfThreads);
try {
regionsWithHFileV1 = checkForV1Files(dirToProcess, exec);
printHRegionsWithHFileV1(regionsWithHFileV1);
printAllHFileV1();
printCorruptedHFiles();
if (hFileV1Set.isEmpty() && corruptedHFiles.isEmpty()) {
// all clear.
System.out.println("No HFile V1 Found");
}
return processResult(checkForV1Files(targetDirPath));
} catch (Exception e) {
System.err.println(e);
return 1;
LOG.error(e);
} finally {
exec.shutdown();
fs.close();
}
return 0;
return -1;
}
private int processResult(Set<Path> regionsWithHFileV1) {
LOG.info("Result: \n");
printSet(processedTables, "Tables Processed: ");
int count = hFileV1Set.size();
LOG.info("Count of HFileV1: " + count);
if (count > 0) printSet(hFileV1Set, "HFileV1:");
count = corruptedHFiles.size();
LOG.info("Count of corrupted files: " + count);
if (count > 0) printSet(corruptedHFiles, "Corrupted Files: ");
count = regionsWithHFileV1.size();
LOG.info("Count of Regions with HFileV1: " + count);
if (count > 0) printSet(regionsWithHFileV1, "Regions to Major Compact: ");
return (hFileV1Set.isEmpty() && corruptedHFiles.isEmpty()) ? 0 : 1;
}
private void printSet(Set<Path> result, String msg) {
LOG.info(msg);
for (Path p : result) {
LOG.info(p);
}
}
/**
* Takes a directory path and lists out any HFileV1, if present.
* @param targetDir directory to start looking for HFileV1.
* @param exec
* @return set of Regions that have HFileV1
* @throws IOException
*/
private Set<Path> checkForV1Files(Path targetDir, final ExecutorService exec) throws IOException {
if (isTableDir(fs, targetDir)) {
return processTable(targetDir, exec);
}
// user has passed an HBase installation directory.
private Set<Path> checkForV1Files(Path targetDir) throws IOException {
LOG.info("Target dir is: " + targetDir);
if (!fs.exists(targetDir)) {
throw new IOException("The given path does not exist: " + targetDir);
}
if (isTableDir(fs, targetDir)) {
processedTables.add(targetDir);
return processTable(targetDir);
}
Set<Path> regionsWithHFileV1 = new HashSet<Path>();
FileStatus[] fsStats = fs.listStatus(targetDir);
for (FileStatus fsStat : fsStats) {
if (isTableDir(fs, fsStat.getPath())) {
if (isTableDir(fs, fsStat.getPath()) && !isRootTable(fsStat.getPath())) {
processedTables.add(fsStat.getPath());
// look for regions and find out any v1 file.
regionsWithHFileV1.addAll(processTable(fsStat.getPath(), exec));
regionsWithHFileV1.addAll(processTable(fsStat.getPath()));
} else {
LOG.info("Ignoring path: " + fsStat.getPath());
}
@@ -184,15 +227,24 @@ public class HFileV1Detector extends Configured implements Tool {
}
/**
* Find out the regions in the table which has an HFile v1 in it.
* Ignore ROOT table as it doesn't exist in 0.96.
* @param path
* @return
*/
private boolean isRootTable(Path path) {
if (path != null && path.toString().endsWith("-ROOT-")) return true;
return false;
}
/**
* Find out regions in the table which have HFileV1.
* @param tableDir
* @param exec
* @return the set of regions containing HFile v1.
* @throws IOException
*/
private Set<Path> processTable(Path tableDir, final ExecutorService exec) throws IOException {
private Set<Path> processTable(Path tableDir) throws IOException {
// list out the regions and then process each file in it.
LOG.info("processing table: " + tableDir);
LOG.debug("processing table: " + tableDir);
List<Future<Path>> regionLevelResults = new ArrayList<Future<Path>>();
Set<Path> regionsWithHFileV1 = new HashSet<Path>();
@@ -200,7 +252,7 @@ public class HFileV1Detector extends Configured implements Tool {
for (FileStatus fsStat : fsStats) {
// process each region
if (isRegionDir(fs, fsStat.getPath())) {
regionLevelResults.add(processRegion(fsStat.getPath(), exec));
regionLevelResults.add(processRegion(fsStat.getPath()));
}
}
for (Future<Path> f : regionLevelResults) {
@@ -209,9 +261,9 @@ public class HFileV1Detector extends Configured implements Tool {
regionsWithHFileV1.add(f.get());
}
} catch (InterruptedException e) {
System.err.println(e);
LOG.error(e);
} catch (ExecutionException e) {
System.err.println(e); // might be a bad hfile. We print it at the end.
LOG.error(e); // might be a bad hfile. We print it at the end.
}
}
return regionsWithHFileV1;
@@ -221,11 +273,10 @@ public class HFileV1Detector extends Configured implements Tool {
* Each region is processed by a separate handler. If an HRegion has an HFileV1, its path is
* returned as the future result; otherwise, null is returned.
* @param regionDir Region to process.
* @param exec
* @return corresponding Future object.
*/
private Future<Path> processRegion(final Path regionDir, final ExecutorService exec) {
LOG.info("processing region: " + regionDir);
private Future<Path> processRegion(final Path regionDir) {
LOG.debug("processing region: " + regionDir);
Callable<Path> regionCallable = new Callable<Path>() {
@Override
public Path call() throws Exception {
@@ -249,15 +300,17 @@ public class HFileV1Detector extends Configured implements Tool {
fsdis = fs.open(storeFilePath);
lenToRead = storeFile.getLen();
}
FixedFileTrailer trailer = FixedFileTrailer.readFromStream(fsdis, lenToRead);
int version = trailer.getMajorVersion();
if (version == 1) {
int majorVersion = computeMajorVersion(fsdis, lenToRead);
if (majorVersion == 1) {
hFileV1Set.add(storeFilePath);
// return this region path, as it needs to be compacted.
return regionDir;
}
if (majorVersion > 2 || majorVersion < 1) throw new IllegalArgumentException(
"Incorrect major version: " + majorVersion);
} catch (Exception iae) {
corruptedHFiles.add(storeFilePath);
LOG.error("Got exception while reading trailer for file: "+ storeFilePath, iae);
} finally {
if (fsdis != null) fsdis.close();
}
@@ -265,13 +318,30 @@ public class HFileV1Detector extends Configured implements Tool {
}
return null;
}
private int computeMajorVersion(FSDataInputStream istream, long fileSize)
throws IOException {
// Read the last int of the file. The major version is in its lower 3 bytes.
long seekPoint = fileSize - Bytes.SIZEOF_INT;
if (seekPoint < 0)
throw new IllegalArgumentException("File too small, no major version found");
// Read the version from the last int of the file.
istream.seek(seekPoint);
int version = istream.readInt();
// Extract and return the major version
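// (the top byte of the packed version int carries the minor version)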
return version & 0x00ffffff;
}
};
Future<Path> f = exec.submit(regionCallable);
return f;
}
private static boolean isTableDir(final FileSystem fs, final Path path) throws IOException {
return FSTableDescriptors.getTableInfoPath(fs, path) != null;
// check for the old format of having /table/.tableinfo; .META. doesn't have .tableinfo,
// so include it.
return (FSTableDescriptors.getTableInfoPath(fs, path) != null || FSTableDescriptors
.getCurrentTableInfoStatus(fs, path, false) != null) || path.toString().endsWith(".META.");
}
private static boolean isRegionDir(final FileSystem fs, final Path path) throws IOException {
@@ -280,43 +350,6 @@ public class HFileV1Detector extends Configured implements Tool {
}
private void printHRegionsWithHFileV1(Set<Path> regionsHavingHFileV1) {
if (!regionsHavingHFileV1.isEmpty()) {
System.out.println();
System.out.println("Following regions has HFileV1 and needs to be Major Compacted:");
System.out.println();
for (Path r : regionsHavingHFileV1) {
System.out.println(r);
}
System.out.println();
}
}
private void printAllHFileV1() {
if (!hFileV1Set.isEmpty()) {
System.out.println();
System.out.println("Following HFileV1 are found:");
System.out.println();
for (Path r : hFileV1Set) {
System.out.println(r);
}
System.out.println();
}
}
private void printCorruptedHFiles() {
if (!corruptedHFiles.isEmpty()) {
System.out.println();
System.out.println("Following HFiles are corrupted as their version is unknown:");
System.out.println();
for (Path r : corruptedHFiles) {
System.out.println(r);
}
System.out.println();
}
}
public static void main(String args[]) throws Exception {
System.exit(ToolRunner.run(HBaseConfiguration.create(), new HFileV1Detector(), args));
}

View File: ZKDataMigrator.java (new file)

@@ -0,0 +1,261 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import java.io.IOException;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
/**
* Tool to migrate ZooKeeper data of older HBase versions (< 0.95.0) to PB.
*/
public class ZKDataMigrator extends Configured implements Tool {
private static final Log LOG = LogFactory.getLog(ZKDataMigrator.class);
@Override
public int run(String[] as) throws Exception {
Configuration conf = getConf();
ZooKeeperWatcher zkw = null;
try {
zkw = new ZooKeeperWatcher(getConf(), "Migrate ZK data to PB.",
new ZKDataMigratorAbortable());
if (ZKUtil.checkExists(zkw, zkw.baseZNode) == -1) {
LOG.info("No hbase related data available in zookeeper. returning..");
return 0;
}
List<String> children = ZKUtil.listChildrenNoWatch(zkw, zkw.baseZNode);
if (children == null) {
LOG.info("No child nodes to mirgrate. returning..");
return 0;
}
String childPath = null;
for (String child : children) {
childPath = ZKUtil.joinZNode(zkw.baseZNode, child);
if (child.equals(conf.get("zookeeper.znode.rootserver", "root-region-server"))) {
// The -ROOT- region is no longer present from 0.95.0, so we can remove this znode
ZKUtil.deleteNodeRecursively(zkw, childPath);
// TODO delete root table path from file system.
} else if (child.equals(conf.get("zookeeper.znode.rs", "rs"))) {
// Since there is no live region server instance during migration, we
// can remove this znode as well.
ZKUtil.deleteNodeRecursively(zkw, childPath);
} else if (child.equals(conf.get("zookeeper.znode.draining.rs", "draining"))) {
// To migrate to 0.95.0 from older versions we need to stop the existing
// cluster, so there won't be any draining servers; we can remove this znode.
ZKUtil.deleteNodeRecursively(zkw, childPath);
} else if (child.equals(conf.get("zookeeper.znode.master", "master"))) {
// Since there is no live master instance during migration, we can
// remove this znode as well.
ZKUtil.deleteNodeRecursively(zkw, childPath);
} else if (child.equals(conf.get("zookeeper.znode.backup.masters", "backup-masters"))) {
// Since there is no live backup master instances during migration, we
// can remove this znode as well.
ZKUtil.deleteNodeRecursively(zkw, childPath);
} else if (child.equals(conf.get("zookeeper.znode.state", "shutdown"))) {
// The shutdown node is not present from 0.95.0 onwards; it's renamed to
// "running". We can delete it.
ZKUtil.deleteNodeRecursively(zkw, childPath);
} else if (child.equals(conf.get("zookeeper.znode.unassigned", "unassigned"))) {
// During a clean cluster startup we remove all unassigned region nodes
// anyway, so we can delete all children nodes as well. This znode is
// renamed to "regions-in-transition" from 0.95.0 onwards.
ZKUtil.deleteNodeRecursively(zkw, childPath);
} else if (child.equals(conf.get("zookeeper.znode.tableEnableDisable", "table"))
|| child.equals(conf.get("zookeeper.znode.masterTableEnableDisable", "table"))) {
checkAndMigrateTableStatesToPB(zkw);
} else if (child.equals(conf.get("zookeeper.znode.masterTableEnableDisable92",
"table92"))) {
// This is a replica of the table states from the table znode, so we can
// remove it.
ZKUtil.deleteNodeRecursively(zkw, childPath);
} else if (child.equals(conf.get("zookeeper.znode.splitlog", "splitlog"))) {
// This znode is no longer available from 0.95.0 onwards; we can remove it.
ZKUtil.deleteNodeRecursively(zkw, childPath);
} else if (child.equals(conf.get("zookeeper.znode.replication", "replication"))) {
checkAndMigrateReplicationNodesToPB(zkw);
} else if (child.equals(conf.get("zookeeper.znode.clusterId", "hbaseid"))) {
// it will be re-created by the master.
ZKUtil.deleteNodeRecursively(zkw, childPath);
} else if (child.equals(SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION)) {
// not needed as it is transient.
ZKUtil.deleteNodeRecursively(zkw, childPath);
}
}
} catch (Exception e) {
LOG.error("Got exception while updating znodes ", e);
throw new IOException(e);
} finally {
if (zkw != null) {
zkw.close();
}
}
return 0;
}
private void checkAndMigrateTableStatesToPB(ZooKeeperWatcher zkw) throws KeeperException {
List<String> tables = ZKUtil.listChildrenNoWatch(zkw, zkw.tableZNode);
if (tables == null) {
LOG.info("No table present to migrate table state to PB. Returning..");
return;
}
for (String table : tables) {
String znode = ZKUtil.joinZNode(zkw.tableZNode, table);
// Delete the -ROOT- and .META. table state znodes since they are no longer
// present from 0.95.0 onwards.
if (table.equals("-ROOT-") || table.equals(".META.")) {
ZKUtil.deleteNode(zkw, znode);
continue;
}
byte[] data = ZKUtil.getData(zkw, znode);
if (ProtobufUtil.isPBMagicPrefix(data)) continue;
ZooKeeperProtos.Table.Builder builder = ZooKeeperProtos.Table.newBuilder();
builder.setState(ZooKeeperProtos.Table.State.valueOf(Bytes.toString(data)));
data = ProtobufUtil.prependPBMagic(builder.build().toByteArray());
ZKUtil.setData(zkw, znode, data);
}
}
private void checkAndMigrateReplicationNodesToPB(ZooKeeperWatcher zkw) throws KeeperException {
String replicationZnodeName = getConf().get("zookeeper.znode.replication", "replication");
String replicationPath = ZKUtil.joinZNode(zkw.baseZNode, replicationZnodeName);
List<String> replicationZnodes = ZKUtil.listChildrenNoWatch(zkw, replicationPath);
if (replicationZnodes == null) {
LOG.info("No replication related znodes present to migrate. Returning..");
return;
}
for (String child : replicationZnodes) {
String znode = ZKUtil.joinZNode(replicationPath, child);
if (child.equals(getConf().get("zookeeper.znode.replication.peers", "peers"))) {
List<String> peers = ZKUtil.listChildrenNoWatch(zkw, znode);
if (peers == null || peers.isEmpty()) {
LOG.info("No peers present to migrate. returning..");
continue;
}
checkAndMigratePeerZnodesToPB(zkw, znode, peers);
} else if (child.equals(getConf().get("zookeeper.znode.replication.state", "state"))) {
// This is no longer used in >=0.95.x
ZKUtil.deleteNodeRecursively(zkw, znode);
} else if (child.equals(getConf().get("zookeeper.znode.replication.rs", "rs"))) {
List<String> rsList = ZKUtil.listChildrenNoWatch(zkw, znode);
if (rsList == null || rsList.isEmpty()) continue;
for (String rs : rsList) {
checkAndMigrateQueuesToPB(zkw, znode, rs);
}
}
}
}
private void checkAndMigrateQueuesToPB(ZooKeeperWatcher zkw, String znode, String rs)
throws KeeperException, NoNodeException {
String rsPath = ZKUtil.joinZNode(znode, rs);
List<String> peers = ZKUtil.listChildrenNoWatch(zkw, rsPath);
if (peers == null || peers.isEmpty()) return;
String peerPath = null;
for (String peer : peers) {
peerPath = ZKUtil.joinZNode(rsPath, peer);
List<String> files = ZKUtil.listChildrenNoWatch(zkw, peerPath);
if (files == null || files.isEmpty()) continue;
String filePath = null;
for (String file : files) {
filePath = ZKUtil.joinZNode(peerPath, file);
byte[] data = ZKUtil.getData(zkw, filePath);
if (data == null || Bytes.equals(data, HConstants.EMPTY_BYTE_ARRAY)) continue;
if (ProtobufUtil.isPBMagicPrefix(data)) continue;
ZKUtil.setData(zkw, filePath,
ZKUtil.positionToByteArray(Long.parseLong(Bytes.toString(data))));
}
}
}
private void checkAndMigratePeerZnodesToPB(ZooKeeperWatcher zkw, String znode,
List<String> peers) throws KeeperException, NoNodeException {
for (String peer : peers) {
String peerZnode = ZKUtil.joinZNode(znode, peer);
byte[] data = ZKUtil.getData(zkw, peerZnode);
if (!ProtobufUtil.isPBMagicPrefix(data)) {
migrateClusterKeyToPB(zkw, peerZnode, data);
}
String peerStatePath = ZKUtil.joinZNode(peerZnode,
getConf().get("zookeeper.znode.replication.peers.state", "peer-state"));
if (ZKUtil.checkExists(zkw, peerStatePath) != -1) {
data = ZKUtil.getData(zkw, peerStatePath);
if (ProtobufUtil.isPBMagicPrefix(data)) continue;
migratePeerStateToPB(zkw, data, peerStatePath);
}
}
}
private void migrateClusterKeyToPB(ZooKeeperWatcher zkw, String peerZnode, byte[] data)
throws KeeperException, NoNodeException {
ReplicationPeer peer = ZooKeeperProtos.ReplicationPeer.newBuilder()
.setClusterkey(Bytes.toString(data)).build();
ZKUtil.setData(zkw, peerZnode, ProtobufUtil.prependPBMagic(peer.toByteArray()));
}
private void migratePeerStateToPB(ZooKeeperWatcher zkw, byte[] data,
String peerStatePath)
throws KeeperException, NoNodeException {
String state = Bytes.toString(data);
if (ZooKeeperProtos.ReplicationState.State.ENABLED.name().equals(state)) {
ZKUtil.setData(zkw, peerStatePath, ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
} else if (ZooKeeperProtos.ReplicationState.State.DISABLED.name().equals(state)) {
ZKUtil.setData(zkw, peerStatePath, ReplicationStateZKBase.DISABLED_ZNODE_BYTES);
}
}
public static void main(String args[]) throws Exception {
System.exit(ToolRunner.run(HBaseConfiguration.create(), new ZKDataMigrator(), args));
}
static class ZKDataMigratorAbortable implements Abortable {
private boolean aborted = false;
@Override
public void abort(String why, Throwable e) {
LOG.error("Got aborted with reason: " + why + ", and error: " + e);
this.aborted = true;
}
@Override
public boolean isAborted() {
return this.aborted;
}
}
}
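Note: the conversions above frame each migrated payload as a short protobuf-magic prefix (the four bytes "PBUF") followed by the serialized protobuf message; ProtobufUtil.prependPBMagic adds the prefix and isPBMagicPrefix tests for it.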

View File: TestNamespaceUpgrade.java

@@ -150,7 +150,7 @@ public class TestNamespaceUpgrade {
}
}
private static File untar(final File testdir) throws IOException {
static File untar(final File testdir) throws IOException {
// Find the src data under src/test/data
final String datafile = "TestNamespaceUpgrade";
File srcTarFile = new File(

View File: TestUpgradeTo96.java (new file)

@@ -0,0 +1,249 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.migration;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import java.io.File;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsShell;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.master.TableNamespaceManager;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationPeer;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Table.State;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileV1Detector;
import org.apache.hadoop.hbase.util.ZKDataMigrator;
import org.apache.hadoop.hbase.zookeeper.ZKTableReadOnly;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.util.ToolRunner;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.server.ZKDatabase;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import com.google.protobuf.InvalidProtocolBufferException;
/**
* Upgrade to 0.96 involves detecting HFileV1 in the existing cluster, updating the namespace and
* updating znodes. This class tests HFileV1 detection and znode upgrading.
* Upgrading the namespace is tested in {@link TestNamespaceUpgrade}.
*/
@Category(MediumTests.class)
public class TestUpgradeTo96 {
static final Log LOG = LogFactory.getLog(TestUpgradeTo96.class);
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
/**
* underlying file system instance
*/
private static FileSystem fs;
/**
* hbase root dir
*/
private static Path hbaseRootDir;
private static ZooKeeperWatcher zkw;
/**
* replication peer znode (/hbase/replication/peers)
*/
private static String replicationPeerZnode;
/**
* znode of a table
*/
private static String tableAZnode;
private static ReplicationPeer peer1;
/**
* znode for replication peer1 (/hbase/replication/peers/1)
*/
private static String peer1Znode;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
// Start up the mini cluster on top of a 0.94 root.dir that has data from
// a 0.94 hbase run and see if we can migrate to 0.96
TEST_UTIL.startMiniZKCluster();
TEST_UTIL.startMiniDFSCluster(1);
hbaseRootDir = TEST_UTIL.getDefaultRootDirPath();
fs = FileSystem.get(TEST_UTIL.getConfiguration());
FSUtils.setRootDir(TEST_UTIL.getConfiguration(), hbaseRootDir);
zkw = TEST_UTIL.getZooKeeperWatcher();
Path testdir = TEST_UTIL.getDataTestDir("TestUpgradeTo96");
// get the untarred 0.94 file structure
set94FSLayout(testdir);
setUp94Znodes();
}
/**
* Lays out the 0.94 file system layout using {@link TestNamespaceUpgrade} APIs.
* @param testdir
* @throws Exception
*/
private static void set94FSLayout(Path testdir) throws Exception {
File untar = TestNamespaceUpgrade.untar(new File(testdir.toString()));
if (!fs.exists(hbaseRootDir.getParent())) {
// create the parent directory first
fs.mkdirs(hbaseRootDir.getParent());
}
FsShell shell = new FsShell(TEST_UTIL.getConfiguration());
shell.run(new String[] { "-put", untar.toURI().toString(), hbaseRootDir.toString() });
// See what's in mini HDFS.
shell.run(new String[] { "-lsr", "/" });
}
/**
* Sets up znodes used in the 0.94 version. Only table and replication znodes will be upgraded
* to PB; others will be deleted.
* @throws KeeperException
*/
private static void setUp94Znodes() throws IOException, KeeperException {
// add some old znodes, which would be deleted after upgrade.
String rootRegionServerZnode = ZKUtil.joinZNode(zkw.baseZNode, "root-region-server");
ZKUtil.createWithParents(zkw, rootRegionServerZnode);
ZKUtil.createWithParents(zkw, zkw.backupMasterAddressesZNode);
// add table znode, data of its children would be protobuffized
tableAZnode = ZKUtil.joinZNode(zkw.tableZNode, "a");
ZKUtil.createWithParents(zkw, tableAZnode,
Bytes.toBytes(ZooKeeperProtos.Table.State.ENABLED.toString()));
// add replication znodes, data of its children would be protobuffized
String replicationZnode = ZKUtil.joinZNode(zkw.baseZNode, "replication");
replicationPeerZnode = ZKUtil.joinZNode(replicationZnode, "peers");
peer1Znode = ZKUtil.joinZNode(replicationPeerZnode, "1");
peer1 = ReplicationPeer.newBuilder().setClusterkey("abc:123:/hbase").build();
ZKUtil.createWithParents(zkw, peer1Znode, Bytes.toBytes(peer1.getClusterkey()));
}
/**
* Tests a 0.94 filesystem for any HFileV1.
* @throws Exception
*/
@Test
public void testHFileV1Detector() throws Exception {
assertEquals(0, ToolRunner.run(TEST_UTIL.getConfiguration(), new HFileV1Detector(), null));
}
/**
* Creates a corrupt file and runs the HFileV1 detector tool.
* @throws Exception
*/
@Test
public void testHFileV1DetectorWithCorruptFiles() throws Exception {
// add a corrupt file.
Path tablePath = new Path(hbaseRootDir, "foo");
FileStatus[] regionsDir = fs.listStatus(tablePath);
if (regionsDir == null) throw new IOException("No regions found for table foo");
Path columnFamilyDir = null;
Path targetRegion = null;
for (FileStatus s : regionsDir) {
if (fs.exists(new Path(s.getPath(), HRegionFileSystem.REGION_INFO_FILE))) {
targetRegion = s.getPath();
break;
}
}
FileStatus[] cfs = fs.listStatus(targetRegion);
for (FileStatus f : cfs) {
if (f.isDir()) {
columnFamilyDir = f.getPath();
break;
}
}
LOG.debug("target columnFamilyDir: " + columnFamilyDir);
// now insert a corrupt file in the columnfamily.
Path corruptFile = new Path(columnFamilyDir, "corrupt_file");
if (!fs.createNewFile(corruptFile)) throw new IOException("Couldn't create corrupt file: "
+ corruptFile);
assertEquals(1, ToolRunner.run(TEST_UTIL.getConfiguration(), new HFileV1Detector(), null));
// remove the corrupt file
FileSystem.get(TEST_UTIL.getConfiguration()).delete(corruptFile, false);
}
@Test
public void testADirForHFileV1() throws Exception {
Path tablePath = new Path(hbaseRootDir, "foo");
System.out.println("testADirForHFileV1: " + tablePath.makeQualified(fs));
System.out.println("Passed: " + hbaseRootDir + "/foo");
assertEquals(0,
ToolRunner.run(TEST_UTIL.getConfiguration(), new HFileV1Detector(), new String[] { "-p"
+ "foo" }));
}
@Test
public void testZnodeMigration() throws Exception {
String rootRSZnode = ZKUtil.joinZNode(zkw.baseZNode, "root-region-server");
assertTrue(ZKUtil.checkExists(zkw, rootRSZnode) > -1);
ToolRunner.run(TEST_UTIL.getConfiguration(), new UpgradeTo96(), new String[] { "-execute" });
assertEquals(-1, ZKUtil.checkExists(zkw, rootRSZnode));
byte[] data = ZKUtil.getData(zkw, tableAZnode);
assertTrue(ProtobufUtil.isPBMagicPrefix(data));
checkTableState(data, ZooKeeperProtos.Table.State.ENABLED);
// ensure replication znodes are there, and protobuffed.
data = ZKUtil.getData(zkw, peer1Znode);
assertTrue(ProtobufUtil.isPBMagicPrefix(data));
checkReplicationPeerData(data, peer1);
}
private void checkTableState(byte[] data, State expectedState)
throws InvalidProtocolBufferException {
ZooKeeperProtos.Table.Builder builder = ZooKeeperProtos.Table.newBuilder();
int magicLen = ProtobufUtil.lengthOfPBMagic();
ZooKeeperProtos.Table t = builder.mergeFrom(data, magicLen, data.length - magicLen).build();
assertTrue(t.getState() == expectedState);
}
private void checkReplicationPeerData(byte[] data, ReplicationPeer peer)
throws InvalidProtocolBufferException {
int magicLen = ProtobufUtil.lengthOfPBMagic();
ZooKeeperProtos.ReplicationPeer.Builder builder = ZooKeeperProtos.ReplicationPeer.newBuilder();
assertEquals(builder.mergeFrom(data, magicLen, data.length - magicLen).build().getClusterkey(),
peer.getClusterkey());
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniHBaseCluster();
TEST_UTIL.shutdownMiniDFSCluster();
TEST_UTIL.shutdownMiniZKCluster();
}
}