From 08247bfe76d98cbeb83060e2d6e4d78dd2cb0f9d Mon Sep 17 00:00:00 2001 From: Michael Stack Date: Thu, 22 Aug 2013 22:46:14 +0000 Subject: [PATCH] HBASE-9311 Create a migration script that will move data from 0.94.x to 0.96 git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1516648 13f79535-47bb-0310-9956-ffa450edef68 --- bin/hbase | 3 + .../replication/ReplicationStateZKBase.java | 2 +- .../hbase/migration/NamespaceUpgrade.java | 1 + .../hadoop/hbase/migration/UpgradeTo96.java | 233 ++++++++++++++++ .../hadoop/hbase/util/HFileV1Detector.java | 193 +++++++------ .../hadoop/hbase/util/ZKDataMigrator.java | 261 ++++++++++++++++++ .../hbase/migration/TestNamespaceUpgrade.java | 2 +- .../hbase/migration/TestUpgradeTo96.java | 249 +++++++++++++++++ 8 files changed, 862 insertions(+), 82 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/migration/UpgradeTo96.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/util/ZKDataMigrator.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/migration/TestUpgradeTo96.java diff --git a/bin/hbase b/bin/hbase index ece4d286496..f15db565372 100755 --- a/bin/hbase +++ b/bin/hbase @@ -77,6 +77,7 @@ if [ $# = 0 ]; then echo " hlog write-ahead-log analyzer" echo " hfile store file analyzer" echo " zkcli run the ZooKeeper shell" + echo " upgrade upgrade hbase" echo "" echo "PROCESS MANAGEMENT" echo " master run an HBase HMaster node" @@ -271,6 +272,8 @@ elif [ "$COMMAND" = "hfile" ] ; then CLASS='org.apache.hadoop.hbase.io.hfile.HFile' elif [ "$COMMAND" = "zkcli" ] ; then CLASS="org.apache.hadoop.hbase.zookeeper.ZooKeeperMainServer" +elif [ "$COMMAND" = "upgrade" ] ; then + CLASS="org.apache.hadoop.hbase.migration.UpgradeTo96" elif [ "$COMMAND" = "master" ] ; then CLASS='org.apache.hadoop.hbase.master.HMaster' if [ "$1" != "stop" ] ; then diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java index 9199efee956..3e99687e36b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java @@ -54,7 +54,7 @@ public abstract class ReplicationStateZKBase { // Public for testing public static final byte[] ENABLED_ZNODE_BYTES = toByteArray(ZooKeeperProtos.ReplicationState.State.ENABLED); - protected static final byte[] DISABLED_ZNODE_BYTES = + public static final byte[] DISABLED_ZNODE_BYTES = toByteArray(ZooKeeperProtos.ReplicationState.State.DISABLED); public ReplicationStateZKBase(ZooKeeperWatcher zookeeper, Configuration conf, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/migration/NamespaceUpgrade.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/migration/NamespaceUpgrade.java index 3a3abc2ad87..f5f3dcd6748 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/migration/NamespaceUpgrade.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/migration/NamespaceUpgrade.java @@ -105,6 +105,7 @@ public class NamespaceUpgrade implements Tool { public void init() throws IOException { this.rootDir = FSUtils.getRootDir(conf); + FSUtils.setFsDefault(getConf(), rootDir); this.fs = FileSystem.get(conf); Path tmpDataDir = new Path(rootDir, TMP_DATA_DIR); sysNsDir = new Path(tmpDataDir, NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR); diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/migration/UpgradeTo96.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/migration/UpgradeTo96.java new file mode 100644 index 00000000000..b3676aa9a84 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/migration/UpgradeTo96.java @@ -0,0 +1,233 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.migration; + +import java.io.IOException; +import java.util.List; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.HFileV1Detector; +import org.apache.hadoop.hbase.util.ZKDataMigrator; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +public class UpgradeTo96 extends Configured implements Tool { + private static final Log LOG = LogFactory.getLog(UpgradeTo96.class); + + private Options options = new Options(); + /** + * whether to do overall upgrade (namespace and znodes) + */ + private boolean upgrade; + /** + * whether to check for HFileV1 + */ + private boolean checkForHFileV1; + /** + * Path of directory to check for HFileV1 + */ + private String dirToCheckForHFileV1; + + UpgradeTo96() { + setOptions(); + } + + private void setOptions() { + options.addOption("h", "help", false, "Help"); + options.addOption(new Option("check", false, "Run upgrade check; looks for HFileV1 " + + " under ${hbase.rootdir} or provided 'dir' directory.")); + options.addOption(new Option("execute", false, "Run upgrade; zk and hdfs must be up, hbase down")); + Option pathOption = new Option("dir", true, + "Relative path of dir to check for HFileV1s."); + pathOption.setRequired(false); + options.addOption(pathOption); + } + + private boolean parseOption(String[] args) throws ParseException { + if (args.length == 0) return false; // no args shows help. 
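+    // Note: -check and -execute are not mutually exclusive; run() does the
+    // HFileV1 check first and proceeds to the upgrade only when it passes.
+    // The optional -dir value is honored only together with -check.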
+ + CommandLineParser parser = new GnuParser(); + CommandLine cmd = parser.parse(options, args); + if (cmd.hasOption("h")) { + printUsage(); + return false; + } + if (cmd.hasOption("execute")) upgrade = true; + if (cmd.hasOption("check")) checkForHFileV1 = true; + if (checkForHFileV1 && cmd.hasOption("dir")) { + this.dirToCheckForHFileV1 = cmd.getOptionValue("dir"); + } + return true; + } + + private void printUsage() { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp("$bin/hbase upgrade -check [-dir DIR]|-execute", options); + System.out.println("Read http://hbase.apache.org/book.html#upgrade0.96 before attempting upgrade"); + System.out.println(); + System.out.println("Example usage:"); + System.out.println(); + System.out.println("Run upgrade check; looks for HFileV1s under ${hbase.rootdir}:"); + System.out.println(" $ bin/hbase upgrade -check"); + System.out.println(); + System.out.println("Run the upgrade: "); + System.out.println(" $ bin/hbase upgrade -execute"); + } + + @Override + public int run(String[] args) throws Exception { + if (!parseOption(args)) { + printUsage(); + return -1; + } + if (checkForHFileV1) { + int res = doHFileV1Check(); + if (res == 0) LOG.info("No HFileV1 found."); + else { + LOG.warn("There are some HFileV1, or corrupt files (files with incorrect major version)."); + return -1; + } + } + // if the user wants to upgrade, check for any HBase live process. + // If yes, prompt the user to stop them + if (upgrade) { + if (isAnyHBaseProcessAlive()) { + LOG.error("Some HBase processes are still alive, or znodes not expired yet. " + + "Please stop them before upgrade or try after some time."); + throw new IOException("Some HBase processes are still alive, or znodes not expired yet"); + } + return upgradeNamespaceAndZnodes(); + } + return -1; + } + + private boolean isAnyHBaseProcessAlive() throws IOException { + ZooKeeperWatcher zkw = null; + try { + zkw = new ZooKeeperWatcher(getConf(), "Check Live Processes.", new Abortable() { + private boolean aborted = false; + + @Override + public void abort(String why, Throwable e) { + LOG.warn("Got aborted with reason: " + why + ", and error: " + e); + this.aborted = true; + } + + @Override + public boolean isAborted() { + return this.aborted; + } + + }); + boolean liveProcessesExists = false; + if (ZKUtil.checkExists(zkw, zkw.baseZNode) == -1) { + return false; + } + if (ZKUtil.checkExists(zkw, zkw.backupMasterAddressesZNode) != -1) { + List backupMasters = ZKUtil + .listChildrenNoWatch(zkw, zkw.backupMasterAddressesZNode); + if (!backupMasters.isEmpty()) { + LOG.warn("Backup master(s) " + backupMasters + + " are alive or backup-master znodes not expired."); + liveProcessesExists = true; + } + } + if (ZKUtil.checkExists(zkw, zkw.rsZNode) != -1) { + List regionServers = ZKUtil.listChildrenNoWatch(zkw, zkw.rsZNode); + if (!regionServers.isEmpty()) { + LOG.warn("Region server(s) " + regionServers + " are alive or rs znodes not expired."); + liveProcessesExists = true; + } + } + if (ZKUtil.checkExists(zkw, zkw.getMasterAddressZNode()) != -1) { + byte[] data = ZKUtil.getData(zkw, zkw.getMasterAddressZNode()); + if (data != null && !Bytes.equals(data, HConstants.EMPTY_BYTE_ARRAY)) { + LOG.warn("Active master at address " + Bytes.toString(data) + + " is still alive or master znode not expired."); + liveProcessesExists = true; + } + } + return liveProcessesExists; + } catch (Exception e) { + LOG.error("Got exception while checking live hbase processes", e); + throw new IOException(e); + } finally { + if (zkw 
!= null) {
+        zkw.close();
+      }
+    }
+  }
+
+  private int doHFileV1Check() throws Exception {
+    String[] args = null;
+    if (dirToCheckForHFileV1 != null) args = new String[] { "-p" + dirToCheckForHFileV1 };
+    return ToolRunner.run(getConf(), new HFileV1Detector(), args);
+  }
+
+  private int upgradeNamespaceAndZnodes() throws Exception {
+    int res = upgradeNamespace();
+    if (res == 0) return upgradeZnodes(); // upgrade znodes only if we succeed in the first step.
+    else {
+      LOG.warn("Namespace upgrade returned: " + res + ", expected 0. Aborting the upgrade");
+      throw new Exception("Unexpected return code from Namespace upgrade");
+    }
+  }
+
+  private int upgradeNamespace() throws Exception {
+    LOG.info("Upgrading Namespace");
+    try {
+      int res = ToolRunner.run(getConf(), new NamespaceUpgrade(), new String[] { "--upgrade" });
+      LOG.info("Successfully upgraded namespace.");
+      return res;
+    } catch (Exception e) {
+      LOG.error("Got exception while upgrading Namespace", e);
+      throw e;
+    }
+  }
+
+  private int upgradeZnodes() throws Exception {
+    LOG.info("Upgrading Znodes");
+    try {
+      int res = ToolRunner.run(getConf(), new ZKDataMigrator(), null);
+      LOG.info("Successfully upgraded znodes.");
+      return res;
+    } catch (Exception e) {
+      LOG.error("Got exception while upgrading Znodes", e);
+      throw e;
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    System.exit(ToolRunner.run(HBaseConfiguration.create(), new UpgradeTo96(), args));
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HFileV1Detector.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HFileV1Detector.java
index af7a95035ae..a35adbec2c6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HFileV1Detector.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HFileV1Detector.java
@@ -46,7 +46,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.io.HFileLink;
-import org.apache.hadoop.hbase.io.hfile.FixedFileTrailer;
 import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 import org.apache.hadoop.util.Tool;
@@ -69,9 +68,27 @@ public class HFileV1Detector extends Configured implements Tool {
   private static final Log LOG = LogFactory.getLog(HFileV1Detector.class);
   private static final int DEFAULT_NUM_OF_THREADS = 10;
   private int numOfThreads;
-  private Path dirToProcess;
+  /**
+   * directory to start the processing.
+   */
+  private Path targetDirPath;
+  /**
+   * executor for processing regions.
+   */
+  private ExecutorService exec;
+
+  /**
+   * Keeps record of processed tables.
+   */
+  private final Set<Path> processedTables = new HashSet<Path>();
+  /**
+   * set of corrupted HFiles (with undetermined major version).
+   */
   private final Set<Path> corruptedHFiles = Collections
       .newSetFromMap(new ConcurrentHashMap<Path, Boolean>());
+  /**
+   * set of HFileV1.
+   */
   private final Set<Path> hFileV1Set = Collections
       .newSetFromMap(new ConcurrentHashMap<Path, Boolean>());
@@ -107,75 +124,101 @@ public class HFileV1Detector extends Configured implements Tool {
     }
     if (cmd.hasOption("p")) {
-      dirToProcess = new Path(cmd.getOptionValue("p"));
+      this.targetDirPath = new Path(FSUtils.getRootDir(getConf()), cmd.getOptionValue("p"));
     }
     try {
       if (cmd.hasOption("n")) {
         int n = Integer.parseInt(cmd.getOptionValue("n"));
         if (n < 0 || n > 100) {
-          System.out.println("Please use a positive number <= 100 for number of threads."
+ LOG.warn("Please use a positive number <= 100 for number of threads." + " Continuing with default value " + DEFAULT_NUM_OF_THREADS); return true; } - numOfThreads = n; + this.numOfThreads = n; } } catch (NumberFormatException nfe) { - System.err.println("Please select a valid number for threads"); + LOG.error("Please select a valid number for threads"); return false; } return true; } + /** + * Checks for HFileV1. + * @return 0 when no HFileV1 is present. + * 1 when a HFileV1 is present or, when there is a file with corrupt major version + * (neither V1 nor V2). + * -1 in case of any error/exception + */ @Override public int run(String args[]) throws IOException, ParseException { + FSUtils.setFsDefault(getConf(), new Path(FSUtils.getRootDir(getConf()).toUri())); fs = FileSystem.get(getConf()); numOfThreads = DEFAULT_NUM_OF_THREADS; - dirToProcess = FSUtils.getRootDir(getConf()); + targetDirPath = FSUtils.getRootDir(getConf()); if (!parseOption(args)) { - System.exit(1); + System.exit(-1); } - ExecutorService exec = Executors.newFixedThreadPool(numOfThreads); - Set regionsWithHFileV1; + this.exec = Executors.newFixedThreadPool(numOfThreads); try { - regionsWithHFileV1 = checkForV1Files(dirToProcess, exec); - printHRegionsWithHFileV1(regionsWithHFileV1); - printAllHFileV1(); - printCorruptedHFiles(); - if (hFileV1Set.isEmpty() && corruptedHFiles.isEmpty()) { - // all clear. - System.out.println("No HFile V1 Found"); - } + return processResult(checkForV1Files(targetDirPath)); } catch (Exception e) { - System.err.println(e); - return 1; + LOG.error(e); } finally { exec.shutdown(); fs.close(); } - return 0; + return -1; + } + + private int processResult(Set regionsWithHFileV1) { + LOG.info("Result: \n"); + printSet(processedTables, "Tables Processed: "); + + int count = hFileV1Set.size(); + LOG.info("Count of HFileV1: " + count); + if (count > 0) printSet(hFileV1Set, "HFileV1:"); + + count = corruptedHFiles.size(); + LOG.info("Count of corrupted files: " + count); + if (count > 0) printSet(corruptedHFiles, "Corrupted Files: "); + + count = regionsWithHFileV1.size(); + LOG.info("Count of Regions with HFileV1: " + count); + if (count > 0) printSet(regionsWithHFileV1, "Regions to Major Compact: "); + + return (hFileV1Set.isEmpty() && corruptedHFiles.isEmpty()) ? 0 : 1; + } + + private void printSet(Set result, String msg) { + LOG.info(msg); + for (Path p : result) { + LOG.info(p); + } } /** * Takes a directory path, and lists out any HFileV1, if present. * @param targetDir directory to start looking for HFilev1. - * @param exec * @return set of Regions that have HFileV1 * @throws IOException */ - private Set checkForV1Files(Path targetDir, final ExecutorService exec) throws IOException { - if (isTableDir(fs, targetDir)) { - return processTable(targetDir, exec); - } - // user has passed a hbase installation directory. + private Set checkForV1Files(Path targetDir) throws IOException { + LOG.info("Target dir is: " + targetDir); if (!fs.exists(targetDir)) { throw new IOException("The given path does not exist: " + targetDir); } + if (isTableDir(fs, targetDir)) { + processedTables.add(targetDir); + return processTable(targetDir); + } Set regionsWithHFileV1 = new HashSet(); FileStatus[] fsStats = fs.listStatus(targetDir); for (FileStatus fsStat : fsStats) { - if (isTableDir(fs, fsStat.getPath())) { + if (isTableDir(fs, fsStat.getPath()) && !isRootTable(fsStat.getPath())) { + processedTables.add(fsStat.getPath()); // look for regions and find out any v1 file. 
- regionsWithHFileV1.addAll(processTable(fsStat.getPath(), exec)); + regionsWithHFileV1.addAll(processTable(fsStat.getPath())); } else { LOG.info("Ignoring path: " + fsStat.getPath()); } @@ -184,15 +227,24 @@ public class HFileV1Detector extends Configured implements Tool { } /** - * Find out the regions in the table which has an HFile v1 in it. + * Ignore ROOT table as it doesn't exist in 0.96. + * @param path + * @return + */ + private boolean isRootTable(Path path) { + if (path != null && path.toString().endsWith("-ROOT-")) return true; + return false; + } + + /** + * Find out regions in the table which have HFileV1. * @param tableDir - * @param exec * @return the set of regions containing HFile v1. * @throws IOException */ - private Set processTable(Path tableDir, final ExecutorService exec) throws IOException { + private Set processTable(Path tableDir) throws IOException { // list out the regions and then process each file in it. - LOG.info("processing table: " + tableDir); + LOG.debug("processing table: " + tableDir); List> regionLevelResults = new ArrayList>(); Set regionsWithHFileV1 = new HashSet(); @@ -200,7 +252,7 @@ public class HFileV1Detector extends Configured implements Tool { for (FileStatus fsStat : fsStats) { // process each region if (isRegionDir(fs, fsStat.getPath())) { - regionLevelResults.add(processRegion(fsStat.getPath(), exec)); + regionLevelResults.add(processRegion(fsStat.getPath())); } } for (Future f : regionLevelResults) { @@ -209,9 +261,9 @@ public class HFileV1Detector extends Configured implements Tool { regionsWithHFileV1.add(f.get()); } } catch (InterruptedException e) { - System.err.println(e); + LOG.error(e); } catch (ExecutionException e) { - System.err.println(e); // might be a bad hfile. We print it at the end. + LOG.error(e); // might be a bad hfile. We print it at the end. } } return regionsWithHFileV1; @@ -221,11 +273,10 @@ public class HFileV1Detector extends Configured implements Tool { * Each region is processed by a separate handler. If a HRegion has a hfileV1, its path is * returned as the future result, otherwise, a null value is returned. * @param regionDir Region to process. - * @param exec * @return corresponding Future object. */ - private Future processRegion(final Path regionDir, final ExecutorService exec) { - LOG.info("processing region: " + regionDir); + private Future processRegion(final Path regionDir) { + LOG.debug("processing region: " + regionDir); Callable regionCallable = new Callable() { @Override public Path call() throws Exception { @@ -249,15 +300,17 @@ public class HFileV1Detector extends Configured implements Tool { fsdis = fs.open(storeFilePath); lenToRead = storeFile.getLen(); } - FixedFileTrailer trailer = FixedFileTrailer.readFromStream(fsdis, lenToRead); - int version = trailer.getMajorVersion(); - if (version == 1) { + int majorVersion = computeMajorVersion(fsdis, lenToRead); + if (majorVersion == 1) { hFileV1Set.add(storeFilePath); // return this region path, as it needs to be compacted. 
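+              // (A major compaction rewrites store files in the current HFile
+              // format, which is how a flagged region gets upgraded.)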
return regionDir;
+              }
+              if (majorVersion > 2 || majorVersion < 1) throw new IllegalArgumentException(
+                  "Incorrect major version: " + majorVersion);
             } catch (Exception iae) {
               corruptedHFiles.add(storeFilePath);
+              LOG.error("Got exception while reading trailer for file: " + storeFilePath, iae);
             } finally {
               if (fsdis != null) fsdis.close();
             }
@@ -265,13 +318,30 @@
           }
           return null;
         }
+
+        private int computeMajorVersion(FSDataInputStream istream, long fileSize)
+            throws IOException {
+          // Read the last int of the file; the major version is in its last 3 bytes.
+          long seekPoint = fileSize - Bytes.SIZEOF_INT;
+          if (seekPoint < 0)
+            throw new IllegalArgumentException("File too small, no major version found");
+
+          // Read the version from the last int of the file.
+          istream.seek(seekPoint);
+          int version = istream.readInt();
+          // Extract and return the major version
+          return version & 0x00ffffff;
+        }
       };
       Future<Path> f = exec.submit(regionCallable);
       return f;
   }

   private static boolean isTableDir(final FileSystem fs, final Path path) throws IOException {
-    return FSTableDescriptors.getTableInfoPath(fs, path) != null;
+    // Check for the old layout of having /table/.tableinfo; .META. doesn't have a
+    // .tableinfo file, so include it explicitly.
+    return (FSTableDescriptors.getTableInfoPath(fs, path) != null || FSTableDescriptors
+        .getCurrentTableInfoStatus(fs, path, false) != null) || path.toString().endsWith(".META.");
   }

   private static boolean isRegionDir(final FileSystem fs, final Path path) throws IOException {
@@ -280,43 +350,6 @@
   }

-  private void printHRegionsWithHFileV1(Set<Path> regionsHavingHFileV1) {
-    if (!regionsHavingHFileV1.isEmpty()) {
-      System.out.println();
-      System.out.println("Following regions has HFileV1 and needs to be Major Compacted:");
-      System.out.println();
-      for (Path r : regionsHavingHFileV1) {
-        System.out.println(r);
-      }
-      System.out.println();
-    }
-  }
-
-  private void printAllHFileV1() {
-    if (!hFileV1Set.isEmpty()) {
-      System.out.println();
-      System.out.println("Following HFileV1 are found:");
-      System.out.println();
-      for (Path r : hFileV1Set) {
-        System.out.println(r);
-      }
-      System.out.println();
-    }
-
-  }
-
-  private void printCorruptedHFiles() {
-    if (!corruptedHFiles.isEmpty()) {
-      System.out.println();
-      System.out.println("Following HFiles are corrupted as their version is unknown:");
-      System.out.println();
-      for (Path r : corruptedHFiles) {
-        System.out.println(r);
-      }
-      System.out.println();
-    }
-  }
-
   public static void main(String args[]) throws Exception {
     System.exit(ToolRunner.run(HBaseConfiguration.create(), new HFileV1Detector(), args));
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ZKDataMigrator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ZKDataMigrator.java
new file mode 100644
index 00000000000..311a4eda158
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ZKDataMigrator.java
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationPeer;
+import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+
+/**
+ * Tool to migrate zookeeper data of older hbase versions (< 0.95.0) to PB.
+ */
+public class ZKDataMigrator extends Configured implements Tool {
+
+  private static final Log LOG = LogFactory.getLog(ZKDataMigrator.class);
+
+  @Override
+  public int run(String[] as) throws Exception {
+    Configuration conf = getConf();
+    ZooKeeperWatcher zkw = null;
+    try {
+      zkw = new ZooKeeperWatcher(getConf(), "Migrate ZK data to PB.",
+          new ZKDataMigratorAbortable());
+      if (ZKUtil.checkExists(zkw, zkw.baseZNode) == -1) {
+        LOG.info("No hbase related data available in zookeeper. Returning.");
+        return 0;
+      }
+      List<String> children = ZKUtil.listChildrenNoWatch(zkw, zkw.baseZNode);
+      if (children == null) {
+        LOG.info("No child nodes to migrate. Returning.");
+        return 0;
+      }
+      String childPath = null;
+      for (String child : children) {
+        childPath = ZKUtil.joinZNode(zkw.baseZNode, child);
+        if (child.equals(conf.get("zookeeper.znode.rootserver", "root-region-server"))) {
+          // -ROOT- region is no longer present from 0.95.0, so we can remove
+          // this znode.
+          ZKUtil.deleteNodeRecursively(zkw, childPath);
+          // TODO delete root table path from file system.
+        } else if (child.equals(conf.get("zookeeper.znode.rs", "rs"))) {
+          // Since there is no live region server instance during migration, we
+          // can remove this znode as well.
+          ZKUtil.deleteNodeRecursively(zkw, childPath);
+        } else if (child.equals(conf.get("zookeeper.znode.draining.rs", "draining"))) {
+          // Migrating to 0.95.0 from older versions requires the existing
+          // cluster to be stopped, so there won't be any draining servers and
+          // we can remove this znode.
+          ZKUtil.deleteNodeRecursively(zkw, childPath);
+        } else if (child.equals(conf.get("zookeeper.znode.master", "master"))) {
+          // Since there is no live master instance during migration, we can
+          // remove this znode as well.
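+          // (Same pattern as above: znodes that only track live cluster state
+          // are deleted; persistent state such as table and replication znodes
+          // is converted to protobuf below.)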
+          ZKUtil.deleteNodeRecursively(zkw, childPath);
+        } else if (child.equals(conf.get("zookeeper.znode.backup.masters", "backup-masters"))) {
+          // Since there are no live backup master instances during migration,
+          // we can remove this znode as well.
+          ZKUtil.deleteNodeRecursively(zkw, childPath);
+        } else if (child.equals(conf.get("zookeeper.znode.state", "shutdown"))) {
+          // The shutdown node is not present from 0.95.0 onwards. It's renamed
+          // to "running". We can delete it.
+          ZKUtil.deleteNodeRecursively(zkw, childPath);
+        } else if (child.equals(conf.get("zookeeper.znode.unassigned", "unassigned"))) {
+          // During a clean cluster startup we will remove all unassigned
+          // region nodes anyway, so we can delete all child nodes as well. This
+          // znode is renamed to "regions-in-transition" from 0.95.0 onwards.
+          ZKUtil.deleteNodeRecursively(zkw, childPath);
+        } else if (child.equals(conf.get("zookeeper.znode.tableEnableDisable", "table"))
+            || child.equals(conf.get("zookeeper.znode.masterTableEnableDisable", "table"))) {
+          checkAndMigrateTableStatesToPB(zkw);
+        } else if (child.equals(conf.get("zookeeper.znode.masterTableEnableDisable92",
+            "table92"))) {
+          // This is a replica of the table states from the table znode, so we
+          // can remove it.
+          ZKUtil.deleteNodeRecursively(zkw, childPath);
+        } else if (child.equals(conf.get("zookeeper.znode.splitlog", "splitlog"))) {
+          // This znode is no longer available from 0.95.0 onwards, so we can
+          // remove it.
+          ZKUtil.deleteNodeRecursively(zkw, childPath);
+        } else if (child.equals(conf.get("zookeeper.znode.replication", "replication"))) {
+          checkAndMigrateReplicationNodesToPB(zkw);
+        } else if (child.equals(conf.get("zookeeper.znode.clusterId", "hbaseid"))) {
+          // it will be re-created by master.
+          ZKUtil.deleteNodeRecursively(zkw, childPath);
+        } else if (child.equals(SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION)) {
+          // not needed as it is transient.
+          ZKUtil.deleteNodeRecursively(zkw, childPath);
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("Got exception while updating znodes", e);
+      throw new IOException(e);
+    } finally {
+      if (zkw != null) {
+        zkw.close();
+      }
+    }
+    return 0;
+  }
+
+  private void checkAndMigrateTableStatesToPB(ZooKeeperWatcher zkw) throws KeeperException {
+    List<String> tables = ZKUtil.listChildrenNoWatch(zkw, zkw.tableZNode);
+    if (tables == null) {
+      LOG.info("No table present to migrate table state to PB. Returning.");
+      return;
+    }
+    for (String table : tables) {
+      String znode = ZKUtil.joinZNode(zkw.tableZNode, table);
+      // Delete the -ROOT- table state znode since it's no longer present from
+      // 0.95.0 onwards.
+      if (table.equals("-ROOT-") || table.equals(".META.")) {
+        ZKUtil.deleteNode(zkw, znode);
+        continue;
+      }
+      byte[] data = ZKUtil.getData(zkw, znode);
+      if (ProtobufUtil.isPBMagicPrefix(data)) continue;
+      ZooKeeperProtos.Table.Builder builder = ZooKeeperProtos.Table.newBuilder();
+      builder.setState(ZooKeeperProtos.Table.State.valueOf(Bytes.toString(data)));
+      data = ProtobufUtil.prependPBMagic(builder.build().toByteArray());
+      ZKUtil.setData(zkw, znode, data);
+    }
+  }
+
+  private void checkAndMigrateReplicationNodesToPB(ZooKeeperWatcher zkw) throws KeeperException {
+    String replicationZnodeName = getConf().get("zookeeper.znode.replication", "replication");
+    String replicationPath = ZKUtil.joinZNode(zkw.baseZNode, replicationZnodeName);
+    List<String> replicationZnodes = ZKUtil.listChildrenNoWatch(zkw, replicationPath);
+    if (replicationZnodes == null) {
+      LOG.info("No replication related znodes present to migrate. Returning.");
+      return;
+    }
+    for (String child : replicationZnodes) {
+      String znode = ZKUtil.joinZNode(replicationPath, child);
+      if (child.equals(getConf().get("zookeeper.znode.replication.peers", "peers"))) {
+        List<String> peers = ZKUtil.listChildrenNoWatch(zkw, znode);
+        if (peers == null || peers.isEmpty()) {
+          LOG.info("No peers present to migrate.");
+          continue;
+        }
+        checkAndMigratePeerZnodesToPB(zkw, znode, peers);
+      } else if (child.equals(getConf().get("zookeeper.znode.replication.state", "state"))) {
+        // This is no longer used in >=0.95.x
+        ZKUtil.deleteNodeRecursively(zkw, znode);
+      } else if (child.equals(getConf().get("zookeeper.znode.replication.rs", "rs"))) {
+        List<String> rsList = ZKUtil.listChildrenNoWatch(zkw, znode);
+        if (rsList == null || rsList.isEmpty()) continue;
+        for (String rs : rsList) {
+          checkAndMigrateQueuesToPB(zkw, znode, rs);
+        }
+      }
+    }
+  }
+
+  private void checkAndMigrateQueuesToPB(ZooKeeperWatcher zkw, String znode, String rs)
+      throws KeeperException, NoNodeException {
+    String rsPath = ZKUtil.joinZNode(znode, rs);
+    List<String> peers = ZKUtil.listChildrenNoWatch(zkw, rsPath);
+    if (peers == null || peers.isEmpty()) return;
+    String peerPath = null;
+    for (String peer : peers) {
+      peerPath = ZKUtil.joinZNode(rsPath, peer);
+      List<String> files = ZKUtil.listChildrenNoWatch(zkw, peerPath);
+      if (files == null || files.isEmpty()) continue;
+      String filePath = null;
+      for (String file : files) {
+        filePath = ZKUtil.joinZNode(peerPath, file);
+        byte[] data = ZKUtil.getData(zkw, filePath);
+        if (data == null || Bytes.equals(data, HConstants.EMPTY_BYTE_ARRAY)) continue;
+        if (ProtobufUtil.isPBMagicPrefix(data)) continue;
+        ZKUtil.setData(zkw, filePath,
+          ZKUtil.positionToByteArray(Long.parseLong(Bytes.toString(data))));
+      }
+    }
+  }
+
+  private void checkAndMigratePeerZnodesToPB(ZooKeeperWatcher zkw, String znode,
+      List<String> peers) throws KeeperException, NoNodeException {
+    for (String peer : peers) {
+      String peerZnode = ZKUtil.joinZNode(znode, peer);
+      byte[] data = ZKUtil.getData(zkw, peerZnode);
+      if (!ProtobufUtil.isPBMagicPrefix(data)) {
+        migrateClusterKeyToPB(zkw, peerZnode, data);
+      }
+      String peerStatePath = ZKUtil.joinZNode(peerZnode,
+        getConf().get("zookeeper.znode.replication.peers.state", "peer-state"));
+      if (ZKUtil.checkExists(zkw, peerStatePath) != -1) {
+        data = ZKUtil.getData(zkw, peerStatePath);
+        if (ProtobufUtil.isPBMagicPrefix(data)) continue;
+        migratePeerStateToPB(zkw, data, peerStatePath);
+      }
+    }
+  }
+
+  private void migrateClusterKeyToPB(ZooKeeperWatcher zkw, String peerZnode, byte[] data)
+      throws KeeperException, NoNodeException {
+    ReplicationPeer peer = ZooKeeperProtos.ReplicationPeer.newBuilder()
+        .setClusterkey(Bytes.toString(data)).build();
+    ZKUtil.setData(zkw, peerZnode, ProtobufUtil.prependPBMagic(peer.toByteArray()));
+  }
+
+  private void migratePeerStateToPB(ZooKeeperWatcher zkw, byte[] data,
+      String peerStatePath)
+      throws KeeperException, NoNodeException {
+    String state = Bytes.toString(data);
+    if (ZooKeeperProtos.ReplicationState.State.ENABLED.name().equals(state)) {
+      ZKUtil.setData(zkw, peerStatePath, ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
+    } else if (ZooKeeperProtos.ReplicationState.State.DISABLED.name().equals(state)) {
+      ZKUtil.setData(zkw, peerStatePath, ReplicationStateZKBase.DISABLED_ZNODE_BYTES);
+    }
+  }
+
+  public static void main(String args[]) throws Exception {
+    System.exit(ToolRunner.run(HBaseConfiguration.create(), new ZKDataMigrator(), args));
+  }
+
+  static class ZKDataMigratorAbortable implements Abortable {
private boolean aborted = false; + + @Override + public void abort(String why, Throwable e) { + LOG.error("Got aborted with reason: " + why + ", and error: " + e); + this.aborted = true; + } + + @Override + public boolean isAborted() { + return this.aborted; + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/migration/TestNamespaceUpgrade.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/migration/TestNamespaceUpgrade.java index 8232a748f34..04676d7775d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/migration/TestNamespaceUpgrade.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/migration/TestNamespaceUpgrade.java @@ -150,7 +150,7 @@ public class TestNamespaceUpgrade { } } - private static File untar(final File testdir) throws IOException { + static File untar(final File testdir) throws IOException { // Find the src data under src/test/data final String datafile = "TestNamespaceUpgrade"; File srcTarFile = new File( diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/migration/TestUpgradeTo96.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/migration/TestUpgradeTo96.java new file mode 100644 index 00000000000..b18c2d4438a --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/migration/TestUpgradeTo96.java @@ -0,0 +1,249 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.hbase.migration;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsShell;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.master.TableNamespaceManager;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationPeer;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Table.State;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.HFileV1Detector;
+import org.apache.hadoop.hbase.util.ZKDataMigrator;
+import org.apache.hadoop.hbase.zookeeper.ZKTableReadOnly;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.server.ZKDatabase;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * Upgrade to 0.96 involves detecting HFileV1 in an existing cluster, updating the namespace and
+ * updating znodes. This class tests HFileV1 detection and znode upgrading.
+ * Upgrading the namespace is tested in {@link TestNamespaceUpgrade}.
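+ * The 0.94 layout used here comes from the TestNamespaceUpgrade data tarball;
+ * see {@link TestNamespaceUpgrade#untar}.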
+ */
+@Category(MediumTests.class)
+public class TestUpgradeTo96 {
+
+  static final Log LOG = LogFactory.getLog(TestUpgradeTo96.class);
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  /**
+   * underlying file system instance
+   */
+  private static FileSystem fs;
+  /**
+   * hbase root dir
+   */
+  private static Path hbaseRootDir;
+  private static ZooKeeperWatcher zkw;
+  /**
+   * replication peer znode (/hbase/replication/peers)
+   */
+  private static String replicationPeerZnode;
+  /**
+   * znode of a table
+   */
+  private static String tableAZnode;
+  private static ReplicationPeer peer1;
+  /**
+   * znode for replication peer1 (/hbase/replication/peers/1)
+   */
+  private static String peer1Znode;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // Start up the mini cluster on top of a 0.94 root.dir that has data from
+    // a 0.94 hbase run and see if we can migrate to 0.96.
+    TEST_UTIL.startMiniZKCluster();
+    TEST_UTIL.startMiniDFSCluster(1);
+
+    hbaseRootDir = TEST_UTIL.getDefaultRootDirPath();
+    fs = FileSystem.get(TEST_UTIL.getConfiguration());
+    FSUtils.setRootDir(TEST_UTIL.getConfiguration(), hbaseRootDir);
+    zkw = TEST_UTIL.getZooKeeperWatcher();
+
+    Path testdir = TEST_UTIL.getDataTestDir("TestUpgradeTo96");
+    // lay down the untarred 0.94 file structure
+
+    set94FSLayout(testdir);
+    setUp94Znodes();
+  }
+
+  /**
+   * Lays out the 0.94 file system using {@link TestNamespaceUpgrade} APIs.
+   * @param testdir
+   * @throws IOException
+   * @throws Exception
+   */
+  private static void set94FSLayout(Path testdir) throws IOException, Exception {
+    File untar = TestNamespaceUpgrade.untar(new File(testdir.toString()));
+    if (!fs.exists(hbaseRootDir.getParent())) {
+      // mkdir at first
+      fs.mkdirs(hbaseRootDir.getParent());
+    }
+    FsShell shell = new FsShell(TEST_UTIL.getConfiguration());
+    shell.run(new String[] { "-put", untar.toURI().toString(), hbaseRootDir.toString() });
+    // See what's in the mini HDFS.
+    shell.run(new String[] { "-lsr", "/" });
+  }
+
+  /**
+   * Sets up znodes as used in 0.94. Only table and replication znodes will be upgraded to PB;
+   * the others are deleted.
+   * @throws KeeperException
+   */
+  private static void setUp94Znodes() throws IOException, KeeperException {
+    // add some old znodes, which would be deleted after upgrade.
+    String rootRegionServerZnode = ZKUtil.joinZNode(zkw.baseZNode, "root-region-server");
+    ZKUtil.createWithParents(zkw, rootRegionServerZnode);
+    ZKUtil.createWithParents(zkw, zkw.backupMasterAddressesZNode);
+    // add table znode, data of its children would be protobuffized
+    tableAZnode = ZKUtil.joinZNode(zkw.tableZNode, "a");
+    ZKUtil.createWithParents(zkw, tableAZnode,
+      Bytes.toBytes(ZooKeeperProtos.Table.State.ENABLED.toString()));
+    // add replication znodes, data of its children would be protobuffized
+    String replicationZnode = ZKUtil.joinZNode(zkw.baseZNode, "replication");
+    replicationPeerZnode = ZKUtil.joinZNode(replicationZnode, "peers");
+    peer1Znode = ZKUtil.joinZNode(replicationPeerZnode, "1");
+    peer1 = ReplicationPeer.newBuilder().setClusterkey("abc:123:/hbase").build();
+    ZKUtil.createWithParents(zkw, peer1Znode, Bytes.toBytes(peer1.getClusterkey()));
+  }
+
+  /**
+   * Tests a 0.94 filesystem for any HFileV1.
+ * @throws Exception + */ + @Test + public void testHFileV1Detector() throws Exception { + assertEquals(0, ToolRunner.run(TEST_UTIL.getConfiguration(), new HFileV1Detector(), null)); + } + + /** + * Creates a corrupt file, and run HFileV1 detector tool + * @throws Exception + */ + @Test + public void testHFileV1DetectorWithCorruptFiles() throws Exception { + // add a corrupt file. + Path tablePath = new Path(hbaseRootDir, "foo"); + FileStatus[] regionsDir = fs.listStatus(tablePath); + if (regionsDir == null) throw new IOException("No Regions found for table " + "foo"); + Path columnFamilyDir = null; + Path targetRegion = null; + for (FileStatus s : regionsDir) { + if (fs.exists(new Path(s.getPath(), HRegionFileSystem.REGION_INFO_FILE))) { + targetRegion = s.getPath(); + break; + } + } + FileStatus[] cfs = fs.listStatus(targetRegion); + for (FileStatus f : cfs) { + if (f.isDir()) { + columnFamilyDir = f.getPath(); + break; + } + } + LOG.debug("target columnFamilyDir: " + columnFamilyDir); + // now insert a corrupt file in the columnfamily. + Path corruptFile = new Path(columnFamilyDir, "corrupt_file"); + if (!fs.createNewFile(corruptFile)) throw new IOException("Couldn't create corrupt file: " + + corruptFile); + assertEquals(1, ToolRunner.run(TEST_UTIL.getConfiguration(), new HFileV1Detector(), null)); + // remove the corrupt file + FileSystem.get(TEST_UTIL.getConfiguration()).delete(corruptFile, false); + } + + @Test + public void testADirForHFileV1() throws Exception { + Path tablePath = new Path(hbaseRootDir, "foo"); + System.out.println("testADirForHFileV1: " + tablePath.makeQualified(fs)); + System.out.println("Passed: " + hbaseRootDir + "/foo"); + assertEquals(0, + ToolRunner.run(TEST_UTIL.getConfiguration(), new HFileV1Detector(), new String[] { "-p" + + "foo" })); + } + + @Test + public void testZnodeMigration() throws Exception { + String rootRSZnode = ZKUtil.joinZNode(zkw.baseZNode, "root-region-server"); + assertTrue(ZKUtil.checkExists(zkw, rootRSZnode) > -1); + ToolRunner.run(TEST_UTIL.getConfiguration(), new UpgradeTo96(), new String[] { "-execute" }); + assertEquals(-1, ZKUtil.checkExists(zkw, rootRSZnode)); + byte[] data = ZKUtil.getData(zkw, tableAZnode); + assertTrue(ProtobufUtil.isPBMagicPrefix(data)); + checkTableState(data, ZooKeeperProtos.Table.State.ENABLED); + // ensure replication znodes are there, and protobuffed. + data = ZKUtil.getData(zkw, peer1Znode); + assertTrue(ProtobufUtil.isPBMagicPrefix(data)); + checkReplicationPeerData(data, peer1); + } + + private void checkTableState(byte[] data, State expectedState) + throws InvalidProtocolBufferException { + ZooKeeperProtos.Table.Builder builder = ZooKeeperProtos.Table.newBuilder(); + int magicLen = ProtobufUtil.lengthOfPBMagic(); + ZooKeeperProtos.Table t = builder.mergeFrom(data, magicLen, data.length - magicLen).build(); + assertTrue(t.getState() == expectedState); + } + + private void checkReplicationPeerData(byte[] data, ReplicationPeer peer) + throws InvalidProtocolBufferException { + int magicLen = ProtobufUtil.lengthOfPBMagic(); + ZooKeeperProtos.ReplicationPeer.Builder builder = ZooKeeperProtos.ReplicationPeer.newBuilder(); + assertEquals(builder.mergeFrom(data, magicLen, data.length - magicLen).build().getClusterkey(), + peer.getClusterkey()); + + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniHBaseCluster(); + TEST_UTIL.shutdownMiniDFSCluster(); + TEST_UTIL.shutdownMiniZKCluster(); + } + +}
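+
+// A minimal sketch (illustration only, not part of this patch) of the trailer
+// decoding that HFileV1Detector#computeMajorVersion relies on: HFile writes a
+// single version int as the last four bytes of the file, with the major
+// version in the low three bytes and the minor version in the high byte.
+//
+//   int packed = istream.readInt();     // last 4 bytes of the store file
+//   int major  = packed & 0x00ffffff;   // low 3 bytes: major version (1 or 2)
+//   int minor  = packed >>> 24;         // high byte: minor version, unused here
+//
+// A major version of 1 flags the region for major compaction, 2 is the current
+// format, and any other value is reported as a corrupt file.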