HBASE-5547 Don't delete HFiles when in "backup mode" (Jesse Yates)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1364203 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
larsh 2012-07-22 01:11:36 +00:00
parent c22888d6d8
commit ab96788641
38 changed files with 3649 additions and 259 deletions

View File

@ -676,7 +676,7 @@ public final class HConstants {
public static final String ENABLE_WAL_COMPRESSION =
"hbase.regionserver.wal.enablecompression";
/** Region in Transition metrics threshold time */
public static final String METRICS_RIT_STUCK_WARNING_THRESHOLD="hbase.metrics.rit.stuck.warning.threshold";
public static final String LOAD_BALANCER_SLOP_KEY = "hbase.regions.slop";
@ -689,6 +689,9 @@ public final class HConstants {
/** delimiter used between portions of a region name */
public static final int DELIMITER = ',';
/** Configuration key for the directory to backup HFiles for a table */
public static final String HFILE_ARCHIVE_DIRECTORY = "hbase.table.archive.directory";
private HConstants() {
// Can't be instantiated with this ctor.
}
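A small usage sketch (not part of the patch): the new key just names the per-table directory that archived hfiles are moved into, and callers read it from the active configuration. The ".archive" fallback shown here is an assumed default for illustration only, not taken from the patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;

public class ArchiveDirConfigSketch {
  /** Resolve the configured archive directory name for a table. */
  public static String archiveDirName() {
    Configuration conf = HBaseConfiguration.create();
    // ".archive" is an assumed fallback, used here only for illustration
    return conf.get(HConstants.HFILE_ARCHIVE_DIRECTORY, ".archive");
  }
}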

View File

@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
/**
* HBase version of Hadoop's Configured class that doesn't initialize the
* configuration via {@link #setConf(Configuration)} in the constructor, but
* only sets the configuration through the {@link #setConf(Configuration)}
* method
*/
public class BaseConfigurable implements Configurable {
private Configuration conf;
@Override
public void setConf(Configuration conf) {
this.conf = conf;
}
@Override
public Configuration getConf() {
return this.conf;
}
}
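A minimal sketch (not part of the patch) of how a delegate might build on BaseConfigurable: unlike Hadoop's Configured, no configuration is injected from a constructor, so derived state is initialized only when setConf(Configuration) is explicitly called. The class name ExampleDelegate and the "example.delegate.ttl" key are hypothetical.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.BaseConfigurable;

public class ExampleDelegate extends BaseConfigurable {
  private long ttl;

  @Override
  public void setConf(Configuration conf) {
    super.setConf(conf);
    // derived settings are computed here, once a configuration is actually supplied
    this.ttl = conf.getLong("example.delegate.ttl", 60 * 1000L);
  }

  public long getTtl() {
    return ttl;
  }
}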

View File

@ -80,6 +80,7 @@ public abstract class Chore extends HasThread {
LOG.fatal(getName() + "error", t);
} finally {
LOG.info(getName() + " exiting");
cleanup();
}
}
@ -112,4 +113,11 @@ public abstract class Chore extends HasThread {
protected void sleep() {
this.sleeper.sleep();
}
/**
* Called when the chore has completed, allowing subclasses to clean up any
* extra overhead
*/
protected void cleanup() {
}
}
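A minimal sketch (not part of the patch) of a Chore subclass using the new cleanup() hook; the hook runs exactly once, from the finally block above, when the chore thread exits. ClosingChore and its Closeable resource are hypothetical.

import java.io.Closeable;
import java.io.IOException;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.Stoppable;

public class ClosingChore extends Chore {
  private final Closeable resource;

  public ClosingChore(int period, Stoppable stopper, Closeable resource) {
    super("ClosingChore", period, stopper);
    this.resource = resource;
  }

  @Override
  protected void chore() {
    // periodic work would go here
  }

  @Override
  protected void cleanup() {
    try {
      resource.close(); // release long-lived resources once the chore is done
    } catch (IOException e) {
      // best effort during shutdown
    }
  }
}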

View File

@ -0,0 +1,625 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.backup;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.io.MultipleIOException;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Collections2;
import com.google.common.collect.Lists;
/**
* Utility class to handle the removal of HFiles (or the respective {@link StoreFile StoreFiles})
* for a HRegion from the {@link FileSystem}. The hfiles will be archived or deleted, depending on
* the state of the system.
*/
public class HFileArchiver {
private static final Log LOG = LogFactory.getLog(HFileArchiver.class);
private static final String SEPARATOR = ".";
private HFileArchiver() {
// hidden ctor since this is just a util
}
/**
* Cleans up all the files for a HRegion by archiving the HFiles to the
* archive directory
* @param fs the file system object
* @param info HRegionInfo for region to be deleted
* @throws IOException
*/
public static void archiveRegion(FileSystem fs, HRegionInfo info)
throws IOException {
Path rootDir = FSUtils.getRootDir(fs.getConf());
archiveRegion(fs, rootDir, HTableDescriptor.getTableDir(rootDir, info.getTableName()),
HRegion.getRegionDir(rootDir, info));
}
/**
* Remove an entire region from the table directory via archiving the region's hfiles.
* @param fs {@link FileSystem} from which to remove the region
* @param rootdir {@link Path} to the root directory where hbase files are stored (for building
* the archive path)
* @param tableDir {@link Path} to where the table is being stored (for building the archive path)
* @param regionDir {@link Path} to where a region is being stored (for building the archive path)
* @return <tt>true</tt> if the region was successfully deleted. <tt>false</tt> if the filesystem
* operations could not complete.
* @throws IOException if the request cannot be completed
*/
public static boolean archiveRegion(FileSystem fs, Path rootdir, Path tableDir, Path regionDir)
throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("ARCHIVING region " + regionDir.toString());
}
// otherwise, we archive the files
// make sure we can archive
if (tableDir == null || regionDir == null) {
LOG.error("No archive directory could be found because tabledir (" + tableDir
+ ") or regiondir (" + regionDir + "was null. Deleting files instead.");
deleteRegionWithoutArchiving(fs, regionDir);
// we should have archived, but failed to. Doesn't matter if we deleted
// the archived files correctly or not.
return false;
}
// make sure the regiondir lives under the tabledir
Preconditions.checkArgument(regionDir.toString().startsWith(tableDir.toString()));
Path regionArchiveDir = HFileArchiveUtil.getRegionArchiveDir(fs.getConf(), tableDir, regionDir);
LOG.debug("Have an archive directory, preparing to move files");
FileStatusConverter getAsFile = new FileStatusConverter(fs);
// otherwise, we attempt to archive the store files
// build collection of just the store directories to archive
Collection<File> toArchive = new ArrayList<File>();
final PathFilter dirFilter = new FSUtils.DirFilter(fs);
PathFilter nonHidden = new PathFilter() {
@Override
public boolean accept(Path file) {
return dirFilter.accept(file) && !file.getName().toString().startsWith(".");
}
};
FileStatus[] storeDirs = FSUtils.listStatus(fs, regionDir, nonHidden);
// if there are no files, we can just delete the directory and return;
if (storeDirs == null) {
LOG.debug("Region directory (" + regionDir + ") was empty, just deleting and returning!");
return deleteRegionWithoutArchiving(fs, regionDir);
}
// convert the files in the region to a File
toArchive.addAll(Lists.transform(Arrays.asList(storeDirs), getAsFile));
LOG.debug("Archiving:" + toArchive);
boolean success = false;
try {
success = resolveAndArchive(fs, regionArchiveDir, toArchive);
} catch (IOException e) {
success = false;
}
// if that was successful, then we delete the region
if (success) {
LOG.debug("Successfully resolved and archived, now can just delete region.");
return deleteRegionWithoutArchiving(fs, regionDir);
}
throw new IOException("Received error when attempting to archive files (" + toArchive
+ "), cannot delete region directory.");
}
/**
* Remove the store files, either by archiving them or outright deletion
* @param fs the filesystem where the store files live
* @param parent Parent region hosting the store files
* @param conf {@link Configuration} to examine to determine the archive directory
* @param family the family hosting the store files
* @param compactedFiles files to be disposed of. No further reading of these files should be
* attempted; otherwise likely to cause an {@link IOException}
* @throws IOException if the files could not be correctly disposed.
*/
public static void archiveStoreFiles(FileSystem fs, HRegion parent,
Configuration conf, byte[] family, Collection<StoreFile> compactedFiles) throws IOException {
// sometimes in testing, we don't have rss, so we need to check for that
if (fs == null) {
LOG.warn("Passed filesystem is null, so just deleting the files without archiving for region:"
+ Bytes.toString(parent.getRegionName()) + ", family:" + Bytes.toString(family));
deleteStoreFilesWithoutArchiving(compactedFiles);
return;
}
// short circuit if we don't have any files to delete
if (compactedFiles.size() == 0) {
LOG.debug("No store files to dispose, done!");
return;
}
// build the archive path
if (parent == null || family == null) throw new IOException(
"Need to have a parent region and a family to archive from.");
Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(conf, parent, family);
// make sure the archive directory exists; if it can't be created, we can't archive
if (!fs.mkdirs(storeArchiveDir)) {
throw new IOException("Could not make archive directory (" + storeArchiveDir + ") for store:"
+ Bytes.toString(family));
}
// otherwise we attempt to archive the store files
LOG.debug("Archiving compacted store files.");
// wrap the storefile into a File
StoreToFile getStorePath = new StoreToFile(fs);
Collection<File> storeFiles = Collections2.transform(compactedFiles, getStorePath);
// do the actual archive
if (!resolveAndArchive(fs, storeArchiveDir, storeFiles)) {
throw new IOException("Failed to archive/delete all the files for region:"
+ Bytes.toString(parent.getRegionName()) + ", family:" + Bytes.toString(family)
+ " into " + storeArchiveDir + "Something is probably arwy on the filesystem.");
}
}
/**
* Archive the given files and resolve any conflicts with existing files via appending the time
* archiving started (so all conflicts in the same group have the same timestamp appended).
* <p>
* If any of the passed files to archive are directories, all the files under those directories are
* archived. The archive directory structure for children is the base archive directory plus the
* parent directory name, and is built recursively as long as the passed files are themselves directories.
* @param fs {@link FileSystem} on which to archive the files
* @param baseArchiveDir base archive directory to archive the given files
* @param toArchive files to be archived
* @return <tt>true</tt> on success, <tt>false</tt> otherwise
* @throws IOException on unexpected failure
*/
private static boolean resolveAndArchive(FileSystem fs, Path baseArchiveDir,
Collection<File> toArchive) throws IOException {
LOG.debug("Starting to archive files:" + toArchive);
long start = EnvironmentEdgeManager.currentTimeMillis();
List<File> failures = resolveAndArchive(fs, baseArchiveDir, toArchive, start);
// clean out the failures by just deleting them
if (failures.size() > 0) {
try {
LOG.error("Failed to complete archive, deleting extra store files.");
deleteFilesWithoutArchiving(failures);
} catch (IOException e) {
LOG.warn("Failed to delete store file(s) when archiving failed", e);
}
return false;
}
return true;
}
/**
* Resolve any conflict with an existing archive file via timestamp-append
* renaming of the existing file and then archive the passed in files.
* @param fs {@link FileSystem} on which to archive the files
* @param baseArchiveDir base archive directory to store the files. If any of
* the files to archive are directories, will append the name of the
* directory to the base archive directory name, creating a parallel
* structure.
* @param toArchive files/directories that need to be archived
* @param start time the archiving started - used for resolving archive
* conflicts.
* @return the list of files that failed to archive.
* @throws IOException if an unexpected file operation exception occurred
*/
private static List<File> resolveAndArchive(FileSystem fs, Path baseArchiveDir,
Collection<File> toArchive, long start) throws IOException {
// short circuit if no files to move
if (toArchive.size() == 0) return Collections.emptyList();
LOG.debug("moving files to the archive directory: " + baseArchiveDir);
// make sure the archive directory exists
if (!fs.exists(baseArchiveDir)) {
if (!fs.mkdirs(baseArchiveDir)) {
throw new IOException("Failed to create the archive directory:" + baseArchiveDir
+ ", quitting archive attempt.");
}
LOG.debug("Created archive directory:" + baseArchiveDir);
}
List<File> failures = new ArrayList<File>();
String startTime = Long.toString(start);
for (File file : toArchive) {
// if it's a file, archive it
try {
LOG.debug("Archiving:" + file);
if (file.isFile()) {
// attempt to archive the file
if (!resolveAndArchiveFile(baseArchiveDir, file, startTime)) {
LOG.warn("Couldn't archive " + file + " into backup directory: " + baseArchiveDir);
failures.add(file);
}
} else {
// otherwise it's a directory and we need to archive all files
LOG.debug(file + " is a directory, archiving children files");
// so we add the directory name to the base archive directory
Path parentArchiveDir = new Path(baseArchiveDir, file.getName());
// and then get all the files from that directory and attempt to
// archive those too
Collection<File> children = file.getChildren();
failures.addAll(resolveAndArchive(fs, parentArchiveDir, children, start));
}
} catch (IOException e) {
LOG.warn("Failed to archive file: " + file, e);
failures.add(file);
}
}
return failures;
}
/**
* Attempt to archive the passed in file to the archive directory.
* <p>
* If a file with the same name already exists in the archive, it is renamed to a timestamped
* backup file within the archive directory and the new file is put in its place.
* @param archiveDir {@link Path} to the directory that stores the archives of the hfiles
* @param currentFile {@link Path} to the original HFile that will be archived
* @param archiveStartTime time the archiving started, to resolve naming conflicts
* @return <tt>true</tt> if the file is successfully archived. <tt>false</tt> if there was a
* problem, but the operation still completed.
* @throws IOException on failure to complete {@link FileSystem} operations.
*/
private static boolean resolveAndArchiveFile(Path archiveDir, File currentFile,
String archiveStartTime) throws IOException {
// build path as it should be in the archive
String filename = currentFile.getName();
Path archiveFile = new Path(archiveDir, filename);
FileSystem fs = currentFile.getFileSystem();
// if the file already exists in the archive, move that one to a timestamped backup. This is a
// really, really unlikely situation, where we get the same name for the existing file, but
// is included just for that 1 in trillion chance.
if (fs.exists(archiveFile)) {
if (LOG.isDebugEnabled()) {
LOG.debug("File:" + archiveFile + " already exists in archive, moving to "
+ "timestamped backup and overwriting current.");
}
// move the archive file to the stamped backup
Path backedupArchiveFile = new Path(archiveDir, filename + SEPARATOR + archiveStartTime);
if (!fs.rename(archiveFile, backedupArchiveFile)) {
LOG.error("Could not rename archive file to backup: " + backedupArchiveFile
+ ", deleting existing file in favor of newer.");
// try to delete the existing file, if we can't rename it
if (!fs.delete(archiveFile, false)) {
throw new IOException("Couldn't delete existing archive file (" + archiveFile
+ ") or rename it to the backup file (" + backedupArchiveFile
+ ")to make room for similarly named file.");
}
}
LOG.debug("Backed up archive file from: " + archiveFile);
}
LOG.debug("No existing file in archive for:" + archiveFile + ", free to archive original file.");
// at this point, we should have a free spot for the archive file
if (!currentFile.moveAndClose(archiveFile)) {
LOG.error("Failed to archive file:" + currentFile);
return false;
} else if (LOG.isDebugEnabled()) {
LOG.debug("Finished archiving file from: " + currentFile + ", to: " + archiveFile);
}
return true;
}
/**
* Simple delete of regular files from the {@link FileSystem}.
* <p>
* This method is a more generic implementation than the other deleteXXX
* methods in this class, allowing more code reuse at the cost of a couple
* more, short-lived objects (which should have minimal impact on the jvm).
* @param files {@link Collection} of files to be deleted
* @throws IOException if a file cannot be deleted. An attempt will be made to
* delete every file before throwing the exception, rather than
* failing at the first file.
*/
private static void deleteFilesWithoutArchiving(Collection<File> files) throws IOException {
List<IOException> errors = new ArrayList<IOException>(0);
for (File file : files) {
try {
LOG.debug("Deleting region file:" + file);
file.delete();
} catch (IOException e) {
LOG.error("Failed to delete file:" + file);
errors.add(e);
}
}
if (errors.size() > 0) {
throw MultipleIOException.createIOException(errors);
}
}
/**
* Without regard for backup, delete a region. Should be used with caution.
* @param regionDir {@link Path} to the region to be deleted.
* @param fs FileSystem from which to delete the region
* @return <tt>true</tt> on successful deletion, <tt>false</tt> otherwise
* @throws IOException on filesystem operation failure
*/
private static boolean deleteRegionWithoutArchiving(FileSystem fs, Path regionDir)
throws IOException {
if (fs.delete(regionDir, true)) {
LOG.debug("Deleted all region files in: " + regionDir);
return true;
}
LOG.debug("Failed to delete region directory:" + regionDir);
return false;
}
/**
* Just do a simple delete of the given store files
* <p>
* A best effort is made to delete each of the files, rather than bailing on the first failure.
* <p>
* This method is preferable to {@link #deleteFilesWithoutArchiving(Collection)} since it consumes
* fewer resources, but is limited in terms of usefulness
* @param compactedFiles store files to delete from the file system.
* @throws IOException if a file cannot be deleted. An attempt will be made to delete every file
* before throwing the exception, rather than failing at the first file.
*/
private static void deleteStoreFilesWithoutArchiving(Collection<StoreFile> compactedFiles)
throws IOException {
LOG.debug("Deleting store files without archiving.");
List<IOException> errors = new ArrayList<IOException>(0);
for (StoreFile hsf : compactedFiles) {
try {
hsf.deleteReader();
} catch (IOException e) {
LOG.error("Failed to delete store file:" + hsf.getPath());
errors.add(e);
}
}
if (errors.size() > 0) {
throw MultipleIOException.createIOException(errors);
}
}
/**
* Adapt a type to match the {@link File} interface, which is used internally for handling
* archival/removal of files
* @param <T> type to adapt to the {@link File} interface
*/
private static abstract class FileConverter<T> implements Function<T, File> {
protected final FileSystem fs;
public FileConverter(FileSystem fs) {
this.fs = fs;
}
}
/**
* Convert a FileStatus to something we can manage in the archiving
*/
private static class FileStatusConverter extends FileConverter<FileStatus> {
public FileStatusConverter(FileSystem fs) {
super(fs);
}
@Override
public File apply(FileStatus input) {
return new FileablePath(fs, input.getPath());
}
}
/**
* Convert the {@link StoreFile} into something we can manage in the archive
* methods
*/
private static class StoreToFile extends FileConverter<StoreFile> {
public StoreToFile(FileSystem fs) {
super(fs);
}
@Override
public File apply(StoreFile input) {
return new FileableStoreFile(fs, input);
}
}
/**
* Wrapper to handle file operations uniformly
*/
private static abstract class File {
protected final FileSystem fs;
public File(FileSystem fs) {
this.fs = fs;
}
/**
* Delete the file
* @throws IOException on failure
*/
abstract void delete() throws IOException;
/**
* Check to see if this is a file or a directory
* @return <tt>true</tt> if it is a file, <tt>false</tt> otherwise
* @throws IOException on {@link FileSystem} connection error
*/
abstract boolean isFile() throws IOException;
/**
* @return if this is a directory, returns all the children in the
* directory, otherwise returns an empty list
* @throws IOException
*/
abstract Collection<File> getChildren() throws IOException;
/**
* close any outside readers of the file
* @throws IOException
*/
abstract void close() throws IOException;
/**
* @return the name of the file (not the full fs path, just the individual
* file name)
*/
abstract String getName();
/**
* @return the path to this file
*/
abstract Path getPath();
/**
* Move the file to the given destination
* @param dest destination to move the file to
* @return <tt>true</tt> on success
* @throws IOException
*/
public boolean moveAndClose(Path dest) throws IOException {
this.close();
Path p = this.getPath();
return fs.rename(p, dest);
}
/**
* @return the {@link FileSystem} on which this file resides
*/
public FileSystem getFileSystem() {
return this.fs;
}
@Override
public String toString() {
return this.getClass() + ", file:" + getPath().toString();
}
}
/**
* A {@link File} that wraps a simple {@link Path} on a {@link FileSystem}.
*/
private static class FileablePath extends File {
private final Path file;
private final FileStatusConverter getAsFile;
public FileablePath(FileSystem fs, Path file) {
super(fs);
this.file = file;
this.getAsFile = new FileStatusConverter(fs);
}
@Override
public void delete() throws IOException {
if (!fs.delete(file, true)) throw new IOException("Failed to delete:" + this.file);
}
@Override
public String getName() {
return file.getName();
}
@Override
public Collection<File> getChildren() throws IOException {
if (fs.isFile(file)) return Collections.emptyList();
return Collections2.transform(Arrays.asList(fs.listStatus(file)), getAsFile);
}
@Override
public boolean isFile() throws IOException {
return fs.isFile(file);
}
@Override
public void close() throws IOException {
// NOOP - files are implicitly closed on removal
}
@Override
Path getPath() {
return file;
}
}
/**
* {@link File} adapter for a {@link StoreFile} living on a {@link FileSystem}.
*/
private static class FileableStoreFile extends File {
StoreFile file;
public FileableStoreFile(FileSystem fs, StoreFile store) {
super(fs);
this.file = store;
}
@Override
public void delete() throws IOException {
file.deleteReader();
}
@Override
public String getName() {
return file.getPath().getName();
}
@Override
public boolean isFile() {
return true;
}
@Override
public Collection<File> getChildren() throws IOException {
// storefiles don't have children
return Collections.emptyList();
}
@Override
public void close() throws IOException {
file.closeReader(true);
}
@Override
Path getPath() {
return file.getPath();
}
}
}
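A minimal caller-side sketch (not part of the patch) of the public entry point above; it mirrors the CatalogJanitor change later in this commit, archiving a region's hfiles instead of deleting them outright. It assumes a Configuration that points at the cluster's root directory.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.backup.HFileArchiver;

public class ArchiveRegionSketch {
  /** Archive, rather than delete, all hfiles belonging to the given region. */
  public static void archive(Configuration conf, HRegionInfo region) throws IOException {
    // the archiver derives the root and archive directories from fs.getConf()
    FileSystem fs = FileSystem.get(conf);
    HFileArchiver.archiveRegion(fs, region);
  }
}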

View File

@ -0,0 +1,169 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.backup.example;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
/**
* Client-side manager of which tables' hfiles should be preserved for long-term archive.
* @see ZKTableArchiveClient
* @see HFileArchiveTableMonitor
* @see LongTermArchivingHFileCleaner
*/
@InterfaceAudience.Private
class HFileArchiveManager {
private final String archiveZnode;
private static final Log LOG = LogFactory.getLog(HFileArchiveManager.class);
private final ZooKeeperWatcher zooKeeper;
private volatile boolean stopped = false;
public HFileArchiveManager(HConnection connection, Configuration conf)
throws ZooKeeperConnectionException, IOException {
this.zooKeeper = new ZooKeeperWatcher(conf, "hfileArchiveManager-on-" + connection.toString(),
connection);
this.archiveZnode = ZKTableArchiveClient.getArchiveZNode(this.zooKeeper.getConfiguration(),
this.zooKeeper);
}
/**
* Turn on auto-backups of HFiles on the specified table.
* <p>
* When HFiles would be deleted from the hfile archive, they are instead preserved.
* @param table name of the table for which to preserve hfiles.
* @return <tt>this</tt> for chaining.
* @throws KeeperException if we can't reach zookeeper to update the hfile cleaner.
*/
public HFileArchiveManager enableHFileBackup(byte[] table) throws KeeperException {
enable(this.zooKeeper, table);
return this;
}
/**
* Stop retaining HFiles for the given table in the archive. HFiles will be cleaned up on the next
* pass of the {@link HFileCleaner}, unless the HFiles are retained by another cleaner.
* @param table name of the table for which to disable hfile retention.
* @return <tt>this</tt> for chaining.
* @throws KeeperException if we can't reach zookeeper to update the hfile cleaner.
*/
public HFileArchiveManager disableHFileBackup(byte[] table) throws KeeperException {
disable(this.zooKeeper, table);
return this;
}
/**
* Disable long-term archival of all hfiles for all tables in the cluster.
* @return <tt>this</tt> for chaining.
* @throws IOException if the number of attempts is exceeded
*/
public HFileArchiveManager disableHFileBackup() throws IOException {
LOG.debug("Disabling backups on all tables.");
try {
ZKUtil.deleteNodeRecursively(this.zooKeeper, archiveZnode);
return this;
} catch (KeeperException e) {
throw new IOException("Unexpected ZK exception!", e);
}
}
/**
* Perform a best effort enable of hfile retention, which relies on zookeeper communicating the
* change back to the hfile cleaner.
* <p>
* No attempt is made to make sure that backups are successfully created - it is inherently an
* <b>asynchronous operation</b>.
* @param zooKeeper watcher connection to zk cluster
* @param table table name on which to enable archiving
* @throws KeeperException
*/
private void enable(ZooKeeperWatcher zooKeeper, byte[] table)
throws KeeperException {
LOG.debug("Ensuring archiving znode exists");
ZKUtil.createAndFailSilent(zooKeeper, archiveZnode);
// then add the table to the list of znodes to archive
String tableNode = this.getTableNode(table);
LOG.debug("Creating: " + tableNode + ", data: []");
ZKUtil.createSetData(zooKeeper, tableNode, new byte[0]);
}
/**
* Disable all archiving of files for a given table
* <p>
* Inherently an <b>asynchronous operation</b>.
* @param zooKeeper watcher for the ZK cluster
* @param table name of the table to disable
* @throws KeeperException if an unexpected ZK connection issue occurs
*/
private void disable(ZooKeeperWatcher zooKeeper, byte[] table) throws KeeperException {
// ensure the latest state of the archive node is found
zooKeeper.sync(archiveZnode);
// if the top-level archive node is gone, then we are done
if (ZKUtil.checkExists(zooKeeper, archiveZnode) < 0) {
return;
}
// delete the table node, from the archive
String tableNode = this.getTableNode(table);
// make sure the table is the latest version so the delete takes
zooKeeper.sync(tableNode);
LOG.debug("Attempting to delete table node:" + tableNode);
ZKUtil.deleteNodeRecursively(zooKeeper, tableNode);
}
public void stop() {
if (!this.stopped) {
this.stopped = true;
LOG.debug("Stopping HFileArchiveManager...");
this.zooKeeper.close();
}
}
/**
* Check to see if the table is currently marked for archiving
* @param table name of the table to check
* @return <tt>true</tt> if the archive znode for that table exists, <tt>false</tt> if not
* @throws KeeperException if an unexpected zookeeper error occurs
*/
public boolean isArchivingEnabled(byte[] table) throws KeeperException {
String tableNode = this.getTableNode(table);
return ZKUtil.checkExists(zooKeeper, tableNode) >= 0;
}
/**
* Get the zookeeper node associated with archiving the given table
* @param table name of the table to check
* @return znode for the table's archive status
*/
private String getTableNode(byte[] table) {
return ZKUtil.joinZNode(archiveZnode, Bytes.toString(table));
}
}
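HFileArchiveManager is package-private and is normally driven through ZKTableArchiveClient; purely as an illustration of its enable/check/stop lifecycle, a same-package sketch might look like the following. The table name "t1" is hypothetical.

package org.apache.hadoop.hbase.backup.example;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.zookeeper.KeeperException;

class HFileArchiveManagerSketch {
  static boolean enableAndCheck(HConnection connection, Configuration conf)
      throws IOException, KeeperException {
    HFileArchiveManager manager = new HFileArchiveManager(connection, conf);
    try {
      manager.enableHFileBackup(Bytes.toBytes("t1")); // start retaining hfiles for "t1"
      return manager.isArchivingEnabled(Bytes.toBytes("t1"));
    } finally {
      manager.stop(); // closes the manager's ZooKeeper connection
    }
  }
}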

View File

@ -0,0 +1,78 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.backup.example;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Monitor the actual tables for which HFiles are archived for long-term retention (always kept
* unless ZK state changes).
* <p>
* It is internally synchronized to ensure consistent view of the table state.
*/
public class HFileArchiveTableMonitor {
private static final Log LOG = LogFactory.getLog(HFileArchiveTableMonitor.class);
private final Set<String> archivedTables = new TreeSet<String>();
/**
* Set the tables to be archived. Internally replaces the currently tracked tables with the ones
* passed in.
* <p>
* <b>Note: All previous tables will be removed in favor of these tables.</b>
* @param tables the tables to be archived.
*/
public synchronized void setArchiveTables(List<String> tables) {
archivedTables.clear();
archivedTables.addAll(tables);
}
/**
* Add the named table to those being archived.
* @param table name of the table to be archived
*/
public synchronized void addTable(String table) {
if (this.shouldArchiveTable(table)) {
LOG.debug("Already archiving table: " + table + ", ignoring it");
return;
}
archivedTables.add(table);
}
public synchronized void removeTable(String table) {
archivedTables.remove(table);
}
public synchronized void clearArchive() {
archivedTables.clear();
}
/**
* Determine if the given table should or should not allow its hfiles to be deleted in the archive
* @param tableName name of the table to check
* @return <tt>true</tt> if its store files should be retained, <tt>false</tt> otherwise
*/
public synchronized boolean shouldArchiveTable(String tableName) {
return archivedTables.contains(tableName);
}
}

View File

@ -0,0 +1,106 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.backup.example;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.zookeeper.KeeperException;
/**
* {@link BaseHFileCleanerDelegate} that only cleans HFiles that don't belong to a table that is
* currently being archived.
* <p>
* This only works properly if the {@link TimeToLiveHFileCleaner} is also enabled (it always should
* be), since it may take a little time for the ZK notification to propagate, in which case we may
* accidentally delete some files.
*/
@InterfaceAudience.Private
public class LongTermArchivingHFileCleaner extends BaseHFileCleanerDelegate {
private static final Log LOG = LogFactory.getLog(LongTermArchivingHFileCleaner.class);
private TableHFileArchiveTracker archiveTracker;
private FileSystem fs;
@Override
public boolean isFileDeleteable(Path file) {
try {
FileStatus[] deleteStatus = FSUtils.listStatus(this.fs, file, null);
// if the file doesn't exist, then it can be deleted (but should never
// happen since deleted files shouldn't get passed in)
if (deleteStatus == null) return true;
// if it's a directory with stuff in it, don't delete
if (deleteStatus.length > 1) return false;
// if it's an empty directory, we can definitely delete
if (deleteStatus[0].isDir()) return true;
// otherwise, we need to check the file's table and see if it is being archived
Path family = file.getParent();
Path region = family.getParent();
Path table = region.getParent();
String tableName = table.getName();
return !archiveTracker.keepHFiles(tableName);
} catch (IOException e) {
LOG.error("Failed to lookup status of:" + file + ", keeping it just incase.", e);
return false;
}
}
@Override
public void setConf(Configuration config) {
// setup our own zookeeper connection
// Make my own Configuration. Then I'll have my own connection to zk that
// I can close myself when the time comes.
Configuration conf = new Configuration(config);
super.setConf(conf);
try {
this.fs = FileSystem.get(conf);
this.archiveTracker = TableHFileArchiveTracker.create(conf);
this.archiveTracker.start();
} catch (KeeperException e) {
LOG.error("Error while configuring " + this.getClass().getName(), e);
} catch (IOException e) {
LOG.error("Error while configuring " + this.getClass().getName(), e);
}
}
@Override
public void stop(String reason) {
if (this.isStopped()) return;
super.stop(reason);
if (this.archiveTracker != null) {
LOG.info("Stopping " + this.archiveTracker);
this.archiveTracker.stop();
}
}
}
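For the delegate to run, it has to be listed among the master's hfile cleaner plugins. A hedged sketch follows; the "hbase.master.hfilecleaner.plugins" key is assumed here to be the plugin list read by HFileCleaner, by analogy with hbase.master.logcleaner.plugins, and should be verified against the cleaner introduced by this patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.backup.example.LongTermArchivingHFileCleaner;

public class CleanerPluginSketch {
  /** Build a configuration that registers the archiving-aware hfile cleaner. */
  public static Configuration withArchivingCleaner() {
    Configuration conf = HBaseConfiguration.create();
    // assumed plugin key, mirroring the existing log-cleaner plugin mechanism
    conf.set("hbase.master.hfilecleaner.plugins",
        LongTermArchivingHFileCleaner.class.getName());
    return conf;
  }
}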

View File

@ -0,0 +1,267 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.backup.example;
import java.io.IOException;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
/**
* Track HFile archiving state changes in ZooKeeper. Keeps track of the tables whose HFiles should
* be kept in the archive.
* <p>
* {@link TableHFileArchiveTracker#start()} needs to be called to start monitoring for tables to
* archive.
*/
@InterfaceAudience.Private
public class TableHFileArchiveTracker extends ZooKeeperListener {
private static final Log LOG = LogFactory.getLog(TableHFileArchiveTracker.class);
public static final String HFILE_ARCHIVE_ZNODE_PARENT = "hfilearchive";
private HFileArchiveTableMonitor monitor;
private String archiveHFileZNode;
private boolean stopped = false;
private TableHFileArchiveTracker(ZooKeeperWatcher watcher, HFileArchiveTableMonitor monitor) {
super(watcher);
watcher.registerListener(this);
this.monitor = monitor;
this.archiveHFileZNode = ZKTableArchiveClient.getArchiveZNode(watcher.getConfiguration(),
watcher);
}
/**
* Start monitoring for archive updates
* @throws KeeperException on failure to find/create nodes
*/
public void start() throws KeeperException {
// if archiving is enabled, then read in the list of tables to archive
LOG.debug("Starting hfile archive tracker...");
this.checkEnabledAndUpdate();
LOG.debug("Finished starting hfile archive tracker!");
}
@Override
public void nodeCreated(String path) {
// if it is the archive path
if (!path.startsWith(archiveHFileZNode)) return;
LOG.debug("Archive node: " + path + " created");
// since we are already enabled, just update a single table
String table = path.substring(archiveHFileZNode.length());
// the top level node has come up, so read in all the tables
if (table.length() == 0) {
checkEnabledAndUpdate();
return;
}
// find the table that needs to be archived
try {
addAndReWatchTable(path);
} catch (KeeperException e) {
LOG.warn("Couldn't read zookeeper data for table for path:" + path
+ ", not preserving a table.", e);
}
}
@Override
public void nodeChildrenChanged(String path) {
if (!path.startsWith(archiveHFileZNode)) return;
LOG.debug("Archive node: " + path + " children changed.");
// a table was added to the archive
try {
updateWatchedTables();
} catch (KeeperException e) {
LOG.error("Failed to update tables to archive", e);
}
}
/**
* Add this table to the tracker and then set a watch on that node.
* <p>
* Handles the situation where the table is deleted in the time between the update and resetting
* the watch, by removing the table via {@link #safeStopTrackingTable(String)}
* @param tableZnode full zookeeper path to the table to be added
* @throws KeeperException if an unexpected zk exception occurs
*/
private void addAndReWatchTable(String tableZnode) throws KeeperException {
getMonitor().addTable(ZKUtil.getNodeName(tableZnode));
// re-add a watch to the table created
// and check to make sure it wasn't deleted
if (!ZKUtil.watchAndCheckExists(watcher, tableZnode)) {
safeStopTrackingTable(tableZnode);
}
}
/**
* Stop tracking a table. Checks that the table's znode no longer exists; if it does, it attempts to
* add the table back via {@link #addAndReWatchTable(String)} - it's a 'safe' removal.
* @param tableZnode full zookeeper path to the table to be added
* @throws KeeperException if an unexpected zk exception occurs
*/
private void safeStopTrackingTable(String tableZnode) throws KeeperException {
getMonitor().removeTable(ZKUtil.getNodeName(tableZnode));
// if the table exists, then add and rewatch it
if (ZKUtil.checkExists(watcher, tableZnode) >= 0) {
addAndReWatchTable(tableZnode);
}
}
@Override
public void nodeDeleted(String path) {
if (!path.startsWith(archiveHFileZNode)) return;
LOG.debug("Archive node: " + path + " deleted");
String table = path.substring(archiveHFileZNode.length());
// if we stop archiving all tables
if (table.length() == 0) {
// make sure we have the tracker before deleting the archive
// but if we don't, we don't care about delete
clearTables();
// watches are one-time events, so we need to renew our subscription to
// the archive node and might as well check to make sure archiving
// didn't come back on at the same time
checkEnabledAndUpdate();
return;
}
// just stop archiving one table
// note that we don't attempt to add another watch for that table into zk.
// We have no assurances that the table will be archived again (or even
// exists for that matter), so its better not to add unnecessary load to
// zk for watches. If the table is created again, then we will get the
// notification in nodeChildrenChanged.
getMonitor().removeTable(ZKUtil.getNodeName(path));
}
/**
* Sets the watch on the top-level archive znode, and then updates the monitor with the current
* tables that should be archived (and ensures that those nodes are watched as well).
*/
private void checkEnabledAndUpdate() {
try {
if (ZKUtil.watchAndCheckExists(watcher, archiveHFileZNode)) {
LOG.debug(archiveHFileZNode + " znode does exist, checking for tables to archive");
// update the tables we should backup, to get the most recent state.
// This is safer than also watching for children and then hoping we get
// all the updates as it makes sure we get and watch all the children
updateWatchedTables();
} else {
LOG.debug("Archiving not currently enabled, waiting");
}
} catch (KeeperException e) {
LOG.warn("Failed to watch for archiving znode", e);
}
}
/**
* Read the list of children under the archive znode as table names and then sets those tables to
* the list of tables that we should archive
* @throws KeeperException if there is an unexpected zk exception
*/
private void updateWatchedTables() throws KeeperException {
// get the children and watch for new children
LOG.debug("Updating watches on tables to archive.");
// get the children and add watches for each of the children
List<String> tables = ZKUtil.listChildrenAndWatchThem(watcher, archiveHFileZNode);
LOG.debug("Starting archive for tables:" + tables);
// if archiving is still enabled
if (tables != null && tables.size() > 0) {
getMonitor().setArchiveTables(tables);
} else {
LOG.debug("No tables to archive.");
// only if we currently have a tracker, then clear the archive
clearTables();
}
}
/**
* Remove the currently archived tables.
* <p>
* Does some intelligent checking to make sure we don't prematurely create an archive tracker.
*/
private void clearTables() {
getMonitor().clearArchive();
}
/**
* Determine if the given table should or should not allow its hfiles to be deleted
* @param tableName name of the table to check
* @return <tt>true</tt> if its store files should be retained, <tt>false</tt> otherwise
*/
public boolean keepHFiles(String tableName) {
return getMonitor().shouldArchiveTable(tableName);
}
/**
* @return the tracker for which tables should be archived.
*/
public final HFileArchiveTableMonitor getMonitor() {
return this.monitor;
}
/**
* Create an archive tracker for the passed in server
* @param conf to read for zookeeper connection information
* @return ZooKeeper tracker to monitor for this server if this server should archive hfiles for a
* given table
* @throws IOException If an unexpected exception occurs
* @throws ZooKeeperConnectionException if we can't reach zookeeper
*/
public static TableHFileArchiveTracker create(Configuration conf)
throws ZooKeeperConnectionException, IOException {
ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "hfileArchiveCleaner", null);
return create(zkw, new HFileArchiveTableMonitor());
}
/**
* Create an archive tracker with the special passed in table monitor. Should only be used in
* special cases (eg. testing)
* @param zkw Watcher for the ZooKeeper cluster that we should track
* @param monitor Monitor for which tables need hfile archiving
* @return ZooKeeper tracker to monitor for this server if this server should archive hfiles for a
* given table
*/
private static TableHFileArchiveTracker create(ZooKeeperWatcher zkw,
HFileArchiveTableMonitor monitor) {
return new TableHFileArchiveTracker(zkw, monitor);
}
public ZooKeeperWatcher getZooKeeperWatcher() {
return this.watcher;
}
/**
* Stop this tracker and the passed zookeeper
*/
public void stop() {
if (this.stopped) return;
this.stopped = true;
this.watcher.close();
}
}
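A minimal sketch (not part of the patch) of the tracker lifecycle as the cleaner above uses it: create it from a configuration, start watching, query a table, and stop. The table name passed to keepHFiles is whatever table the caller is interested in.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.backup.example.TableHFileArchiveTracker;
import org.apache.zookeeper.KeeperException;

public class TrackerSketch {
  /** Return whether the given table's hfiles should currently be kept in the archive. */
  public static boolean isRetained(Configuration conf, String table)
      throws IOException, KeeperException {
    TableHFileArchiveTracker tracker = TableHFileArchiveTracker.create(conf);
    try {
      tracker.start(); // registers ZooKeeper watches on the archive znode
      return tracker.keepHFiles(table);
    } finally {
      tracker.stop(); // closes the tracker's ZooKeeper watcher
    }
  }
}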

View File

@ -0,0 +1,155 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.backup.example;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
/**
* Example class for how to use the table archiving coordinated via zookeeper
*/
@InterfaceAudience.Public
public class ZKTableArchiveClient extends Configured {
/** Configuration key for the archive node. */
private static final String ZOOKEEPER_ZNODE_HFILE_ARCHIVE_KEY = "zookeeper.znode.hfile.archive";
private HConnection connection;
public ZKTableArchiveClient(Configuration conf, HConnection connection) {
super(conf);
this.connection = connection;
}
/**
* Turn on backups for all HFiles for the given table.
* <p>
* All deleted hfiles are moved to the archive directory under the table directory, rather than
* being deleted.
* <p>
* If backups are already enabled for this table, does nothing.
* <p>
* If the table does not exist, archiving of the table's hfiles is still enabled, as a future
* table with that name may be created shortly.
* @param table name of the table to start backing up
* @throws IOException if an unexpected exception occurs
* @throws KeeperException if zookeeper can't be reached
*/
public void enableHFileBackupAsync(final byte[] table) throws IOException, KeeperException {
createHFileArchiveManager().enableHFileBackup(table).stop();
}
/**
* Disable hfile backups for the given table.
* <p>
* Previously backed up files are still retained (if present).
* <p>
* Asynchronous operation - some extra HFiles may be retained in the archive directory after
* disable is called, depending on the latency of the zookeeper notification to the servers.
* @param table name of the table to stop backing up
* @throws IOException if an unexpected exception occurs
* @throws KeeperException if zookeeper can't be reached
*/
public void disableHFileBackup(String table) throws IOException, KeeperException {
disableHFileBackup(Bytes.toBytes(table));
}
/**
* Disable hfile backups for the given table.
* <p>
* Previously backed up files are still retained (if present).
* <p>
* Asynchronous operation - some extra HFiles may be retained in the archive directory after
* disable is called, depending on the latency of the zookeeper notification to the servers.
* @param table name of the table to stop backing up
* @throws IOException if an unexpected exception occurs
* @throws KeeperException if zookeeper can't be reached
*/
public void disableHFileBackup(final byte[] table) throws IOException, KeeperException {
createHFileArchiveManager().disableHFileBackup(table).stop();
}
/**
* Disable hfile backups for all tables.
* <p>
* Previously backed up files are still retained (if present).
* <p>
* Asynchronous operation - some extra HFiles may be retained in the archive directory after
* disable is called, depending on the latency of the zookeeper notification to the servers.
* @throws IOException if an unexpected exception occurs
* @throws KeeperException if zookeeper can't be reached
*/
public void disableHFileBackup() throws IOException, KeeperException {
createHFileArchiveManager().disableHFileBackup().stop();
}
/**
* Determine if archiving is enabled (but not necessarily fully propagated) for a table
* @param table name of the table to check
* @return <tt>true</tt> if it is, <tt>false</tt> otherwise
* @throws IOException if a connection to ZooKeeper cannot be established
* @throws KeeperException
*/
public boolean getArchivingEnabled(byte[] table) throws IOException, KeeperException {
HFileArchiveManager manager = createHFileArchiveManager();
try {
return manager.isArchivingEnabled(table);
} finally {
manager.stop();
}
}
/**
* Determine if archiving is enabled (but not necessarily fully propagated) for a table
* @param table name of the table to check
* @return <tt>true</tt> if it is, <tt>false</tt> otherwise
* @throws IOException if an unexpected network issue occurs
* @throws KeeperException if zookeeper can't be reached
*/
public boolean getArchivingEnabled(String table) throws IOException, KeeperException {
return getArchivingEnabled(Bytes.toBytes(table));
}
/**
* @return A new {@link HFileArchiveManager} to manage which tables' hfiles should be archived
* rather than deleted.
* @throws KeeperException if we can't reach zookeeper
* @throws IOException if an unexpected network issue occurs
*/
private synchronized HFileArchiveManager createHFileArchiveManager() throws KeeperException,
IOException {
return new HFileArchiveManager(this.connection, this.getConf());
}
/**
* @param conf conf to read for the base archive node
* @param zooKeeper zookeeper to use for building the full path
* @return the znode used for long-term archival of a table's hfiles
*/
public static String getArchiveZNode(Configuration conf, ZooKeeperWatcher zooKeeper) {
return ZKUtil.joinZNode(zooKeeper.baseZNode, conf.get(ZOOKEEPER_ZNODE_HFILE_ARCHIVE_KEY,
TableHFileArchiveTracker.HFILE_ARCHIVE_ZNODE_PARENT));
}
}
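A client-side usage sketch (not part of the patch). It assumes an HConnection obtained from HConnectionManager.getConnection(conf) (any existing connection would do) and uses the hypothetical table name "t1".

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.backup.example.ZKTableArchiveClient;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.zookeeper.KeeperException;

public class TableArchiveSketch {
  public static void main(String[] args) throws IOException, KeeperException {
    Configuration conf = HBaseConfiguration.create();
    HConnection connection = HConnectionManager.getConnection(conf);
    ZKTableArchiveClient client = new ZKTableArchiveClient(conf, connection);
    client.enableHFileBackupAsync(Bytes.toBytes("t1")); // start retaining hfiles for "t1"
    // asynchronous: the cleaner may take a moment to observe the change
    boolean enabled = client.getArchivingEnabled("t1");
    System.out.println("archiving enabled for t1: " + enabled);
    client.disableHFileBackup("t1"); // stop retention again
  }
}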

View File

@ -41,11 +41,10 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.catalog.MetaEditor;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Bytes;
@ -53,7 +52,6 @@ import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Writables;
/**
* A janitor for the catalog tables. Scans the <code>.META.</code> catalog
* table on a period looking for unused regions to garbage collect.
@ -253,7 +251,7 @@ class CatalogJanitor extends Chore {
if (hasNoReferences(a) && hasNoReferences(b)) {
LOG.debug("Deleting region " + parent.getRegionNameAsString() +
" because daughter splits no longer hold references");
// wipe out daughter references from parent region
// wipe out daughter references from parent region in meta
removeDaughtersFromParent(parent);
// This latter regionOffline should not be necessary but is done for now
@ -264,8 +262,7 @@ class CatalogJanitor extends Chore {
this.services.getAssignmentManager().regionOffline(parent);
}
FileSystem fs = this.services.getMasterFileSystem().getFileSystem();
Path rootdir = this.services.getMasterFileSystem().getRootDir();
HRegion.deleteRegion(fs, rootdir, parent);
HFileArchiver.archiveRegion(fs, parent);
MetaEditor.deleteRegion(this.server.getCatalogTracker(), parent);
result = true;
}

View File

@ -46,6 +46,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.ClusterStatus;
@ -85,6 +86,8 @@ import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.ipc.ProtocolSignature;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
import org.apache.hadoop.hbase.master.handler.CreateTableHandler;
import org.apache.hadoop.hbase.master.handler.DeleteTableHandler;
import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
@ -104,6 +107,8 @@ import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CompressionTest;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.InfoServer;
import org.apache.hadoop.hbase.util.Pair;
@ -276,6 +281,7 @@ Server {
private CatalogJanitor catalogJanitorChore;
private LogCleaner logCleaner;
private HFileCleaner hfileCleaner;
private MasterCoprocessorHost cpHost;
private final ServerName serverName;
@ -997,12 +1003,19 @@ Server {
// Start log cleaner thread
String n = Thread.currentThread().getName();
int cleanerInterval = conf.getInt("hbase.master.cleaner.interval", 60 * 1000);
this.logCleaner =
new LogCleaner(conf.getInt("hbase.master.cleaner.interval", 60 * 1000),
new LogCleaner(cleanerInterval,
this, conf, getMasterFileSystem().getFileSystem(),
getMasterFileSystem().getOldLogDir());
Threads.setDaemonThreadRunning(logCleaner.getThread(), n + ".oldLogCleaner");
//start the hfile archive cleaner thread
Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf, getMasterFileSystem()
.getFileSystem(), archiveDir);
Threads.setDaemonThreadRunning(hfileCleaner.getThread(), n + ".archivedHFileCleaner");
// Put up info server.
int port = this.conf.getInt(HConstants.MASTER_INFO_PORT, 60010);
if (port >= 0) {
@ -1038,6 +1051,8 @@ Server {
this.rpcServerOpen = false;
// Clean up and close up shop
if (this.logCleaner!= null) this.logCleaner.interrupt();
if (this.hfileCleaner != null) this.hfileCleaner.interrupt();
if (this.infoServer != null) {
LOG.info("Stopping infoServer");
try {
@ -2246,4 +2261,8 @@ Server {
MBeanUtil.registerMBean("Master", "Master", mxBeanInfo);
LOG.info("Registered HMaster MXBean");
}
public HFileCleaner getHFileCleaner() {
return this.hfileCleaner;
}
}

View File

@ -1,167 +0,0 @@
/**
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.FSUtils;
import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;
/**
* This Chore, everytime it runs, will clear the HLogs in the old logs folder
* that are deletable for each log cleaner in the chain.
*/
@InterfaceAudience.Private
public class LogCleaner extends Chore {
static final Log LOG = LogFactory.getLog(LogCleaner.class.getName());
private final FileSystem fs;
private final Path oldLogDir;
private List<LogCleanerDelegate> logCleanersChain;
private final Configuration conf;
/**
*
* @param p the period of time to sleep between each run
* @param s the stopper
* @param conf configuration to use
* @param fs handle to the FS
* @param oldLogDir the path to the archived logs
*/
public LogCleaner(final int p, final Stoppable s,
Configuration conf, FileSystem fs,
Path oldLogDir) {
super("LogsCleaner", p, s);
this.fs = fs;
this.oldLogDir = oldLogDir;
this.conf = conf;
this.logCleanersChain = new LinkedList<LogCleanerDelegate>();
initLogCleanersChain();
}
/*
* Initialize the chain of log cleaners from the configuration. The default
* in this chain are: TimeToLiveLogCleaner and ReplicationLogCleaner.
*/
private void initLogCleanersChain() {
String[] logCleaners = conf.getStrings(HBASE_MASTER_LOGCLEANER_PLUGINS);
if (logCleaners != null) {
for (String className : logCleaners) {
LogCleanerDelegate logCleaner = newLogCleaner(className, conf);
addLogCleaner(logCleaner);
}
}
}
/**
* A utility method to create new instances of LogCleanerDelegate based
* on the class name of the LogCleanerDelegate.
* @param className fully qualified class name of the LogCleanerDelegate
* @param conf
* @return the new instance
*/
public static LogCleanerDelegate newLogCleaner(String className, Configuration conf) {
try {
Class c = Class.forName(className);
LogCleanerDelegate cleaner = (LogCleanerDelegate) c.newInstance();
cleaner.setConf(conf);
return cleaner;
} catch(Exception e) {
LOG.warn("Can NOT create LogCleanerDelegate: " + className, e);
// skipping if can't instantiate
return null;
}
}
/**
* Add a LogCleanerDelegate to the log cleaner chain. A log file is deletable
* if it is deletable for each LogCleanerDelegate in the chain.
* @param logCleaner
*/
public void addLogCleaner(LogCleanerDelegate logCleaner) {
if (logCleaner != null && !logCleanersChain.contains(logCleaner)) {
logCleanersChain.add(logCleaner);
LOG.debug("Add log cleaner in chain: " + logCleaner.getClass().getName());
}
}
@Override
protected void chore() {
try {
FileStatus [] files = FSUtils.listStatus(this.fs, this.oldLogDir, null);
if (files == null) return;
FILE: for (FileStatus file : files) {
Path filePath = file.getPath();
if (HLog.validateHLogFilename(filePath.getName())) {
for (LogCleanerDelegate logCleaner : logCleanersChain) {
if (logCleaner.isStopped()) {
LOG.warn("A log cleaner is stopped, won't delete any log.");
return;
}
if (!logCleaner.isLogDeletable(filePath) ) {
// this log is not deletable, continue to process next log file
continue FILE;
}
}
// delete this log file if it passes all the log cleaners
this.fs.delete(filePath, true);
} else {
LOG.warn("Found a wrongly formated file: "
+ file.getPath().getName());
this.fs.delete(filePath, true);
}
}
} catch (IOException e) {
e = RemoteExceptionHandler.checkIOException(e);
LOG.warn("Error while cleaning the logs", e);
}
}
@Override
public void run() {
try {
super.run();
} finally {
for (LogCleanerDelegate lc: this.logCleanersChain) {
try {
lc.stop("Exiting");
} catch (Throwable t) {
LOG.warn("Stopping", t);
}
}
}
}
}

View File

@ -44,8 +44,10 @@ import org.apache.hadoop.hbase.InvalidFamilyOperationException;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.master.metrics.MasterMetrics;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
import org.apache.hadoop.hbase.regionserver.wal.OrphanHLogAfterSplitException;
@ -444,7 +446,7 @@ public class MasterFileSystem {
public void deleteRegion(HRegionInfo region) throws IOException {
fs.delete(HRegion.getRegionDir(rootdir, region), true);
HFileArchiver.archiveRegion(fs, region);
}
public void deleteTable(byte[] tableName) throws IOException {

View File

@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.cleaner;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.BaseConfigurable;
/**
* Base class for the hfile cleaning function inside the master. By default, only the
* {@link TimeToLiveHFileCleaner} is called.
* <p>
* If other effects are needed, implement your own LogCleanerDelegate and add it to the
* configuration "hbase.master.hfilecleaner.plugins", which is a comma-separated list of fully
* qualified class names. The <code>HFileCleaner</code> will build the cleaner chain in
* the order specified by the configuration.
* <p>
* For subclasses, setConf will be called exactly <i>once</i> before using the cleaner.
* <p>
* Since {@link BaseHFileCleanerDelegate HFileCleanerDelegates} are created in
* HFileCleaner by reflection, classes that implement this interface <b>must</b>
* provide a default constructor.
*/
@InterfaceAudience.Private
public abstract class BaseHFileCleanerDelegate extends BaseConfigurable implements
FileCleanerDelegate {
private boolean stopped = false;
@Override
public void stop(String why) {
this.stopped = true;
}
@Override
public boolean isStopped() {
return this.stopped;
}
}
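
As a hedged illustration of the plugin mechanism described in the class javadoc above, a custom delegate only needs a default constructor and an isFileDeleteable implementation. The class name KeepSuffixHFileCleaner and its ".keep" rule below are hypothetical, not part of this change.

package org.apache.hadoop.hbase.master.cleaner;

import org.apache.hadoop.fs.Path;

/**
 * Hypothetical example delegate: never deletes archived files whose names end in ".keep".
 * Registered by adding its fully qualified name to "hbase.master.hfilecleaner.plugins".
 */
public class KeepSuffixHFileCleaner extends BaseHFileCleanerDelegate {

  // the cleaner chore instantiates delegates by reflection, so a default constructor is required
  public KeepSuffixHFileCleaner() {
  }

  @Override
  public boolean isFileDeleteable(Path file) {
    // keep anything explicitly marked; everything else may be deleted by the rest of the chain
    return !file.getName().endsWith(".keep");
  }
}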

View File

@ -1,6 +1,4 @@
/*
* Copyright 2010 The Apache Software Foundation
*
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -17,34 +15,42 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;
package org.apache.hadoop.hbase.master.cleaner;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.BaseConfigurable;
/**
* Interface for the log cleaning function inside the master. By default, two
* Base class for the log cleaning function inside the master. By default, two
* cleaners: <code>TimeToLiveLogCleaner</code> and
* <code>ReplicationLogCleaner</code> are called in order. So if other
* effects are needed, implement your own LogCleanerDelegate and add it to the
* <code>ReplicationLogCleaner</code> are called in order. So if other effects
* are needed, implement your own LogCleanerDelegate and add it to the
* configuration "hbase.master.logcleaner.plugins", which is a comma-separated
* list of fully qualified class names. LogsCleaner will add it to the chain.
*
* <p>HBase ships with LogsCleaner as the default implementation.
*
* <p>This interface extends Configurable, so setConf needs to be called once
* before using the cleaner.
* Since LogCleanerDelegates are created in LogsCleaner by reflection. Classes
* that implements this interface should provide a default constructor.
* <p>
* HBase ships with LogsCleaner as the default implementation.
* <p>
* This interface extends Configurable, so setConf needs to be called once
* before using the cleaner. Since LogCleanerDelegates are created in
* LogsCleaner by reflection, classes that implement this interface should
* provide a default constructor.
*/
@InterfaceAudience.Private
public interface LogCleanerDelegate extends Configurable, Stoppable {
public abstract class BaseLogCleanerDelegate extends BaseConfigurable implements FileCleanerDelegate {
@Override
public boolean isFileDeleteable(Path file) {
return isLogDeletable(file);
}
/**
* Should the master delete the log or keep it?
* <p>
* Implementing classes should override {@link #isFileDeleteable(Path)} instead.
* @param filePath full path to log.
* @return true if the log is deletable, false if not
*/
public boolean isLogDeletable(Path filePath);
@Deprecated
public abstract boolean isLogDeletable(Path filePath);
}
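
A minimal sketch, assuming the classes introduced in this change, of wiring both cleaner chains programmatically; in practice these keys are normally set in hbase-site.xml, and the wrapper class name here is only illustrative.

import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
import org.apache.hadoop.hbase.master.cleaner.TimeToLiveLogCleaner;

public class CleanerPluginConfigExample {
  public static Configuration configureCleaners() {
    Configuration conf = HBaseConfiguration.create();
    // HLog cleaner chain, consulted in order by the LogCleaner chore
    conf.setStrings(HBASE_MASTER_LOGCLEANER_PLUGINS,
        TimeToLiveLogCleaner.class.getCanonicalName());
    // HFile cleaner chain, consulted in order by the HFileCleaner chore
    conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
        TimeToLiveHFileCleaner.class.getCanonicalName());
    return conf;
  }
}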

View File

@ -0,0 +1,238 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.cleaner;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.util.FSUtils;
/**
* Abstract Cleaner that uses a chain of delegates to clean a directory of files
* @param <T> Cleaner delegate class that is dynamically loaded from configuration
*/
public abstract class CleanerChore<T extends FileCleanerDelegate> extends Chore {
private static final Log LOG = LogFactory.getLog(CleanerChore.class.getName());
private final FileSystem fs;
private final Path oldFileDir;
private final Configuration conf;
private List<T> cleanersChain;
/**
* @param name name of the chore being run
* @param sleepPeriod the period of time to sleep between each run
* @param s the stopper
* @param conf configuration to use
* @param fs handle to the FS
* @param oldFileDir the path to the archived files
* @param confKey configuration key for the classes to instantiate
*/
public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
FileSystem fs, Path oldFileDir, String confKey) {
super(name, sleepPeriod, s);
this.fs = fs;
this.oldFileDir = oldFileDir;
this.conf = conf;
initCleanerChain(confKey);
}
/**
* Validate the file to see if it even belongs in the directory. If it is valid, then the file
* will go through the cleaner delegates, but otherwise the file is just deleted.
* @param file full {@link Path} of the file to be checked
* @return <tt>true</tt> if the file is valid, <tt>false</tt> otherwise
*/
protected abstract boolean validate(Path file);
/**
* Instantiate and initialize all the file cleaners set in the configuration
* @param confKey key to get the file cleaner classes from the configuration
*/
private void initCleanerChain(String confKey) {
this.cleanersChain = new LinkedList<T>();
String[] logCleaners = conf.getStrings(confKey);
if (logCleaners != null) {
for (String className : logCleaners) {
T logCleaner = newFileCleaner(className, conf);
if (logCleaner != null) this.cleanersChain.add(logCleaner);
}
}
}
/**
* A utility method to create new instances of LogCleanerDelegate based on the class name of the
* LogCleanerDelegate.
* @param className fully qualified class name of the LogCleanerDelegate
* @param conf
* @return the new instance
*/
public T newFileCleaner(String className, Configuration conf) {
try {
Class<? extends FileCleanerDelegate> c = Class.forName(className).asSubclass(
FileCleanerDelegate.class);
@SuppressWarnings("unchecked")
T cleaner = (T) c.newInstance();
cleaner.setConf(conf);
return cleaner;
} catch (Exception e) {
LOG.warn("Can NOT create CleanerDelegate: " + className, e);
// skipping if can't instantiate
return null;
}
}
@Override
protected void chore() {
try {
FileStatus[] files = FSUtils.listStatus(this.fs, this.oldFileDir, null);
// if the path (file or directory) doesn't exist, then we can just return
if (files == null) return;
// loop over the found files and see if they should be deleted
for (FileStatus file : files) {
try {
if (file.isDir()) checkDirectory(file.getPath());
else checkAndDelete(file.getPath());
} catch (IOException e) {
e = RemoteExceptionHandler.checkIOException(e);
LOG.warn("Error while cleaning the logs", e);
}
}
} catch (IOException e) {
LOG.warn("Failed to get status of:" + oldFileDir);
}
}
/**
* Check to see if we can delete a directory (and all the children files of that directory).
* <p>
* A directory will not be deleted if it has children that are subsequently deleted since that
* will require another set of lookups in the filesystem, which is semantically the same as
* waiting until the next time the chore is run, so we might as well wait.
* @param toCheck directory to check
* @throws IOException
*/
private void checkDirectory(Path toCheck) throws IOException {
LOG.debug("Checking directory: " + toCheck);
FileStatus[] files = checkAndDeleteDirectory(toCheck);
// if the directory doesn't exist, then we are done
if (files == null) return;
// otherwise we need to check each of the child files
for (FileStatus file : files) {
Path filePath = file.getPath();
// if its a directory, then check to see if it should be deleted
if (file.isDir()) {
// check the subfiles to see if they can be deleted
checkDirectory(filePath);
continue;
}
// otherwise we can just check the file
checkAndDelete(filePath);
}
// recheck the directory to see if we can delete it this time
checkAndDeleteDirectory(toCheck);
}
/**
* Check and delete the passed directory if the directory is empty
* @param toCheck full path to the directory to check (and possibly delete)
* @return <tt>null</tt> if the directory was empty (and possibly deleted) and otherwise an array
* of <code>FileStatus</code> for the files in the directory
* @throws IOException
*/
private FileStatus[] checkAndDeleteDirectory(Path toCheck) throws IOException {
LOG.debug("Attempting to delete directory:" + toCheck);
// if it doesn't exist, we are done
if (!fs.exists(toCheck)) return null;
// get the files below the directory
FileStatus[] files = FSUtils.listStatus(fs, toCheck, null);
// if there are no subfiles, then we can delete the directory
if (files == null) {
checkAndDelete(toCheck);
return null;
}
// return the status of the files in the directory
return files;
}
/**
* Run the given file through each of the cleaners to see if it should be deleted, deleting it if
* necessary.
* @param filePath path of the file to check (and possibly delete)
* @throws IOException if we can't delete a file because of a filesystem issue
* @throws IllegalArgumentException if the file is a directory and has children
*/
private void checkAndDelete(Path filePath) throws IOException, IllegalArgumentException {
if (!validate(filePath)) {
LOG.warn("Found a wrongly formatted file: " + filePath.getName() + "deleting it.");
if (!this.fs.delete(filePath, true)) {
LOG.warn("Attempted to delete:" + filePath
+ ", but couldn't. Run cleaner chain and attempt to delete on next pass.");
}
return;
}
for (T cleaner : cleanersChain) {
if (cleaner.isStopped()) {
LOG.warn("A file cleaner" + this.getName() + " is stopped, won't delete any file in:"
+ this.oldFileDir);
return;
}
if (!cleaner.isFileDeleteable(filePath)) {
// this file is not deletable, then we are done
LOG.debug(filePath + " is not deletable according to:" + cleaner);
return;
}
}
// delete this file if it passes all the cleaners
LOG.debug("Removing:" + filePath + " from archive");
if (!this.fs.delete(filePath, false)) {
LOG.warn("Attempted to delete:" + filePath
+ ", but couldn't. Run cleaner chain and attempt to delete on next pass.");
}
}
@Override
public void cleanup() {
for (T lc : this.cleanersChain) {
try {
lc.stop("Exiting");
} catch (Throwable t) {
LOG.warn("Stopping", t);
}
}
}
}

View File

@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.cleaner;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
/**
* General interface for cleaning files from a folder (generally an archive or
* backup folder). These are chained via the {@link CleanerChore} to determine
* if a given file should be deleted.
*/
@InterfaceAudience.Private
public interface FileCleanerDelegate extends Configurable, Stoppable {
/**
* Should the master delete the file or keep it?
* @param file full path to the file to check
* @return <tt>true</tt> if the file is deletable, <tt>false</tt> if not
*/
public boolean isFileDeleteable(Path file);
}

View File

@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.cleaner;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.regionserver.StoreFile;
/**
* This Chore, every time it runs, will clear the HFiles in the hfile archive
* folder that are deletable for each HFile cleaner in the chain.
*/
@InterfaceAudience.Private
public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> {
public static final String MASTER_HFILE_CLEANER_PLUGINS = "hbase.master.hfilecleaner.plugins";
/**
* @param period the period of time to sleep between each run
* @param stopper the stopper
* @param conf configuration to use
* @param fs handle to the FS
* @param directory directory to be cleaned
*/
public HFileCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
Path directory) {
super("HFileCleaner", period, stopper, conf, fs, directory, MASTER_HFILE_CLEANER_PLUGINS);
}
@Override
protected boolean validate(Path file) {
return StoreFile.validateStoreFileName(file.getName());
}
}

View File

@ -0,0 +1,56 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.cleaner;
import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
/**
* This Chore, every time it runs, will attempt to delete the HLogs in the old logs folder. The HLog
* is only deleted if none of the cleaner delegates says otherwise.
* @see BaseLogCleanerDelegate
*/
@InterfaceAudience.Private
public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate> {
static final Log LOG = LogFactory.getLog(LogCleaner.class.getName());
/**
* @param p the period of time to sleep between each run
* @param s the stopper
* @param conf configuration to use
* @param fs handle to the FS
* @param oldLogDir the path to the archived logs
*/
public LogCleaner(final int p, final Stoppable s, Configuration conf, FileSystem fs,
Path oldLogDir) {
super("LogsCleaner", p, s, conf, fs, oldLogDir, HBASE_MASTER_LOGCLEANER_PLUGINS);
}
@Override
protected boolean validate(Path file) {
return HLog.validateHLogFilename(file.getName());
}
}

View File

@ -0,0 +1,91 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.cleaner;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
/**
* HFile cleaner that uses the timestamp of the hfile to determine if it should be deleted. By
* default an archived hfile is allowed to live for {@value TimeToLiveHFileCleaner#DEFAULT_TTL} ms
*/
@InterfaceAudience.Private
public class TimeToLiveHFileCleaner extends BaseHFileCleanerDelegate {
public static final Log LOG = LogFactory.getLog(TimeToLiveHFileCleaner.class.getName());
public static final String TTL_CONF_KEY = "hbase.master.hfilecleaner.ttl";
// default ttl = 5 minutes
private static final long DEFAULT_TTL = 60000 * 5;
// Configured time a hfile can be kept after it was moved to the archive
private long ttl;
private FileSystem fs;
@Override
public void setConf(Configuration conf) {
this.ttl = conf.getLong(TTL_CONF_KEY, DEFAULT_TTL);
super.setConf(conf);
}
@Override
public boolean isFileDeleteable(Path filePath) {
if (!instantiateFS()) {
return false;
}
long time = 0;
long currentTime = EnvironmentEdgeManager.currentTimeMillis();
try {
FileStatus fStat = fs.getFileStatus(filePath);
time = fStat.getModificationTime();
} catch (IOException e) {
LOG.error("Unable to get modification time of file " + filePath.getName()
+ ", not deleting it.", e);
return false;
}
long life = currentTime - time;
LOG.debug("Life:" + life + ", tt:" + ttl + ", current:" + currentTime + ", from: " + time);
if (life < 0) {
LOG.warn("Found a log (" + filePath + ") newer than current time (" + currentTime + " < "
+ time + "), probably a clock skew");
return false;
}
return life > ttl;
}
/**
* setup the filesystem, if it hasn't been already
*/
private synchronized boolean instantiateFS() {
if (this.fs == null) {
try {
this.fs = FileSystem.get(this.getConf());
} catch (IOException e) {
LOG.error("Couldn't instantiate the file system, not deleting file, just incase");
return false;
}
}
return true;
}
}
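
To make the TTL behaviour above concrete, a small sketch of overriding the archive retention period; the 30-minute value is purely illustrative and the default remains five minutes.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;

public class ArchiveTtlExample {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // keep archived HFiles around for 30 minutes instead of the 5 minute default
    conf.setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, 30 * 60 * 1000L);
  }
}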

View File

@ -1,6 +1,4 @@
/*
* Copyright 2010 The Apache Software Foundation
*
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -17,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;
package org.apache.hadoop.hbase.master.cleaner;
import java.io.IOException;
@ -33,9 +31,8 @@ import org.apache.commons.logging.LogFactory;
* be deleted. By default they are allowed to live for 10 minutes.
*/
@InterfaceAudience.Private
public class TimeToLiveLogCleaner implements LogCleanerDelegate {
public class TimeToLiveLogCleaner extends BaseLogCleanerDelegate {
static final Log LOG = LogFactory.getLog(TimeToLiveLogCleaner.class.getName());
private Configuration conf;
// Configured time a log can be kept after it was closed
private long ttl;
private boolean stopped = false;
@ -45,7 +42,7 @@ public class TimeToLiveLogCleaner implements LogCleanerDelegate {
long time = 0;
long currentTime = System.currentTimeMillis();
try {
FileStatus fStat = filePath.getFileSystem(conf).getFileStatus(filePath);
FileStatus fStat = filePath.getFileSystem(this.getConf()).getFileStatus(filePath);
time = fStat.getModificationTime();
} catch (IOException e) {
LOG.error("Unable to get modification time of file " + filePath.getName() +
@ -63,14 +60,10 @@ public class TimeToLiveLogCleaner implements LogCleanerDelegate {
@Override
public void setConf(Configuration conf) {
this.conf = conf;
super.setConf(conf);
this.ttl = conf.getLong("hbase.master.logcleaner.ttl", 600000);
}
@Override
public Configuration getConf() {
return conf;
}
@Override
public void stop(String why) {

View File

@ -84,6 +84,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
@ -938,16 +939,7 @@ public class HRegion implements HeapSize { // , Writable{
writestate.writesEnabled = false;
wasFlushing = writestate.flushing;
LOG.debug("Closing " + this + ": disabling compactions & flushes");
while (writestate.compacting > 0 || writestate.flushing) {
LOG.debug("waiting for " + writestate.compacting + " compactions" +
(writestate.flushing ? " & cache flush" : "") +
" to complete for region " + this);
try {
writestate.wait();
} catch (InterruptedException iex) {
// continue
}
}
waitForFlushesAndCompactions();
}
// If we were not just flushing, is it worth doing a preflush...one
// that will clear out of the bulk of the memstore before we put up
@ -1022,6 +1014,26 @@ public class HRegion implements HeapSize { // , Writable{
}
}
/**
* Wait for all current flushes and compactions of the region to complete.
* <p>
* Exposed for TESTING.
*/
public void waitForFlushesAndCompactions() {
synchronized (writestate) {
while (writestate.compacting > 0 || writestate.flushing) {
LOG.debug("waiting for " + writestate.compacting + " compactions"
+ (writestate.flushing ? " & cache flush" : "") + " to complete for region " + this);
try {
writestate.wait();
} catch (InterruptedException iex) {
// essentially ignore and propagate the interrupt back up
Thread.currentThread().interrupt();
}
}
}
}
protected ThreadPoolExecutor getStoreOpenAndCloseThreadPool(
final String threadNamePrefix) {
int numStores = Math.max(1, this.htableDescriptor.getFamilies().size());
@ -4170,8 +4182,13 @@ public class HRegion implements HeapSize { // , Writable{
LOG.debug("Files for new region");
listPaths(fs, dstRegion.getRegionDir());
}
deleteRegion(fs, a.getRegionDir());
deleteRegion(fs, b.getRegionDir());
// delete out the 'A' region
HFileArchiver.archiveRegion(fs, FSUtils.getRootDir(a.getBaseConf()), a.getTableDir(),
a.getRegionDir());
// delete out the 'B' region
HFileArchiver.archiveRegion(fs, FSUtils.getRootDir(b.getBaseConf()), b.getTableDir(),
b.getRegionDir());
LOG.info("merge completed. New region is " + dstRegion);

View File

@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@ -488,7 +489,7 @@ public class Store extends SchemaConfigured implements HeapSize {
/**
* @return All store files.
*/
List<StoreFile> getStorefiles() {
public List<StoreFile> getStorefiles() {
return this.storefiles;
}
@ -1609,10 +1610,12 @@ public class Store extends SchemaConfigured implements HeapSize {
// Tell observers that list of StoreFiles has changed.
notifyChangedReadersObservers();
// Finally, delete old store files.
for (StoreFile hsf: compactedFiles) {
hsf.deleteReader();
}
// let the archive util decide if we should archive or delete the files
LOG.debug("Removing store files after compaction...");
HFileArchiver.archiveStoreFiles(this.fs, this.region, this.conf, this.family.getName(),
compactedFiles);
} catch (IOException e) {
e = RemoteExceptionHandler.checkIOException(e);
LOG.error("Failed replacing compacted files in " + this +

View File

@ -863,13 +863,20 @@ public class StoreFile extends SchemaConfigured {
}
/**
* Write out a split reference.
*
* Package local so it doesnt leak out of regionserver.
*
* Validate the store file name.
* @param fileName name of the file to validate
* @return <tt>true</tt> if the file could be a valid store file, <tt>false</tt> otherwise
*/
public static boolean validateStoreFileName(String fileName) {
return !fileName.contains("-");
}
/**
* Write out a split reference. Package local so it doesn't leak out of
* regionserver.
* @param fs
* @param splitDir Presumes path format is actually
* <code>SOME_DIRECTORY/REGIONNAME/FAMILY</code>.
* <code>SOME_DIRECTORY/REGIONNAME/FAMILY</code>.
* @param f File to split.
* @param splitRow
* @param top True if we are referring to the top half of the hfile.

View File

@ -1766,6 +1766,11 @@ public class HLog implements Syncable {
return dir;
}
/**
* @param filename name of the file to validate
* @return <tt>true</tt> if the filename matches an HLog, <tt>false</tt>
* otherwise
*/
public static boolean validateHLogFilename(String filename) {
return pattern.matcher(filename).matches();
}

View File

@ -27,7 +27,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.master.LogCleanerDelegate;
import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
@ -42,18 +42,13 @@ import java.util.Set;
* replication before deleting it when its TTL is over.
*/
@InterfaceAudience.Private
public class ReplicationLogCleaner implements LogCleanerDelegate, Abortable {
public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abortable {
private static final Log LOG = LogFactory.getLog(ReplicationLogCleaner.class);
private Configuration conf;
private ReplicationZookeeper zkHelper;
private Set<String> hlogs = new HashSet<String>();
private boolean stopped = false;
private boolean aborted;
/**
* Instantiates the cleaner, does nothing more.
*/
public ReplicationLogCleaner() {}
@Override
public boolean isLogDeletable(Path filePath) {
@ -69,7 +64,7 @@ public class ReplicationLogCleaner implements LogCleanerDelegate, Abortable {
// all members of this class are null if replication is disabled, and we
// return true since false would render the LogsCleaner useless
if (this.conf == null) {
if (this.getConf() == null) {
return true;
}
String log = filePath.getName();
@ -124,18 +119,18 @@ public class ReplicationLogCleaner implements LogCleanerDelegate, Abortable {
}
@Override
public void setConf(Configuration conf) {
public void setConf(Configuration config) {
// If replication is disabled, keep all members null
if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false)) {
if (!config.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false)) {
return;
}
// Make my own Configuration. Then I'll have my own connection to zk that
// I can close myself when comes time.
this.conf = new Configuration(conf);
Configuration conf = new Configuration(config);
super.setConf(conf);
try {
ZooKeeperWatcher zkw =
new ZooKeeperWatcher(this.conf, "replicationLogCleaner", null);
this.zkHelper = new ReplicationZookeeper(this, this.conf, zkw);
ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "replicationLogCleaner", null);
this.zkHelper = new ReplicationZookeeper(this, conf, zkw);
} catch (KeeperException e) {
LOG.error("Error while configuring " + this.getClass().getName(), e);
} catch (IOException e) {
@ -144,10 +139,6 @@ public class ReplicationLogCleaner implements LogCleanerDelegate, Abortable {
refreshHLogsAndSearch(null);
}
@Override
public Configuration getConf() {
return conf;
}
@Override
public void stop(String why) {
@ -158,7 +149,7 @@ public class ReplicationLogCleaner implements LogCleanerDelegate, Abortable {
this.zkHelper.getZookeeperWatcher().close();
}
// Not sure why we're deleting a connection that we never acquired or used
HConnectionManager.deleteConnection(this.conf, true);
HConnectionManager.deleteConnection(this.getConf(), true);
}
@Override

View File

@ -656,6 +656,10 @@ public abstract class FSUtils {
return p.makeQualified(fs);
}
public static void setRootDir(final Configuration c, final Path root) throws IOException {
c.set(HConstants.HBASE_DIR, root.toString());
}
/**
* Checks if root region exists
*
@ -1138,4 +1142,36 @@ public abstract class FSUtils {
public static boolean isExists(final FileSystem fs, final Path path) throws IOException {
return fs.exists(path);
}
/**
* Log the current state of the filesystem from a certain root directory
* @param fs filesystem to investigate
* @param root root file/directory to start logging from
* @param LOG log to output information
* @throws IOException if an unexpected exception occurs
*/
public static void logFileSystemState(final FileSystem fs, final Path root, Log LOG)
throws IOException {
LOG.debug("Current file system:");
logFSTree(LOG, fs, root, "|-");
}
/**
* Recursive helper to log the state of the FS
* @see #logFileSystemState(FileSystem, Path, Log)
*/
private static void logFSTree(Log LOG, final FileSystem fs, final Path root, String prefix)
throws IOException {
FileStatus[] files = FSUtils.listStatus(fs, root, null);
if (files == null) return;
for (FileStatus file : files) {
if (file.isDir()) {
LOG.debug(prefix + file.getPath().getName() + "/");
logFSTree(LOG, fs, file.getPath(), prefix + "---");
} else {
LOG.debug(prefix + file.getPath().getName());
}
}
}
}

View File

@ -0,0 +1,128 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.Store;
/**
* Helper class for all utilities related to archival/retrieval of HFiles
*/
public class HFileArchiveUtil {
static final String DEFAULT_HFILE_ARCHIVE_DIRECTORY = ".archive";
private HFileArchiveUtil() {
// non-external instantiation - util class
}
/**
* Get the directory to archive a store directory
* @param conf {@link Configuration} to read for the archive directory name
* @param region parent region information under which the store currently
* lives
* @param family name of the family in the store
* @return {@link Path} to the directory to archive the given store or
* <tt>null</tt> if it should not be archived
*/
public static Path getStoreArchivePath(Configuration conf, HRegion region, byte [] family){
return getStoreArchivePath(conf, region.getRegionInfo(), region.getTableDir(), family);
}
/**
* Get the directory to archive a store directory
* @param conf {@link Configuration} to read for the archive directory name. Can be null.
* @param region parent region information under which the store currently lives
* @param tabledir directory for the table under which the store currently lives
* @param family name of the family in the store
* @return {@link Path} to the directory to archive the given store or <tt>null</tt> if it should
* not be archived
*/
public static Path getStoreArchivePath(Configuration conf, HRegionInfo region, Path tabledir,
byte[] family) {
Path tableArchiveDir = getTableArchivePath(conf, tabledir);
return Store.getStoreHomedir(tableArchiveDir,
HRegionInfo.encodeRegionName(region.getRegionName()), family);
}
/**
* Get the archive directory for a given region under the specified table
* @param conf {@link Configuration} to read the archive directory from. Can be null
* @param tabledir the original table directory. Cannot be null.
* @param regiondir the path to the region directory. Cannot be null.
* @return {@link Path} to the directory to archive the given region, or <tt>null</tt> if it
* should not be archived
*/
public static Path getRegionArchiveDir(Configuration conf, Path tabledir, Path regiondir) {
// get the archive directory for a table
Path archiveDir = getTableArchivePath(conf, tabledir);
// then add on the region path under the archive
String encodedRegionName = regiondir.getName();
return HRegion.getRegionDir(archiveDir, encodedRegionName);
}
/**
* Get the path to the table archive directory based on the configured archive directory.
* <p>
* Assumed that the table should already be archived.
* @param conf {@link Configuration} to read the archive directory property. Can be null
* @param tabledir directory of the table to be archived. Cannot be null.
* @return {@link Path} to the archive directory for the table
*/
public static Path getTableArchivePath(Configuration conf, Path tabledir) {
String archiveName = getConfiguredArchiveDirName(conf);
Path root = tabledir.getParent();
// now build the archive directory path
// first the top-level archive directory
// generally "/hbase/.archive/[table]
return archiveName.length() == 0 ? new Path(root, tabledir) : new Path(new Path(root,
archiveName), tabledir.getName());
}
/**
* Get the archive directory as per the configuration
* @param conf {@link Configuration} to read the archive directory from (can be null, in which
* case you get the default value). Can be null.
* @return the configured archived directory or the default specified by
* {@value HFileArchiveUtil#DEFAULT_HFILE_ARCHIVE_DIRECTORY}
*/
public static String getConfiguredArchiveDirName(Configuration conf) {
return conf == null ? HFileArchiveUtil.DEFAULT_HFILE_ARCHIVE_DIRECTORY : conf.get(
HConstants.HFILE_ARCHIVE_DIRECTORY, HFileArchiveUtil.DEFAULT_HFILE_ARCHIVE_DIRECTORY);
}
/**
* Get the full path to the archive directory on the configured {@link FileSystem}
* @param conf to look for archive directory name and root directory. Cannot be null. Notes for
* testing: requires a FileSystem root directory to be specified.
* @return the full {@link Path} to the archive directory, as defined by the configuration
* @throws IOException if an unexpected error occurs
*/
public static Path getArchivePath(Configuration conf) throws IOException {
return new Path(FSUtils.getRootDir(conf), getConfiguredArchiveDirName(conf));
}
}
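
A hedged sketch of the path arithmetic implemented above: with the default ".archive" directory name, a table directory such as /hbase/myTable resolves to /hbase/.archive/myTable. The root path and table name are assumptions for illustration only.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;

public class ArchivePathExample {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    Path tableDir = new Path("/hbase/myTable");
    // with the default archive directory name this prints /hbase/.archive/myTable
    System.out.println(HFileArchiveUtil.getTableArchivePath(conf, tableDir));
  }
}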

View File

@ -297,7 +297,7 @@
</property>
<property>
<name>hbase.master.logcleaner.plugins</name>
<value>org.apache.hadoop.hbase.master.TimeToLiveLogCleaner</value>
<value>org.apache.hadoop.hbase.master.cleaner.TimeToLiveLogCleaner</value>
<description>A comma-separated list of LogCleanerDelegate invoked by
the LogsCleaner service. These WAL/HLog cleaners are called in order,
so put the HLog cleaner that prunes the most HLog files in front. To
@ -859,7 +859,6 @@
files when hbase.data.umask.enable is true
</description>
</property>
<property>
<name>hbase.metrics.showTableName</name>
<value>true</value>
@ -869,7 +868,6 @@
In both cases, the aggregated metric M across tables and cfs will be reported.
</description>
</property>
<property>
<name>hbase.metrics.exposeOperationTimes</name>
<value>true</value>
@ -878,5 +876,23 @@
have their times exposed through Hadoop metrics per CF and per region.
</description>
</property>
<property>
<name>hbase.table.archive.directory</name>
<value>.archive</value>
<description>Per-table directory name under which to back up files for a
table. Files are moved to the same directories as they would be under the
table directory, but instead are just one level lower (under
table/.archive/... rather than table/...). Currently only applies to HFiles.</description>
</property>
<property>
<name>hbase.master.hfilecleaner.plugins</name>
<value>org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner</value>
<description>A comma-separated list of HFileCleanerDelegate invoked by
the HFileCleaner service. These HFile cleaners are called in order,
so put the cleaner that prunes the most files in front. To
implement your own HFileCleanerDelegate, just put it in HBase's classpath
and add the fully qualified class name here. Always add the above
default hfile cleaners in the list as they will be overwritten in hbase-site.xml.
</description>
</property>
</configuration>

View File

@ -950,7 +950,11 @@ public class HBaseTestingUtility {
* @param tableName existing table
*/
public void deleteTable(byte[] tableName) throws IOException {
getHBaseAdmin().disableTable(tableName);
try {
getHBaseAdmin().disableTable(tableName);
} catch (TableNotEnabledException e) {
LOG.debug("Table: " + Bytes.toString(tableName) + " already disabled, so just deleting it.");
}
getHBaseAdmin().deleteTable(tableName);
}

View File

@ -0,0 +1,293 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.backup;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveTestingUtil;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
* Test that the {@link HFileArchiver} correctly removes all the parts of a region when cleaning up
* a region
*/
@Category(MediumTests.class)
public class TestHFileArchiving {
private static final String STRING_TABLE_NAME = "test_table";
private static final Log LOG = LogFactory.getLog(TestHFileArchiving.class);
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static final byte[] TABLE_NAME = Bytes.toBytes(STRING_TABLE_NAME);
private static final byte[] TEST_FAM = Bytes.toBytes("fam");
/**
* Setup the config for the cluster
*/
@BeforeClass
public static void setupCluster() throws Exception {
setupConf(UTIL.getConfiguration());
UTIL.startMiniCluster();
}
private static void setupConf(Configuration conf) {
// disable the ui
conf.setInt("hbase.regionsever.info.port", -1);
// drop the memstore size so we get flushes
conf.setInt("hbase.hregion.memstore.flush.size", 25000);
// disable major compactions
conf.setInt(HConstants.MAJOR_COMPACTION_PERIOD, 0);
}
@Before
public void setup() throws Exception {
UTIL.createTable(TABLE_NAME, TEST_FAM);
}
@After
public void tearDown() throws Exception {
// cleanup the cluster if its up still
if (UTIL.getHBaseAdmin().tableExists(STRING_TABLE_NAME)) {
UTIL.deleteTable(TABLE_NAME);
}
// and cleanup the archive directory
try {
clearArchiveDirectory();
} catch (IOException e) {
Assert.fail("Failure to delete archive directory:" + e.getMessage());
}
}
@AfterClass
public static void cleanupTest() throws Exception {
try {
UTIL.shutdownMiniCluster();
} catch (Exception e) {
// NOOP;
}
}
@Test
public void testRemovesRegionDirOnArchive() throws Exception {
final HBaseAdmin admin = UTIL.getHBaseAdmin();
// get the current store files for the region
List<HRegion> servingRegions = UTIL.getHBaseCluster().getRegions(TABLE_NAME);
// make sure we only have 1 region serving this table
assertEquals(1, servingRegions.size());
HRegion region = servingRegions.get(0);
// and load the table
UTIL.loadRegion(region, TEST_FAM);
// shutdown the table so we can manipulate the files
admin.disableTable(STRING_TABLE_NAME);
FileSystem fs = UTIL.getTestFileSystem();
// now attempt to depose the region
Path regionDir = HRegion.getRegionDir(region.getTableDir().getParent(), region.getRegionInfo());
HFileArchiver.archiveRegion(fs, region.getRegionInfo());
// check for the existence of the archive directory and some files in it
Path archiveDir = HFileArchiveTestingUtil.getRegionArchiveDir(UTIL.getConfiguration(), region);
assertTrue(fs.exists(archiveDir));
// check to make sure the store directory was copied
FileStatus[] stores = fs.listStatus(archiveDir);
assertTrue(stores.length == 1);
// make sure we archived the store files
FileStatus[] storeFiles = fs.listStatus(stores[0].getPath());
assertTrue(storeFiles.length > 0);
// then ensure the region's directory isn't present
assertFalse(fs.exists(regionDir));
}
/**
* Test that the region directory is removed when we archive a region that has no store files, but
* still has hidden files.
* @throws Exception
*/
@Test
public void testDeleteRegionWithNoStoreFiles() throws Exception {
// get the current store files for the region
List<HRegion> servingRegions = UTIL.getHBaseCluster().getRegions(TABLE_NAME);
// make sure we only have 1 region serving this table
assertEquals(1, servingRegions.size());
HRegion region = servingRegions.get(0);
FileSystem fs = region.getFilesystem();
// make sure there are some files in the regiondir
Path rootDir = FSUtils.getRootDir(fs.getConf());
Path regionDir = HRegion.getRegionDir(rootDir, region.getRegionInfo());
FileStatus[] regionFiles = FSUtils.listStatus(fs, regionDir, null);
Assert.assertNotNull("No files in the region directory", regionFiles);
if (LOG.isDebugEnabled()) {
List<Path> files = new ArrayList<Path>();
for (FileStatus file : regionFiles) {
files.add(file.getPath());
}
LOG.debug("Current files:" + files);
}
// delete the visible folders so we just have hidden files/folders
final PathFilter dirFilter = new FSUtils.DirFilter(fs);
PathFilter nonHidden = new PathFilter() {
@Override
public boolean accept(Path file) {
return dirFilter.accept(file) && !file.getName().toString().startsWith(".");
}
};
FileStatus[] storeDirs = FSUtils.listStatus(fs, regionDir, nonHidden);
for (FileStatus store : storeDirs) {
LOG.debug("Deleting store for test");
fs.delete(store.getPath(), true);
}
// then archive the region
HFileArchiver.archiveRegion(fs, region.getRegionInfo());
// and check to make sure the region directory got deleted
assertFalse("Region directory (" + regionDir + "), still exists.", fs.exists(regionDir));
}
@Test
public void testArchiveOnTableDelete() throws Exception {
List<HRegion> servingRegions = UTIL.getHBaseCluster().getRegions(TABLE_NAME);
// make sure we only have 1 region serving this table
assertEquals(1, servingRegions.size());
HRegion region = servingRegions.get(0);
// get the parent RS and monitor
HRegionServer hrs = UTIL.getRSForFirstRegionInTable(TABLE_NAME);
FileSystem fs = hrs.getFileSystem();
// put some data on the region
LOG.debug("-------Loading table");
UTIL.loadRegion(region, TEST_FAM);
// get the hfiles in the region
List<HRegion> regions = hrs.getOnlineRegions(TABLE_NAME);
assertEquals("More that 1 region for test table.", 1, regions.size());
region = regions.get(0);
// wait for all the compactions to complete
region.waitForFlushesAndCompactions();
// disable table to prevent new updates
UTIL.getHBaseAdmin().disableTable(TABLE_NAME);
LOG.debug("Disabled table");
// remove all the files from the archive to get a fair comparison
clearArchiveDirectory();
// then get the current store files
Path regionDir = region.getRegionDir();
List<String> storeFiles = getAllFileNames(fs, regionDir);
// remove all the non-storefile named files for the region
for (int i = 0; i < storeFiles.size(); i++) {
String file = storeFiles.get(i);
if (file.contains(HRegion.REGIONINFO_FILE) || file.contains("hlog")) {
storeFiles.remove(i--);
}
}
storeFiles.remove(HRegion.REGIONINFO_FILE);
// then delete the table so the hfiles get archived
UTIL.deleteTable(TABLE_NAME);
// then get the files in the archive directory.
Path archiveDir = HFileArchiveUtil.getArchivePath(UTIL.getConfiguration());
List<String> archivedFiles = getAllFileNames(fs, archiveDir);
Collections.sort(storeFiles);
Collections.sort(archivedFiles);
LOG.debug("Store files:");
for (int i = 0; i < storeFiles.size(); i++) {
LOG.debug(i + " - " + storeFiles.get(i));
}
LOG.debug("Archive files:");
for (int i = 0; i < archivedFiles.size(); i++) {
LOG.debug(i + " - " + archivedFiles.get(i));
}
assertTrue("Archived files are missing some of the store files!",
archivedFiles.containsAll(storeFiles));
}
private void clearArchiveDirectory() throws IOException {
UTIL.getTestFileSystem().delete(new Path(UTIL.getDefaultRootDirPath(), ".archive"), true);
}
/**
* Get the names of all the files below the given directory
* @param fs
* @param archiveDir
* @return
* @throws IOException
*/
private List<String> getAllFileNames(final FileSystem fs, Path archiveDir) throws IOException {
FileStatus[] files = FSUtils.listStatus(fs, archiveDir, null);
return recurseOnFiles(fs, files, new ArrayList<String>());
}
/** Recursively lookup all the file names under the file[] array **/
private List<String> recurseOnFiles(FileSystem fs, FileStatus[] files, List<String> fileNames)
throws IOException {
if (files == null || files.length == 0) return fileNames;
for (FileStatus file : files) {
if (file.isDir()) {
recurseOnFiles(fs, FSUtils.listStatus(fs, file.getPath(), null), fileNames);
} else fileNames.add(file.getPath().getName());
}
return fileNames;
}
}

View File

@ -0,0 +1,365 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.backup.example;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
import org.apache.hadoop.hbase.regionserver.CheckedArchivingHFileCleaner;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveTestingUtil;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
* Spin up a small cluster and check that the hfiles of a region are properly long-term archived as
* specified via the {@link ZKTableArchiveClient}.
*/
@Category(MediumTests.class)
public class TestZooKeeperTableArchiveClient {
private static final Log LOG = LogFactory.getLog(TestZooKeeperTableArchiveClient.class);
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static final String STRING_TABLE_NAME = "test";
private static final byte[] TEST_FAM = Bytes.toBytes("fam");
private static final byte[] TABLE_NAME = Bytes.toBytes(STRING_TABLE_NAME);
private static final int numRS = 2;
private static final int maxTries = 5;
private static final long ttl = 1000;
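// client used to toggle per-table HFile archiving state via ZooKeeper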
private static ZKTableArchiveClient archivingClient;
/**
* Setup the config for the cluster
*/
@BeforeClass
public static void setupCluster() throws Exception {
setupConf(UTIL.getConfiguration());
UTIL.startMiniCluster(numRS);
archivingClient = new ZKTableArchiveClient(UTIL.getConfiguration(), UTIL.getHBaseAdmin()
.getConnection());
}
private static void setupConf(Configuration conf) {
// disable the ui
conf.setInt("hbase.regionsever.info.port", -1);
// change the flush size to a small amount, regulating number of store files
conf.setInt("hbase.hregion.memstore.flush.size", 25000);
// so make sure we get a compaction when doing a load, but keep around some
// files in the store
conf.setInt("hbase.hstore.compaction.min", 10);
conf.setInt("hbase.hstore.compactionThreshold", 10);
// block writes if we get to 12 store files
conf.setInt("hbase.hstore.blockingStoreFiles", 12);
// drop the number of attempts for the hbase admin
conf.setInt("hbase.client.retries.number", 1);
// set the ttl on the hfiles
conf.setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, ttl);
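// install the cleaner delegates: the checked cleaner (test hook), the TTL cleaner and the
// long-term archiving cleaner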
conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
CheckedArchivingHFileCleaner.class.getCanonicalName(),
TimeToLiveHFileCleaner.class.getCanonicalName(),
LongTermArchivingHFileCleaner.class.getCanonicalName());
}
@Before
public void setup() throws Exception {
UTIL.createTable(TABLE_NAME, TEST_FAM);
}
@After
public void tearDown() throws Exception {
UTIL.deleteTable(TABLE_NAME);
// and cleanup the archive directory
try {
UTIL.getTestFileSystem().delete(new Path(UTIL.getDefaultRootDirPath(), ".archive"), true);
} catch (IOException e) {
LOG.warn("Failure to delete archive directory", e);
}
// make sure that backups are off for all tables
archivingClient.disableHFileBackup();
}
@AfterClass
public static void cleanupTest() throws Exception {
try {
UTIL.shutdownMiniCluster();
} catch (Exception e) {
LOG.warn("problem shutting down cluster", e);
}
}
/**
* Test turning on/off archiving
*/
@Test
public void testArchivingEnableDisable() throws Exception {
// 1. turn on hfile backups
LOG.debug("----Starting archiving");
archivingClient.enableHFileBackupAsync(TABLE_NAME);
assertTrue("Archving didn't get turned on", archivingClient
.getArchivingEnabled(TABLE_NAME));
// 2. Turn off archiving and make sure its off
archivingClient.disableHFileBackup();
assertFalse("Archving didn't get turned off.", archivingClient.getArchivingEnabled(TABLE_NAME));
// 3. Check enable/disable on a single table
archivingClient.enableHFileBackupAsync(TABLE_NAME);
assertTrue("Archving didn't get turned on", archivingClient
.getArchivingEnabled(TABLE_NAME));
// 4. Turn off archiving and make sure its off
archivingClient.disableHFileBackup(TABLE_NAME);
assertFalse("Archving didn't get turned off for " + STRING_TABLE_NAME,
archivingClient.getArchivingEnabled(TABLE_NAME));
}
@Test
public void testArchivingOnSingleTable() throws Exception {
// turn on hfile retention
LOG.debug("----Starting archiving");
archivingClient.enableHFileBackupAsync(TABLE_NAME);
assertTrue("Archving didn't get turned on", archivingClient
.getArchivingEnabled(TABLE_NAME));
// get the RS and region serving our table
List<HRegion> servingRegions = UTIL.getHBaseCluster().getRegions(TABLE_NAME);
// make sure we only have 1 region serving this table
assertEquals(1, servingRegions.size());
HRegion region = servingRegions.get(0);
// get the parent RS and monitor
HRegionServer hrs = UTIL.getRSForFirstRegionInTable(TABLE_NAME);
FileSystem fs = hrs.getFileSystem();
// put some data on the region
LOG.debug("-------Loading table");
UTIL.loadRegion(region, TEST_FAM);
loadAndCompact(region);
// check that we actually have some store files that were archived
Store store = region.getStore(TEST_FAM);
Path storeArchiveDir = HFileArchiveTestingUtil.getStoreArchivePath(UTIL.getConfiguration(),
region, store);
// check to make sure we archived some files
assertTrue("Didn't create a store archive directory", fs.exists(storeArchiveDir));
assertTrue("No files in the store archive",
FSUtils.listStatus(fs, storeArchiveDir, null).length > 0);
// and then put some non-table files in the archive
Configuration conf = UTIL.getConfiguration();
Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
// write a tmp file to the archive dir
Path tmpFile = new Path(archiveDir, "toDelete");
FSDataOutputStream out = fs.create(tmpFile);
out.write(1);
out.close();
assertTrue(fs.exists(tmpFile));
// make sure we wait long enough for the file to expire
Thread.sleep(ttl);
// print current state for comparison
FSUtils.logFileSystemState(fs, archiveDir, LOG);
// ensure there are no archived files after waiting for a timeout
ensureHFileCleanersRun();
// check to make sure the right things get deleted
assertTrue("Store archive got deleted", fs.exists(storeArchiveDir));
assertTrue("Archived HFiles got deleted",
FSUtils.listStatus(fs, storeArchiveDir, null).length > 0);
assertFalse(
"Tmp file (non-table archive file) didn't " + "get deleted, archive dir: "
+ fs.listStatus(archiveDir), fs.exists(tmpFile));
LOG.debug("Turning off hfile backup.");
// stop archiving the table
archivingClient.disableHFileBackup();
LOG.debug("Deleting table from archive.");
// now remove the archived table
Path primaryTable = new Path(HFileArchiveUtil.getArchivePath(UTIL.getConfiguration()),
STRING_TABLE_NAME);
fs.delete(primaryTable, true);
LOG.debug("Deleted primary table, waiting for file cleaners to run");
// and make sure the archive directory is retained after a cleanup
// have to do this manually since delegates aren't run if there aren't any files in the archive
// dir to clean up
Thread.sleep(ttl);
UTIL.getHBaseCluster().getMaster().getHFileCleaner().triggerNow();
Thread.sleep(ttl);
LOG.debug("File cleaners done, checking results.");
// but we still have the archive directory
assertTrue(fs.exists(HFileArchiveUtil.getArchivePath(UTIL.getConfiguration())));
}
/**
* Make sure the {@link HFileCleaner} chore and all of its delegates get a chance to run.
* <p>
* Blocking operation up to 3x ttl
* @throws InterruptedException
*/
private void ensureHFileCleanersRun() throws InterruptedException {
CheckedArchivingHFileCleaner.resetCheck();
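// keep triggering the cleaner chore until the checked delegate reports it was consulted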
do {
UTIL.getHBaseCluster().getMaster().getHFileCleaner().triggerNow();
LOG.debug("Triggered, sleeping an amount until we can pass the check.");
Thread.sleep(ttl);
} while (!CheckedArchivingHFileCleaner.getChecked());
}
/**
* Test archiving/cleaning across multiple tables, where some are retained, and others aren't
* @throws Exception
*/
@Test
public void testMultipleTables() throws Exception {
archivingClient.enableHFileBackupAsync(TABLE_NAME);
assertTrue("Archving didn't get turned on", archivingClient
.getArchivingEnabled(TABLE_NAME));
// create another table that we don't archive
String otherTable = "otherTable";
UTIL.createTable(Bytes.toBytes(otherTable), TEST_FAM);
// get the parent RS and monitor
FileSystem fs = FileSystem.get(UTIL.getConfiguration());
// put data in the filesystem of the first table
loadAndCompact(STRING_TABLE_NAME);
// and some data in the other table
loadAndCompact(otherTable);
// make sure we wait long enough for the other tables files to expire
Thread.sleep(ttl);
ensureHFileCleanersRun();
// check to make sure the right things get deleted
Path primaryStoreArchive = HFileArchiveTestingUtil.getStoreArchivePath(UTIL, STRING_TABLE_NAME,
TEST_FAM);
Path otherStoreArchive = HFileArchiveTestingUtil
.getStoreArchivePath(UTIL, otherTable, TEST_FAM);
// make sure the primary store archive still exists and keeps its files
assertTrue("Store archive got deleted", fs.exists(primaryStoreArchive));
assertTrue("Archived HFiles got deleted",
FSUtils.listStatus(fs, primaryStoreArchive, null).length > 0);
assertNull("Archived HFiles should have gotten deleted, but didn't",
FSUtils.listStatus(fs, otherStoreArchive, null));
// sleep again to make sure the other table gets cleaned up
Thread.sleep(ttl);
ensureHFileCleanersRun();
// first pass removes the store archive
assertFalse(fs.exists(otherStoreArchive));
// second pass removes the region
Thread.sleep(ttl);
ensureHFileCleanersRun();
Path parent = otherStoreArchive.getParent();
assertFalse(fs.exists(parent));
// third pass removes the table
Thread.sleep(ttl);
ensureHFileCleanersRun();
parent = parent.getParent();
assertFalse(fs.exists(parent));
// but we still have the archive directory
assertTrue(fs.exists(HFileArchiveUtil.getArchivePath(UTIL.getConfiguration())));
FSUtils.logFileSystemState(fs, HFileArchiveUtil.getArchivePath(UTIL.getConfiguration()), LOG);
UTIL.deleteTable(Bytes.toBytes(otherTable));
}
private void loadAndCompact(String tableName) throws Exception {
byte[] table = Bytes.toBytes(tableName);
// get the RS and region serving our table
List<HRegion> servingRegions = UTIL.getHBaseCluster().getRegions(table);
// make sure we only have 1 region serving this table
assertEquals(1, servingRegions.size());
HRegion region = servingRegions.get(0);
// get the parent RS and monitor
HRegionServer hrs = UTIL.getRSForFirstRegionInTable(table);
FileSystem fs = hrs.getFileSystem();
// put some data on the region
LOG.debug("-------Loading table");
UTIL.loadRegion(region, TEST_FAM);
loadAndCompact(region);
// check that we actually have some store files that were archived
Store store = region.getStore(TEST_FAM);
Path storeArchiveDir = HFileArchiveTestingUtil.getStoreArchivePath(UTIL.getConfiguration(),
region, store);
// check to make sure we archived some files
assertTrue("Didn't create a store archive directory", fs.exists(storeArchiveDir));
assertTrue("No files in the store archive",
FSUtils.listStatus(fs, storeArchiveDir, null).length > 0);
}
/**
* Load the given region and then ensure that it compacts some files
*/
private void loadAndCompact(HRegion region) throws Exception {
int tries = 0;
Exception last = null;
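// the load/compaction can fail transiently, so retry up to maxTries before giving up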
while (tries++ <= maxTries) {
try {
// load the region with data
UTIL.loadRegion(region, TEST_FAM);
// and then trigger a compaction to be sure we try to archive
compactRegion(region, TEST_FAM);
return;
} catch (Exception e) {
// keep this around in case we fail later
last = e;
}
}
throw last;
}
/**
* Compact all the store files in a given region.
*/
private void compactRegion(HRegion region, byte[] family) throws IOException {
Store store = region.getStores().get(family);
store.compactRecentForTesting(store.getStorefiles().size());
}
}

View File

@ -19,11 +19,12 @@
*/
package org.apache.hadoop.hbase.master;
import static org.apache.hadoop.hbase.util.HFileArchiveTestingUtil.assertArchiveEqualToOriginal;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
@ -36,6 +37,8 @@ import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
@ -64,9 +67,12 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
@ -560,6 +566,161 @@ public class TestCatalogJanitor {
janitor.join();
}
@Test
public void testArchiveOldRegion() throws Exception {
String table = "table";
HBaseTestingUtility htu = new HBaseTestingUtility();
setRootDirAndCleanIt(htu, "testCleanParent");
Server server = new MockServer(htu);
MasterServices services = new MockMasterServices(server);
// create the janitor
CatalogJanitor janitor = new CatalogJanitor(server, services);
// Create regions.
HTableDescriptor htd = new HTableDescriptor(table);
htd.addFamily(new HColumnDescriptor("f"));
HRegionInfo parent = new HRegionInfo(htd.getName(), Bytes.toBytes("aaa"), Bytes.toBytes("eee"));
HRegionInfo splita = new HRegionInfo(htd.getName(), Bytes.toBytes("aaa"), Bytes.toBytes("ccc"));
HRegionInfo splitb = new HRegionInfo(htd.getName(), Bytes.toBytes("ccc"), Bytes.toBytes("eee"));
// Test that when both daughter regions are in place, that we do not
// remove the parent.
List<KeyValue> kvs = new ArrayList<KeyValue>();
kvs.add(new KeyValue(parent.getRegionName(), HConstants.CATALOG_FAMILY,
HConstants.SPLITA_QUALIFIER, Writables.getBytes(splita)));
kvs.add(new KeyValue(parent.getRegionName(), HConstants.CATALOG_FAMILY,
HConstants.SPLITB_QUALIFIER, Writables.getBytes(splitb)));
Result r = new Result(kvs);
FileSystem fs = FileSystem.get(htu.getConfiguration());
Path rootdir = services.getMasterFileSystem().getRootDir();
// have to set the root directory since we use it in HFileDisposer to figure out how to get to the
// archive directory. Otherwise, it just seems to pick the first root directory it can find (so
// the single test passes, but when the full suite is run, things get borked).
FSUtils.setRootDir(fs.getConf(), rootdir);
Path tabledir = HTableDescriptor.getTableDir(rootdir, htd.getName());
Path storedir = Store.getStoreHomedir(tabledir, parent.getEncodedName(),
htd.getColumnFamilies()[0].getName());
// compute where the store files should end up once the parent is cleaned and its files archived
Path storeArchive = HFileArchiveUtil.getStoreArchivePath(services.getConfiguration(), parent,
tabledir, htd.getColumnFamilies()[0].getName());
// add some store files to the parent so the janitor has something to archive
addMockStoreFiles(2, services, storedir);
// get the current store files for comparison
FileStatus[] storeFiles = fs.listStatus(storedir);
for (FileStatus file : storeFiles) {
System.out.println("Have store file:" + file.getPath());
}
// do the cleaning of the parent
assertTrue(janitor.cleanParent(parent, r));
// and now check to make sure that the files have actually been archived
FileStatus[] archivedStoreFiles = fs.listStatus(storeArchive);
assertArchiveEqualToOriginal(storeFiles, archivedStoreFiles, fs);
// cleanup
services.stop("Test finished");
server.stop("shutdown");
janitor.join();
}
/**
* Test that when a store file has the same name as one already in the archive, the existing
* archived file is backed up with a timestamp suffix rather than overwritten
*/
@Test
public void testDuplicateHFileResolution() throws Exception {
String table = "table";
HBaseTestingUtility htu = new HBaseTestingUtility();
setRootDirAndCleanIt(htu, "testCleanParent");
Server server = new MockServer(htu);
MasterServices services = new MockMasterServices(server);
// create the janitor
CatalogJanitor janitor = new CatalogJanitor(server, services);
// Create regions.
HTableDescriptor htd = new HTableDescriptor(table);
htd.addFamily(new HColumnDescriptor("f"));
HRegionInfo parent = new HRegionInfo(htd.getName(), Bytes.toBytes("aaa"), Bytes.toBytes("eee"));
HRegionInfo splita = new HRegionInfo(htd.getName(), Bytes.toBytes("aaa"), Bytes.toBytes("ccc"));
HRegionInfo splitb = new HRegionInfo(htd.getName(), Bytes.toBytes("ccc"), Bytes.toBytes("eee"));
// Test that when both daughter regions are in place, that we do not
// remove the parent.
List<KeyValue> kvs = new ArrayList<KeyValue>();
kvs.add(new KeyValue(parent.getRegionName(), HConstants.CATALOG_FAMILY,
HConstants.SPLITA_QUALIFIER, Writables.getBytes(splita)));
kvs.add(new KeyValue(parent.getRegionName(), HConstants.CATALOG_FAMILY,
HConstants.SPLITB_QUALIFIER, Writables.getBytes(splitb)));
Result r = new Result(kvs);
FileSystem fs = FileSystem.get(htu.getConfiguration());
Path rootdir = services.getMasterFileSystem().getRootDir();
// have to set the root directory since we use it in HFileDisposer to figure out how to get to the
// archive directory. Otherwise, it just seems to pick the first root directory it can find (so
// the single test passes, but when the full suite is run, things get borked).
FSUtils.setRootDir(fs.getConf(), rootdir);
Path tabledir = HTableDescriptor.getTableDir(rootdir, parent.getTableName());
Path storedir = Store.getStoreHomedir(tabledir, parent.getEncodedName(),
htd.getColumnFamilies()[0].getName());
System.out.println("Old root:" + rootdir);
System.out.println("Old table:" + tabledir);
System.out.println("Old store:" + storedir);
Path storeArchive = HFileArchiveUtil.getStoreArchivePath(services.getConfiguration(), parent,
tabledir, htd.getColumnFamilies()[0].getName());
System.out.println("Old archive:" + storeArchive);
// add some store files to the parent so the janitor has something to archive
addMockStoreFiles(2, services, storedir);
// get the current store files for comparison
FileStatus[] storeFiles = fs.listStatus(storedir);
// do the cleaning of the parent
assertTrue(janitor.cleanParent(parent, r));
// and now check to make sure that the files have actually been archived
FileStatus[] archivedStoreFiles = fs.listStatus(storeArchive);
assertArchiveEqualToOriginal(storeFiles, archivedStoreFiles, fs);
// now add store files with the same names as before to check that the existing archived
// files get backed up with a timestamp rather than overwritten
addMockStoreFiles(2, services, storedir);
// do the cleaning of the parent
assertTrue(janitor.cleanParent(parent, r));
// and now check to make sure that the files have actually been archived
archivedStoreFiles = fs.listStatus(storeArchive);
assertArchiveEqualToOriginal(storeFiles, archivedStoreFiles, fs, true);
// cleanup
services.stop("Test finished");
server.stop("shutdown");
janitor.join();
}
private void addMockStoreFiles(int count, MasterServices services, Path storedir)
throws IOException {
// get the master's filesystem and make sure the store directory exists
FileSystem fs = services.getMasterFileSystem().getFileSystem();
fs.mkdirs(storedir);
// create the store files in the parent
for (int i = 0; i < count; i++) {
Path storeFile = new Path(storedir, "_store" + i);
FSDataOutputStream dos = fs.create(storeFile, true);
dos.writeBytes("Some data: " + i);
dos.close();
}
// make sure the mock store files are there
FileStatus[] storeFiles = fs.listStatus(storedir);
assertEquals(count, storeFiles.length);
}
private Result makeResultFromHRegionInfo(HRegionInfo region, HRegionInfo splita,
HRegionInfo splitb) throws IOException {
List<KeyValue> kvs = new ArrayList<KeyValue>();

View File

@ -0,0 +1,145 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.cleaner;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(SmallTests.class)
public class TestHFileCleaner {
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
@Test
public void testHFileCleaning() throws Exception{
String prefix = "someHFileThatWouldBeAUUID";
Configuration conf = TEST_UTIL.getConfiguration();
// set TTL
long ttl = 2000;
conf.setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, ttl);
Server server = new DummyServer();
Path archivedHfileDir = new Path(TEST_UTIL.getDataTestDir(),
HFileArchiveUtil.getConfiguredArchiveDirName(conf));
FileSystem fs = FileSystem.get(conf);
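// build the cleaner chore for the archive directory (the test invokes chore() directly below)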
HFileCleaner cleaner = new HFileCleaner(1000, server, conf, fs, archivedHfileDir);
// Create 1 invalid file, 31 old files and 1 very new file (33 total)
long now = System.currentTimeMillis();
fs.delete(archivedHfileDir, true);
fs.mkdirs(archivedHfileDir);
// Case 1: 1 invalid file, which would be deleted directly
fs.createNewFile(new Path(archivedHfileDir, "dfd-dfd"));
// Case 2: 1 "recent" file, not even deletable for the first log cleaner
// (TimeToLiveLogCleaner), so we are not going down the chain
System.out.println("Now is: " + now);
for (int i = 1; i < 32; i++) {
// Case 3: old files which would be deletable for the first log cleaner
// (TimeToLiveHFileCleaner),
Path fileName = new Path(archivedHfileDir, (prefix + "." + (now - i)));
fs.createNewFile(fileName);
}
// sleep for some time so the next file gets a newer modification time
Thread.sleep(ttl);
// Case 3: 1 newer file, not yet expired for the TimeToLiveHFileCleaner, so it survives the chain
fs.createNewFile(new Path(archivedHfileDir, prefix + "." + (now + 10000)));
for (FileStatus stat : fs.listStatus(archivedHfileDir)) {
System.out.println(stat.getPath().toString());
}
assertEquals(33, fs.listStatus(archivedHfileDir).length);
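// run a single pass of the cleaner chain by hand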
cleaner.chore();
// We should end up with a single file - just the newer one
assertEquals(1, fs.listStatus(archivedHfileDir).length);
for (FileStatus file : fs.listStatus(archivedHfileDir)) {
System.out.println("Kept log files: " + file.getPath().getName());
}
cleaner.interrupt();
}
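// minimal Server implementation so the cleaner can be constructed without a running cluster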
static class DummyServer implements Server {
@Override
public Configuration getConfiguration() {
return TEST_UTIL.getConfiguration();
}
@Override
public ZooKeeperWatcher getZooKeeper() {
try {
return new ZooKeeperWatcher(getConfiguration(), "dummy server", this);
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
@Override
public CatalogTracker getCatalogTracker() {
return null;
}
@Override
public ServerName getServerName() {
return new ServerName("regionserver,60020,000000");
}
@Override
public void abort(String why, Throwable e) {}
@Override
public boolean isAborted() {
return false;
}
@Override
public void stop(String why) {}
@Override
public boolean isStopped() {
return false;
}
}
@org.junit.Rule
public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
}

View File

@ -1,6 +1,4 @@
/**
* Copyright 2009 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -17,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;
package org.apache.hadoop.hbase.master.cleaner;
import static org.junit.Assert.assertEquals;
@ -31,6 +29,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;

View File

@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
/**
* HFile cleaner delegate that records whether it has been run (and allows resets) and always
* reports the passed file as deletable.
* <p>
* Just a helper class for testing to make sure the cleaner has been run.
*/
public class CheckedArchivingHFileCleaner extends BaseHFileCleanerDelegate {
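// flipped to true the first time the cleaner chain consults this delegate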
private static boolean checked;
@Override
public boolean isFileDeleteable(Path file) {
checked = true;
return true;
}
public static boolean getChecked() {
return checked;
}
public static void resetCheck() {
checked = false;
}
}

View File

@ -0,0 +1,239 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.Store;
/**
* Test helper for testing archiving of HFiles
*/
public class HFileArchiveTestingUtil {
private static final Log LOG = LogFactory.getLog(HFileArchiveTestingUtil.class);
private HFileArchiveTestingUtil() {
// NOOP private ctor since this is just a utility class
}
public static boolean compareArchiveToOriginal(FileStatus[] previous, FileStatus[] archived,
FileSystem fs, boolean hasTimedBackup) {
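// split into <original, current archive copies, timestamped backups>, each sorted below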
List<List<String>> lists = getFileLists(previous, archived);
List<String> original = lists.get(0);
Collections.sort(original);
List<String> currentFiles = lists.get(1);
Collections.sort(currentFiles);
List<String> backedup = lists.get(2);
Collections.sort(backedup);
// check the backed up files versus the current (should match up, less the
// backup time in the name)
if (hasTimedBackup != (backedup.size() > 0)) {
LOG.debug("backed up files don't match what was expected.");
return false;
}
String msg = null;
if (hasTimedBackup) {
msg = assertArchiveEquality(original, backedup);
if (msg != null) {
LOG.debug(msg);
return false;
}
}
msg = assertArchiveEquality(original, currentFiles);
if (msg != null) {
LOG.debug(msg);
return false;
}
return true;
}
/**
* Compare the archived files to the files in the original directory
* @param previous original files that should have been archived
* @param archived files that were archived
* @param fs filesystem on which the archiving took place
* @throws IOException
*/
public static void assertArchiveEqualToOriginal(FileStatus[] previous, FileStatus[] archived,
FileSystem fs) throws IOException {
assertArchiveEqualToOriginal(previous, archived, fs, false);
}
/**
* Compare the archived files to the files in the original directory
* @param previous original files that should have been archived
* @param archived files that were archived
* @param fs {@link FileSystem} on which the archiving took place
* @param hasTimedBackup <tt>true</tt> if we expect to find an archive backup directory with a
* copy of the files in the archive directory (and the original files).
* @throws IOException
*/
public static void assertArchiveEqualToOriginal(FileStatus[] previous, FileStatus[] archived,
FileSystem fs, boolean hasTimedBackup) throws IOException {
List<List<String>> lists = getFileLists(previous, archived);
List<String> original = lists.get(0);
Collections.sort(original);
List<String> currentFiles = lists.get(1);
Collections.sort(currentFiles);
List<String> backedup = lists.get(2);
Collections.sort(backedup);
// check the backed up files versus the current (should match up, less the
// backup time in the name)
assertEquals("Didn't expect any backup files, but got: " + backedup, hasTimedBackup,
backedup.size() > 0);
String msg = null;
if (hasTimedBackup) {
msg = assertArchiveEquality(original, backedup);
assertNull(msg, msg);
}
// do the rest of the comparison
msg = assertArchiveEquality(original, currentFiles);
assertNull(msg, msg);
}
private static String assertArchiveEquality(List<String> expected, List<String> archived) {
String compare = compareFileLists(expected, archived);
if (expected.size() != archived.size()) return "Not the same number of current files\n"
+ compare;
if (!expected.equals(archived)) return "Different backup files, but same amount\n" + compare;
return null;
}
/**
* @return <expected, gotten, backup>, where each is sorted
*/
private static List<List<String>> getFileLists(FileStatus[] previous, FileStatus[] archived) {
List<List<String>> files = new ArrayList<List<String>>();
// copy over the original files
List<String> originalFileNames = convertToString(previous);
files.add(originalFileNames);
List<String> currentFiles = new ArrayList<String>(previous.length);
List<FileStatus> backedupFiles = new ArrayList<FileStatus>(previous.length);
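// walk the archived files, separating current copies from timestamped backups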
for (FileStatus f : archived) {
String name = f.getPath().getName();
// if the file has been backed up
if (name.contains(".")) {
Path parent = f.getPath().getParent();
String shortName = name.split("[.]")[0];
Path modPath = new Path(parent, shortName);
FileStatus file = new FileStatus(f.getLen(), f.isDir(), f.getReplication(),
f.getBlockSize(), f.getModificationTime(), modPath);
backedupFiles.add(file);
} else {
// otherwise, add it to the list to compare to the original store files
currentFiles.add(name);
}
}
files.add(currentFiles);
files.add(convertToString(backedupFiles));
return files;
}
private static List<String> convertToString(FileStatus[] files) {
return convertToString(Arrays.asList(files));
}
private static List<String> convertToString(List<FileStatus> files) {
List<String> originalFileNames = new ArrayList<String>(files.size());
for (FileStatus f : files) {
originalFileNames.add(f.getPath().getName());
}
return originalFileNames;
}
/* Get a pretty representation of the differences */
private static String compareFileLists(List<String> expected, List<String> gotten) {
StringBuilder sb = new StringBuilder("Expected (" + expected.size() + "): \t\t Gotten ("
+ gotten.size() + "):\n");
List<String> notFound = new ArrayList<String>();
for (String s : expected) {
if (gotten.contains(s)) sb.append(s + "\t\t" + s + "\n");
else notFound.add(s);
}
sb.append("Not Found:\n");
for (String s : notFound) {
sb.append(s + "\n");
}
sb.append("\nExtra:\n");
for (String s : gotten) {
if (!expected.contains(s)) sb.append(s + "\n");
}
return sb.toString();
}
/**
* Helper method to get the archive directory for the specified region
* @param conf {@link Configuration} to check for the name of the archive directory
* @param region region that is being archived
* @return {@link Path} to the archive directory for the given region
*/
public static Path getRegionArchiveDir(Configuration conf, HRegion region) {
return HFileArchiveUtil.getRegionArchiveDir(conf, region.getTableDir(), region.getRegionDir());
}
/**
* Helper method to get the store archive directory for the specified region
* @param conf {@link Configuration} to check for the name of the archive directory
* @param region region that is being archived
* @param store store that is archiving files
* @return {@link Path} to the store archive directory for the given region
*/
public static Path getStoreArchivePath(Configuration conf, HRegion region, Store store) {
return HFileArchiveUtil.getStoreArchivePath(conf, region, store.getFamily().getName());
}
public static Path getStoreArchivePath(HBaseTestingUtility util, String tableName,
byte[] storeName) throws IOException {
byte[] table = Bytes.toBytes(tableName);
// get the RS and region serving our table
List<HRegion> servingRegions = util.getHBaseCluster().getRegions(table);
HRegion region = servingRegions.get(0);
// get the store for the family so we can compute its archive path
Store store = region.getStore(storeName);
return HFileArchiveTestingUtil.getStoreArchivePath(util.getConfiguration(), region, store);
}
}

View File

@ -0,0 +1,102 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import static org.junit.Assert.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
/**
* Test that the utility works as expected
*/
@Category(SmallTests.class)
public class TestHFileArchiveUtil {
@Test
public void testGetConfiguredArchiveDir() {
assertEquals(HFileArchiveUtil.DEFAULT_HFILE_ARCHIVE_DIRECTORY,
HFileArchiveUtil.getConfiguredArchiveDirName(null));
Configuration conf = new Configuration();
assertEquals(HFileArchiveUtil.DEFAULT_HFILE_ARCHIVE_DIRECTORY,
HFileArchiveUtil.getConfiguredArchiveDirName(conf));
conf.set(HConstants.HFILE_ARCHIVE_DIRECTORY, "");
assertEquals("", HFileArchiveUtil.getConfiguredArchiveDirName(conf));
String archiveDir = "somearchive";
conf.set(HConstants.HFILE_ARCHIVE_DIRECTORY, archiveDir);
assertEquals(archiveDir, HFileArchiveUtil.getConfiguredArchiveDirName(conf));
}
@Test
public void testGetTableArchivePath() {
assertNotNull(HFileArchiveUtil.getTableArchivePath(null, new Path("table")));
Configuration conf = new Configuration();
conf.set(HConstants.HFILE_ARCHIVE_DIRECTORY, "");
assertNotNull(HFileArchiveUtil.getTableArchivePath(conf, new Path("root", new Path("table"))));
}
@Test
public void testGetArchivePath() throws Exception {
Configuration conf = new Configuration();
FSUtils.setRootDir(conf, new Path("root"));
assertNotNull(HFileArchiveUtil.getArchivePath(conf));
String archiveDir = "somearchive";
conf.set(HConstants.HFILE_ARCHIVE_DIRECTORY, archiveDir);
assertEquals(new Path(FSUtils.getRootDir(conf), archiveDir),
HFileArchiveUtil.getArchivePath(conf));
}
@Test
public void testRegionArchiveDir() {
Path tableDir = new Path("table");
Path regionDir = new Path("region");
assertNotNull(HFileArchiveUtil.getRegionArchiveDir(null, tableDir, regionDir));
}
@Test
public void testGetStoreArchivePath() {
byte[] family = Bytes.toBytes("Family");
Path tabledir = new Path("table");
HRegionInfo region = new HRegionInfo(Bytes.toBytes("table"));
Configuration conf = null;
assertNotNull(HFileArchiveUtil.getStoreArchivePath(conf, region, tabledir, family));
conf = new Configuration();
assertNotNull(HFileArchiveUtil.getStoreArchivePath(conf, region, tabledir, family));
conf.set(HConstants.HFILE_ARCHIVE_DIRECTORY, "archiveDir");
assertNotNull(HFileArchiveUtil.getStoreArchivePath(conf, region, tabledir, family));
// do a little mocking of a region to get the same results
HRegion mockRegion = Mockito.mock(HRegion.class);
Mockito.when(mockRegion.getRegionInfo()).thenReturn(region);
Mockito.when(mockRegion.getTableDir()).thenReturn(tabledir);
assertNotNull(HFileArchiveUtil.getStoreArchivePath(null, mockRegion, family));
conf = new Configuration();
assertNotNull(HFileArchiveUtil.getStoreArchivePath(conf, mockRegion, family));
conf.set(HConstants.HFILE_ARCHIVE_DIRECTORY, "archiveDir");
assertNotNull(HFileArchiveUtil.getStoreArchivePath(conf, mockRegion, family));
}
}