HBASE-6865 Snapshot File Cleaners (Jesse Yates)
git-svn-id: https://svn.apache.org/repos/asf/hbase/branches/hbase-7290@1445783 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
cb6f286858
commit
77ef4a85d2
|
@ -2434,7 +2434,7 @@ Server {
|
|||
* Exposed for TESTING!
|
||||
* @return the underlying snapshot manager
|
||||
*/
|
||||
SnapshotManager getSnapshotManagerForTesting() {
|
||||
public SnapshotManager getSnapshotManagerForTesting() {
|
||||
return this.snapshotManager;
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,329 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.snapshot;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.Timer;
|
||||
import java.util.TimerTask;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.PathFilter;
|
||||
import org.apache.hadoop.hbase.Stoppable;
|
||||
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
|
||||
/**
|
||||
* Intelligently keep track of all the files for all the snapshots.
|
||||
* <p>
|
||||
* A cache of files is kept to avoid querying the {@link FileSystem} frequently. If there is a cache
|
||||
* miss the directory modification time is used to ensure that we don't rescan directories that we
|
||||
* already have in cache. We only check the modification times of the snapshot directories
|
||||
* (/hbase/.snapshot/[snapshot_name]) to determine if the files need to be loaded into the cache.
|
||||
* <p>
|
||||
* New snapshots will be added to the cache and deleted snapshots will be removed when we refresh
|
||||
* the cache. If the files underneath a snapshot directory are changed, but not the snapshot itself,
|
||||
* we will ignore updates to that snapshot's files.
|
||||
* <p>
|
||||
* This is sufficient because each snapshot has its own directory and is added via an atomic rename
|
||||
* <i>once</i>, when the snapshot is created. We don't need to worry about the data in the snapshot
|
||||
* being run.
|
||||
* <p>
|
||||
* Further, the cache is periodically refreshed ensure that files in snapshots that were deleted are
|
||||
* also removed from the cache.
|
||||
* <p>
|
||||
* A {@link PathFilter} must be passed when creating <tt>this</tt> to allow filtering of children
|
||||
* under the /hbase/.snapshot/[snapshot name] directory, for each snapshot. This allows you to only
|
||||
* cache files under, for instance, all the logs in the .logs directory or all the files under all
|
||||
* the regions. The filter is only applied the children, not to the children of children. For
|
||||
* instance, given the layout:
|
||||
*
|
||||
* <pre>
|
||||
* /hbase/.snapshot/SomeSnapshot/
|
||||
* .logs/
|
||||
* server/
|
||||
* server.1234567
|
||||
* .regioninfo
|
||||
* 1234567890/
|
||||
* family/
|
||||
* 123456789
|
||||
* </pre>
|
||||
*
|
||||
* would only apply a filter to directories: .logs, .regioninfo and 1234567890, not to their
|
||||
* children.
|
||||
* <p>
|
||||
* <tt>this</tt> also considers all running snapshots (those under /hbase/.snapshot/.tmp) as valid
|
||||
* snapshots and will attempt to cache files from those snapshots as well.
|
||||
* <p>
|
||||
* Queries about a given file are thread-safe with respect to multiple queries and cache refreshes.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
public class SnapshotFileCache implements Stoppable {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(SnapshotFileCache.class);
|
||||
private volatile boolean stop = false;
|
||||
private final FileSystem fs;
|
||||
private final PathFilter dirFilter;
|
||||
private final Path snapshotDir;
|
||||
private final Set<String> cache = new HashSet<String>();
|
||||
/**
|
||||
* This is a helper map of information about the snapshot directories so we don't need to rescan
|
||||
* them if they haven't changed since the last time we looked.
|
||||
*/
|
||||
private final Map<String, SnapshotDirectoryInfo> snapshots = new HashMap<String, SnapshotDirectoryInfo>();
|
||||
private final Timer refreshTimer;
|
||||
|
||||
private long lastModifiedTime = Long.MIN_VALUE;
|
||||
|
||||
/**
|
||||
* Create a snapshot file cache for all snapshots under the specified [root]/.snapshot on the
|
||||
* filesystem.
|
||||
* <p>
|
||||
* Immediately loads the file cache.
|
||||
* @param conf to extract the configured {@link FileSystem} where the snapshots are stored and
|
||||
* hbase root directory
|
||||
* @param cacheRefreshPeriod frequency (ms) with which the cache should be refreshed
|
||||
* @param refreshThreadName name of the cache refresh thread
|
||||
* @param inspectSnapshotDirectory Filter to apply to the directories under each snapshot.
|
||||
* @throws IOException if the {@link FileSystem} or root directory cannot be loaded
|
||||
*/
|
||||
public SnapshotFileCache(Configuration conf, long cacheRefreshPeriod, String refreshThreadName,
|
||||
PathFilter inspectSnapshotDirectory) throws IOException {
|
||||
this(FSUtils.getCurrentFileSystem(conf), FSUtils.getRootDir(conf), 0, cacheRefreshPeriod,
|
||||
refreshThreadName, inspectSnapshotDirectory);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a snapshot file cache for all snapshots under the specified [root]/.snapshot on the
|
||||
* filesystem
|
||||
* @param fs {@link FileSystem} where the snapshots are stored
|
||||
* @param rootDir hbase root directory
|
||||
* @param cacheRefreshPeriod frequency (ms) with which the cache should be refreshed
|
||||
* @param cacheRefreshDelay amount of time to wait for the cache to be refreshed
|
||||
* @param refreshThreadName name of the cache refresh thread
|
||||
* @param inspectSnapshotDirectory Filter to apply to the directories under each snapshot.
|
||||
*/
|
||||
public SnapshotFileCache(FileSystem fs, Path rootDir, long cacheRefreshPeriod,
|
||||
long cacheRefreshDelay, String refreshThreadName, PathFilter inspectSnapshotDirectory) {
|
||||
this.fs = fs;
|
||||
this.dirFilter = inspectSnapshotDirectory;
|
||||
this.snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(rootDir);
|
||||
// periodically refresh the file cache to make sure we aren't superfluously saving files.
|
||||
this.refreshTimer = new Timer(refreshThreadName, true);
|
||||
this.refreshTimer.scheduleAtFixedRate(new RefreshCacheTask(), cacheRefreshDelay,
|
||||
cacheRefreshPeriod);
|
||||
}
|
||||
|
||||
/**
|
||||
* Trigger a cache refresh, even if its before the next cache refresh. Does not affect pending
|
||||
* cache refreshes.
|
||||
* <p>
|
||||
* Blocks until the cache is refreshed.
|
||||
* <p>
|
||||
* Exposed for TESTING.
|
||||
*/
|
||||
public void triggerCacheRefreshForTesting() {
|
||||
LOG.debug("Triggering cache refresh");
|
||||
new RefreshCacheTask().run();
|
||||
LOG.debug("Current cache:" + cache);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check to see if the passed file name is contained in any of the snapshots. First checks an
|
||||
* in-memory cache of the files to keep. If its not in the cache, then the cache is refreshed and
|
||||
* the cache checked again for that file. This ensures that we always return <tt>true</tt> for a
|
||||
* files that exists.
|
||||
* <p>
|
||||
* Note this may lead to periodic false positives for the file being referenced. Periodically, the
|
||||
* cache is refreshed even if there are no requests to ensure that the false negatives get removed
|
||||
* eventually. For instance, suppose you have a file in the snapshot and it gets loaded into the
|
||||
* cache. Then at some point later that snapshot is deleted. If the cache has not been refreshed
|
||||
* at that point, cache will still think the file system contains that file and return
|
||||
* <tt>true</tt>, even if it is no longer present (false positive). However, if the file never was
|
||||
* on the filesystem, we will never find it and always return <tt>false</tt>.
|
||||
* @param fileName file to check
|
||||
* @return <tt>false</tt> if the file is not referenced in any current or running snapshot,
|
||||
* <tt>true</tt> if the file is in the cache.
|
||||
* @throws IOException if there is an unexpected error reaching the filesystem.
|
||||
*/
|
||||
// XXX this is inefficient to synchronize on the method, when what we really need to guard against
|
||||
// is an illegal access to the cache. Really we could do a mutex-guarded pointer swap on the
|
||||
// cache, but that seems overkill at the moment and isn't necessarily a bottleneck.
|
||||
public synchronized boolean contains(String fileName) throws IOException {
|
||||
if (this.cache.contains(fileName)) return true;
|
||||
|
||||
refreshCache();
|
||||
|
||||
// then check again
|
||||
return this.cache.contains(fileName);
|
||||
}
|
||||
|
||||
private synchronized void refreshCache() throws IOException {
|
||||
// get the status of the snapshots directory
|
||||
FileStatus status;
|
||||
try {
|
||||
status = fs.getFileStatus(snapshotDir);
|
||||
} catch (FileNotFoundException e) {
|
||||
LOG.warn("Snapshot directory: " + snapshotDir + " doesn't exist");
|
||||
return;
|
||||
}
|
||||
// if the snapshot directory wasn't modified since we last check, we are done
|
||||
if (status.getModificationTime() <= lastModifiedTime) return;
|
||||
|
||||
// directory was modified, so we need to reload our cache
|
||||
// there could be a slight race here where we miss the cache, check the directory modification
|
||||
// time, then someone updates the directory, causing us to not scan the directory again.
|
||||
// However, snapshot directories are only created once, so this isn't an issue.
|
||||
|
||||
// 1. update the modified time
|
||||
this.lastModifiedTime = status.getModificationTime();
|
||||
|
||||
// 2.clear the cache
|
||||
this.cache.clear();
|
||||
Map<String, SnapshotDirectoryInfo> known = new HashMap<String, SnapshotDirectoryInfo>();
|
||||
|
||||
// 3. check each of the snapshot directories
|
||||
FileStatus[] snapshots = FSUtils.listStatus(fs, snapshotDir);
|
||||
if (snapshots == null) {
|
||||
// remove all the remembered snapshots because we don't have any left
|
||||
LOG.debug("No snapshots on-disk, cache empty");
|
||||
this.snapshots.clear();
|
||||
return;
|
||||
}
|
||||
|
||||
// 3.1 iterate through the on-disk snapshots
|
||||
for (FileStatus snapshot : snapshots) {
|
||||
String name = snapshot.getPath().getName();
|
||||
// its the tmp dir
|
||||
if (name.equals(SnapshotDescriptionUtils.SNAPSHOT_TMP_DIR_NAME)) {
|
||||
// only add those files to the cache, but not to the known snapshots
|
||||
FileStatus[] running = FSUtils.listStatus(fs, snapshot.getPath());
|
||||
if (running == null) continue;
|
||||
for (FileStatus run : running) {
|
||||
this.cache.addAll(getAllFiles(run, dirFilter));
|
||||
}
|
||||
}
|
||||
|
||||
SnapshotDirectoryInfo files = this.snapshots.remove(name);
|
||||
// 3.1.1 if we don't know about the snapshot or its been modified, we need to update the files
|
||||
// the latter could occur where I create a snapshot, then delete it, and then make a new
|
||||
// snapshot with the same name. We will need to update the cache the information from that new
|
||||
// snapshot, even though it has the same name as the files referenced have probably changed.
|
||||
if (files == null || files.hasBeenModified(snapshot.getModificationTime())) {
|
||||
// get all files for the snapshot and create a new info
|
||||
Collection<String> storedFiles = getAllFiles(snapshot, dirFilter);
|
||||
files = new SnapshotDirectoryInfo(snapshot.getModificationTime(), storedFiles);
|
||||
}
|
||||
// 3.2 add all the files to cache
|
||||
this.cache.addAll(files.getFiles());
|
||||
known.put(name, files);
|
||||
}
|
||||
|
||||
// 4. set the snapshots we are tracking
|
||||
this.snapshots.clear();
|
||||
this.snapshots.putAll(known);
|
||||
}
|
||||
|
||||
private Collection<String> getAllFiles(FileStatus file, PathFilter filter) throws IOException {
|
||||
if (!file.isDir()) return Collections.singleton(file.getPath().getName());
|
||||
|
||||
Set<String> ret = new HashSet<String>();
|
||||
// read all the files/directories below the passed directory
|
||||
FileStatus[] files = FSUtils.listStatus(fs, file.getPath(), filter);
|
||||
if (files == null) return ret;
|
||||
|
||||
// get all the files for the children
|
||||
for (FileStatus child : files) {
|
||||
// children aren't filtered out
|
||||
ret.addAll(getAllFiles(child, null));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
/**
|
||||
* Simple helper task that just periodically attempts to refresh the cache
|
||||
*/
|
||||
public class RefreshCacheTask extends TimerTask {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
SnapshotFileCache.this.refreshCache();
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Failed to refresh snapshot hfile cache!", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop(String why) {
|
||||
if (!this.stop) {
|
||||
this.stop = true;
|
||||
this.refreshTimer.cancel();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isStopped() {
|
||||
return this.stop;
|
||||
}
|
||||
|
||||
/**
|
||||
* Information about a snapshot directory
|
||||
*/
|
||||
public class SnapshotDirectoryInfo {
|
||||
long lastModified;
|
||||
Collection<String> files;
|
||||
|
||||
public SnapshotDirectoryInfo(long mtime, Collection<String> files) {
|
||||
this.lastModified = mtime;
|
||||
this.files = files;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the hfiles in the snapshot when <tt>this</tt> was made.
|
||||
*/
|
||||
public Collection<String> getFiles() {
|
||||
return this.files;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if the snapshot directory has been modified
|
||||
* @param mtime current modification time of the directory
|
||||
* @return <tt>true</tt> if it the modification time of the directory is newer time when we
|
||||
* created <tt>this</tt>
|
||||
*/
|
||||
public boolean hasBeenModified(long mtime) {
|
||||
return this.lastModified < mtime;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,97 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.snapshot;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
|
||||
/**
|
||||
* Implementation of a file cleaner that checks if a hfile is still used by snapshots of HBase
|
||||
* tables.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
public class SnapshotHFileCleaner extends BaseHFileCleanerDelegate {
|
||||
private static final Log LOG = LogFactory.getLog(SnapshotHFileCleaner.class);
|
||||
|
||||
/**
|
||||
* Conf key for the frequency to attempt to refresh the cache of hfiles currently used in
|
||||
* snapshots (ms)
|
||||
*/
|
||||
public static final String HFILE_CACHE_REFRESH_PERIOD_CONF_KEY = "hbase.master.hfilecleaner.plugins.snapshot.period";
|
||||
|
||||
/** Refresh cache, by default, every 5 minutes */
|
||||
private static final long DEFAULT_HFILE_CACHE_REFRESH_PERIOD = 300000;
|
||||
|
||||
/** File cache for HFiles in the completed and currently running snapshots */
|
||||
private SnapshotFileCache cache;
|
||||
|
||||
@Override
|
||||
public synchronized boolean isFileDeletable(Path filePath) {
|
||||
try {
|
||||
return !cache.contains(filePath.getName());
|
||||
} catch (IOException e) {
|
||||
LOG.error("Exception while checking if:" + filePath + " was valid, keeping it just in case.",
|
||||
e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setConf(Configuration conf) {
|
||||
super.setConf(conf);
|
||||
try {
|
||||
long cacheRefreshPeriod = conf.getLong(HFILE_CACHE_REFRESH_PERIOD_CONF_KEY,
|
||||
DEFAULT_HFILE_CACHE_REFRESH_PERIOD);
|
||||
FileSystem fs = FSUtils.getCurrentFileSystem(conf);
|
||||
Path rootDir = FSUtils.getRootDir(conf);
|
||||
cache = new SnapshotFileCache(fs, rootDir, cacheRefreshPeriod, cacheRefreshPeriod,
|
||||
"snapshot-hfile-cleaner-cache-refresher", new FSUtils.RegionDirFilter(fs)
|
||||
);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Failed to create cleaner util", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop(String why) {
|
||||
this.cache.stop(why);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isStopped() {
|
||||
return this.cache.isStopped();
|
||||
}
|
||||
|
||||
/**
|
||||
* Exposed for Testing!
|
||||
* @return the cache of all hfiles
|
||||
*/
|
||||
public SnapshotFileCache getFileCacheForTesting() {
|
||||
return this.cache;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,110 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.snapshot;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.PathFilter;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
|
||||
/**
|
||||
* Implementation of a log cleaner that checks if a log is still used by
|
||||
* snapshots of HBase tables.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
public class SnapshotLogCleaner extends BaseLogCleanerDelegate {
|
||||
private static final Log LOG = LogFactory.getLog(SnapshotLogCleaner.class);
|
||||
|
||||
/**
|
||||
* Conf key for the frequency to attempt to refresh the cache of hfiles currently used in
|
||||
* snapshots (ms)
|
||||
*/
|
||||
static final String HLOG_CACHE_REFRESH_PERIOD_CONF_KEY = "hbase.master.hlogcleaner.plugins.snapshot.period";
|
||||
|
||||
/** Refresh cache, by default, every 5 minutes */
|
||||
private static final long DEFAULT_HLOG_CACHE_REFRESH_PERIOD = 300000;
|
||||
|
||||
private SnapshotFileCache cache;
|
||||
|
||||
@Override
|
||||
public synchronized boolean isFileDeletable(Path filePath) {
|
||||
try {
|
||||
return !cache.contains(filePath.getName());
|
||||
} catch (IOException e) {
|
||||
LOG.error("Exception while checking if:" + filePath + " was valid, keeping it just in case.",
|
||||
e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This method should only be called <b>once</b>, as it starts a thread to keep the cache
|
||||
* up-to-date.
|
||||
* <p>
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public void setConf(Configuration conf) {
|
||||
super.setConf(conf);
|
||||
try {
|
||||
long cacheRefreshPeriod = conf.getLong(
|
||||
HLOG_CACHE_REFRESH_PERIOD_CONF_KEY, DEFAULT_HLOG_CACHE_REFRESH_PERIOD);
|
||||
final FileSystem fs = FSUtils.getCurrentFileSystem(conf);
|
||||
Path rootDir = FSUtils.getRootDir(conf);
|
||||
cache = new SnapshotFileCache(fs, rootDir, cacheRefreshPeriod, cacheRefreshPeriod,
|
||||
"snapshot-log-cleaner-cache-refresher", new PathFilter() {
|
||||
@Override
|
||||
public boolean accept(Path path) {
|
||||
if (path.getName().equals(HConstants.HREGION_LOGDIR_NAME)) {
|
||||
try {
|
||||
if (!fs.isFile(path)) return true;
|
||||
} catch (IOException e) {
|
||||
LOG.error("Couldn't reach fs to check:" + path + " is a directory, stopping!");
|
||||
SnapshotLogCleaner.this.stop("Couldn't reach FS to check if " + path
|
||||
+ " was a directory");
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
});
|
||||
} catch (IOException e) {
|
||||
LOG.error("Failed to create snapshot log cleaner", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop(String why) {
|
||||
this.cache.stop(why);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isStopped() {
|
||||
return this.cache.isStopped();
|
||||
}
|
||||
}
|
|
@ -32,6 +32,7 @@ import org.apache.hadoop.fs.FileStatus;
|
|||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.PathFilter;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
|
@ -43,6 +44,7 @@ import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
|
|||
import org.apache.hadoop.hbase.snapshot.exception.CorruptedSnapshotException;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.util.FSUtils.DirFilter;
|
||||
|
||||
import com.google.common.collect.HashMultimap;
|
||||
import com.google.common.collect.Multimap;
|
||||
|
|
|
@ -110,7 +110,9 @@ public class SnapshotDescriptionUtils {
|
|||
*/
|
||||
public static final String SNAPSHOTINFO_FILE = ".snapshotinfo";
|
||||
|
||||
private static final String SNAPSHOT_TMP_DIR_NAME = ".tmp";
|
||||
/** Temporary directory under the snapshot directory to store in-progress snapshots */
|
||||
public static final String SNAPSHOT_TMP_DIR_NAME = ".tmp";
|
||||
|
||||
// snapshot operation values
|
||||
/** Default value if no start time is specified */
|
||||
public static final long NO_SNAPSHOT_START_TIME_SPECIFIED = 0;
|
||||
|
@ -283,7 +285,7 @@ public class SnapshotDescriptionUtils {
|
|||
long increment = conf.getLong(
|
||||
SnapshotDescriptionUtils.TIMESTAMP_SNAPSHOT_SPLIT_POINT_ADDITION,
|
||||
SnapshotDescriptionUtils.DEFAULT_TIMESTAMP_SNAPSHOT_SPLIT_IN_FUTURE);
|
||||
LOG.debug("Setting timestamp snasphot in future by " + increment + " ms.");
|
||||
LOG.debug("Setting timestamp snapshot in future by " + increment + " ms.");
|
||||
time += increment;
|
||||
}
|
||||
LOG.debug("Creation time not specified, setting to:" + time + " (current time:"
|
||||
|
|
|
@ -15,35 +15,48 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master;
|
||||
package org.apache.hadoop.hbase.master.cleaner;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.MediumTests;
|
||||
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
||||
import org.apache.hadoop.hbase.client.HTable;
|
||||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
|
||||
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
|
||||
import org.apache.hadoop.hbase.master.snapshot.DisabledTableSnapshotHandler;
|
||||
import org.apache.hadoop.hbase.master.snapshot.SnapshotHFileCleaner;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteSnapshotRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsSnapshotDoneRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsSnapshotDoneResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ListSnapshotRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ListSnapshotResponse;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
|
||||
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
|
||||
import org.apache.hadoop.hbase.snapshot.exception.HBaseSnapshotException;
|
||||
import org.apache.hadoop.hbase.snapshot.exception.SnapshotCreationException;
|
||||
import org.apache.hadoop.hbase.snapshot.exception.UnknownSnapshotException;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
|
@ -66,9 +79,16 @@ public class TestSnapshotFromMaster {
|
|||
private static final int NUM_RS = 2;
|
||||
private static Path rootDir;
|
||||
private static Path snapshots;
|
||||
private static Path archiveDir;
|
||||
private static FileSystem fs;
|
||||
private static HMaster master;
|
||||
|
||||
private static final String STRING_TABLE_NAME = "test";
|
||||
private static final byte[] TEST_FAM = Bytes.toBytes("fam");
|
||||
private static final byte[] TABLE_NAME = Bytes.toBytes(STRING_TABLE_NAME);
|
||||
// refresh the cache every 1/2 second
|
||||
private static final long cacheRefreshPeriod = 500;
|
||||
|
||||
/**
|
||||
* Setup the config for the cluster
|
||||
*/
|
||||
|
@ -79,6 +99,7 @@ public class TestSnapshotFromMaster {
|
|||
fs = UTIL.getDFSCluster().getFileSystem();
|
||||
rootDir = FSUtils.getRootDir(UTIL.getConfiguration());
|
||||
snapshots = SnapshotDescriptionUtils.getSnapshotsDir(rootDir);
|
||||
archiveDir = new Path(rootDir, ".archive");
|
||||
master = UTIL.getMiniHBaseCluster().getMaster();
|
||||
}
|
||||
|
||||
|
@ -95,18 +116,37 @@ public class TestSnapshotFromMaster {
|
|||
conf.setInt("hbase.hstore.blockingStoreFiles", 12);
|
||||
// drop the number of attempts for the hbase admin
|
||||
conf.setInt("hbase.client.retries.number", 1);
|
||||
// set the only HFile cleaner as the snapshot cleaner
|
||||
conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
|
||||
SnapshotHFileCleaner.class.getCanonicalName());
|
||||
conf.setLong(SnapshotHFileCleaner.HFILE_CACHE_REFRESH_PERIOD_CONF_KEY, cacheRefreshPeriod);
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setup() throws Exception {
|
||||
UTIL.createTable(TABLE_NAME, TEST_FAM);
|
||||
master.getSnapshotManagerForTesting().setSnapshotHandlerForTesting(null);
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
if (!fs.delete(snapshots, true)) {
|
||||
throw new IOException("Couldn't delete snapshots directory (" + snapshots
|
||||
+ " for an unknown reason");
|
||||
|
||||
UTIL.deleteTable(TABLE_NAME);
|
||||
|
||||
// delete the archive directory, if its exists
|
||||
if (fs.exists(archiveDir)) {
|
||||
if (!fs.delete(archiveDir, true)) {
|
||||
throw new IOException("Couldn't delete archive directory (" + archiveDir
|
||||
+ " for an unknown reason");
|
||||
}
|
||||
}
|
||||
|
||||
// delete the snapshot directory, if its exists
|
||||
if (fs.exists(snapshots)) {
|
||||
if (!fs.delete(snapshots, true)) {
|
||||
throw new IOException("Couldn't delete snapshots directory (" + snapshots
|
||||
+ " for an unknown reason");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -245,4 +285,110 @@ public class TestSnapshotFromMaster {
|
|||
// then delete the existing snapshot,which shouldn't cause an exception to be thrown
|
||||
master.deleteSnapshot(null, request);
|
||||
}
|
||||
|
||||
/**
 * Test that the snapshot hfile archive cleaner works correctly. HFiles that are in snapshots
 * should be retained, while those that are not in a snapshot should be deleted.
 * @throws Exception on failure
 */
@Test
public void testSnapshotHFileArchiving() throws Exception {
  HBaseAdmin admin = UTIL.getHBaseAdmin();
  // make sure we don't fail on listing snapshots
  SnapshotTestingUtils.assertNoSnapshots(admin);
  // load the table with data so compaction later produces archived hfiles
  UTIL.loadTable(new HTable(UTIL.getConfiguration(), TABLE_NAME), TEST_FAM);

  // disable the table so we can take a snapshot
  // NOTE(review): presumably this snapshot path requires an offline table — confirm
  admin.disableTable(TABLE_NAME);

  // take a snapshot of the table
  String snapshotName = "snapshot";
  byte[] snapshotNameBytes = Bytes.toBytes(snapshotName);
  admin.snapshot(snapshotNameBytes, TABLE_NAME);

  Configuration conf = UTIL.getConfiguration();
  Path rootDir = FSUtils.getRootDir(conf);
  FileSystem fs = FSUtils.getCurrentFileSystem(conf);

  // dump the FS layout to the log to aid debugging on failure
  FSUtils.logFileSystemState(fs, rootDir, LOG);

  // ensure we only have one snapshot
  SnapshotTestingUtils.assertOneSnapshotThatMatches(admin, snapshotNameBytes, TABLE_NAME);

  // re-enable the table so we can compact the regions
  admin.enableTable(TABLE_NAME);

  // compact the files so we get some archived files for the table we just snapshotted
  List<HRegion> regions = UTIL.getHBaseCluster().getRegions(TABLE_NAME);
  for (HRegion region : regions) {
    region.compactStores();
  }

  // make sure the cleaner has run
  LOG.debug("Running hfile cleaners");
  ensureHFileCleanersRun();

  // get the snapshot files for the table
  Path snapshotTable = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
  FileStatus[] snapshotHFiles = SnapshotTestingUtils.listHFiles(fs, snapshotTable);
  // check that the files in the archive contain the ones that we need for the snapshot
  LOG.debug("Have snapshot hfiles:");
  for (FileStatus file : snapshotHFiles) {
    LOG.debug(file.getPath());
  }
  // get the archived files for the table
  Collection<String> files = getArchivedHFiles(conf, rootDir, fs, STRING_TABLE_NAME);

  // every snapshot-referenced hfile must still be present in the archive
  for (FileStatus file : snapshotHFiles) {
    assertTrue("Archived hfiles " + files + " is missing snapshot file:" + file.getPath(),
      files.contains(file.getPath().getName()));
  }

  // delete the existing snapshot
  admin.deleteSnapshot(snapshotNameBytes);
  SnapshotTestingUtils.assertNoSnapshots(admin);

  // make sure that we don't keep around the hfiles that aren't in a snapshot
  // force the snapshot file cache to refresh so the cleaner sees the deletion
  List<BaseHFileCleanerDelegate> delegates = UTIL.getMiniHBaseCluster().getMaster()
      .getHFileCleaner().cleanersChain;
  // assumes the snapshot cleaner is first in the cleaner chain — depends on config order
  ((SnapshotHFileCleaner) delegates.get(0)).getFileCacheForTesting()
      .triggerCacheRefreshForTesting();
  // run the cleaner again
  LOG.debug("Running hfile cleaners");
  ensureHFileCleanersRun();

  // with the snapshot gone, nothing should remain in the archive
  files = getArchivedHFiles(conf, rootDir, fs, STRING_TABLE_NAME);
  assertEquals("Still have some hfiles in the archive, when their snapshot has been deleted.", 0,
      files.size());
}
|
||||
|
||||
/**
|
||||
* @return all the HFiles for a given table that have been archived
|
||||
* @throws IOException on expected failure
|
||||
*/
|
||||
private final Collection<String> getArchivedHFiles(Configuration conf, Path rootDir,
|
||||
FileSystem fs, String tableName) throws IOException {
|
||||
Path tableArchive = HFileArchiveUtil.getTableArchivePath(new Path(rootDir, tableName));
|
||||
FileStatus[] archivedHFiles = SnapshotTestingUtils.listHFiles(fs, tableArchive);
|
||||
List<String> files = new ArrayList<String>(archivedHFiles.length);
|
||||
LOG.debug("Have archived hfiles:");
|
||||
for (FileStatus file : archivedHFiles) {
|
||||
LOG.debug(file.getPath());
|
||||
files.add(file.getPath().getName());
|
||||
}
|
||||
// sort the archived files
|
||||
|
||||
Collections.sort(files);
|
||||
return files;
|
||||
}
|
||||
|
||||
/**
 * Make sure the {@link HFileCleaner HFileCleaners} run at least once
 * by invoking the master's cleaner chore directly, rather than waiting
 * for its scheduled period.
 */
private static void ensureHFileCleanersRun() {
  UTIL.getHBaseCluster().getMaster().getHFileCleaner().chore();
}
|
||||
}
|
|
@ -0,0 +1,222 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.snapshot;
|
||||
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.PathFilter;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.MediumTests;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
|
||||
import org.apache.hadoop.hbase.server.snapshot.TakeSnapshotUtils;
|
||||
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
/**
 * Test that we correctly reload the cache, filter directories, etc.
 * Exercises {@link SnapshotFileCache} against a mini DFS cluster: cache hits for
 * files under completed and in-progress snapshots, refresh-on-demand behavior,
 * and directory filtering.
 */
@Category(MediumTests.class)
public class TestSnapshotFileCache {

  private static final Log LOG = LogFactory.getLog(TestSnapshotFileCache.class);
  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
  // shared DFS handles, initialized once for the whole class
  private static FileSystem fs;
  private static Path rootDir;

  @BeforeClass
  public static void startCluster() throws Exception {
    UTIL.startMiniDFSCluster(1);
    fs = UTIL.getDFSCluster().getFileSystem();
    rootDir = UTIL.getDefaultRootDirPath();
  }

  @AfterClass
  public static void stopCluster() throws Exception {
    UTIL.shutdownMiniDFSCluster();
  }

  @After
  public void cleanupFiles() throws Exception {
    // cleanup the snapshot directory so tests don't see each other's files
    Path snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(rootDir);
    fs.delete(snapshotDir, true);
  }

  /**
   * Files under a snapshot are found by the cache, survive deletion of the
   * snapshot until the cache is refreshed, and disappear after a refresh.
   */
  @Test(timeout = 10000000)
  public void testLoadAndDelete() throws Exception {
    // don't refresh the cache unless we tell it to
    long period = Long.MAX_VALUE;
    Path snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(rootDir);
    SnapshotFileCache cache = new SnapshotFileCache(fs, rootDir, period, 10000000,
        "test-snapshot-file-cache-refresh", null);

    Path snapshot = new Path(snapshotDir, "snapshot");
    Path region = new Path(snapshot, "region1");
    Path family = new Path(region, "fam");
    Path file1 = new Path(family, "file1");
    Path file2 = new Path(family, "file2");

    // create two hfiles under the snapshot
    fs.create(file1);
    fs.create(file2);

    FSUtils.logFileSystemState(fs, rootDir, LOG);

    // then make sure the cache finds them
    assertTrue("Cache didn't find:" + file1, cache.contains(file1.getName()));
    assertTrue("Cache didn't find:" + file2, cache.contains(file2.getName()));
    String not = "file-shouldn't-be-found";
    assertFalse("Cache found '" + not + "', but it shouldn't have.", cache.contains(not));

    // make sure we get a little bit of separation in the modification times
    // its okay if we sleep a little longer (b/c of GC pause), as long as we sleep a little
    Thread.sleep(10);

    LOG.debug("Deleting snapshot.");
    // then delete the snapshot and make sure that we can still find the files
    // (the cache has not been refreshed yet, so its view is stale by design)
    if (!fs.delete(snapshot, true)) {
      throw new IOException("Couldn't delete " + snapshot + " for an unknown reason.");
    }
    FSUtils.logFileSystemState(fs, rootDir, LOG);

    LOG.debug("Checking to see if file is deleted.");
    assertTrue("Cache didn't find:" + file1, cache.contains(file1.getName()));
    assertTrue("Cache didn't find:" + file2, cache.contains(file2.getName()));

    // then trigger a refresh
    cache.triggerCacheRefreshForTesting();
    // and now it shouldn't find those files
    assertFalse("Cache found '" + file1 + "', but it shouldn't have.",
        cache.contains(file1.getName()));
    assertFalse("Cache found '" + file2 + "', but it shouldn't have.",
        cache.contains(file2.getName()));

    fs.delete(snapshotDir, true);
  }

  /**
   * The cache also picks up files in an in-progress ('working') snapshot
   * directory, not just completed snapshots.
   */
  @Test
  public void testLoadsTmpDir() throws Exception {
    // don't refresh the cache unless we tell it to
    long period = Long.MAX_VALUE;
    Path snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(rootDir);
    SnapshotFileCache cache = new SnapshotFileCache(fs, rootDir, period, 10000000,
        "test-snapshot-file-cache-refresh", null);

    // create a file in a 'completed' snapshot
    Path snapshot = new Path(snapshotDir, "snapshot");
    Path region = new Path(snapshot, "region1");
    Path family = new Path(region, "fam");
    Path file1 = new Path(family, "file1");
    fs.create(file1);

    // create an 'in progress' snapshot
    SnapshotDescription desc = SnapshotDescription.newBuilder().setName("working").build();
    snapshot = SnapshotDescriptionUtils.getWorkingSnapshotDir(desc, snapshotDir);
    region = new Path(snapshot, "region1");
    family = new Path(region, "fam");
    Path file2 = new Path(family, "file2");
    fs.create(file2);

    FSUtils.logFileSystemState(fs, rootDir, LOG);

    // then make sure the cache finds both files
    assertTrue("Cache didn't find:" + file1, cache.contains(file1.getName()));
    assertTrue("Cache didn't find:" + file2, cache.contains(file2.getName()));
  }

  /**
   * With a directory filter installed, only files under matching directories
   * (here: the snapshot HLogs dir) are cached.
   */
  @Test
  public void testJustFindLogsDirectory() throws Exception {
    // don't refresh the cache unless we tell it to
    long period = Long.MAX_VALUE;
    Path snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(rootDir);
    // only accept the .logs directory under each snapshot
    PathFilter filter = new PathFilter() {

      @Override
      public boolean accept(Path path) {
        return path.getName().equals(HConstants.HREGION_LOGDIR_NAME);
      }
    };
    SnapshotFileCache cache = new SnapshotFileCache(fs, rootDir, period, 10000000,
        "test-snapshot-file-cache-refresh", filter);

    // create a file in a 'completed' snapshot
    Path snapshot = new Path(snapshotDir, "snapshot");
    Path region = new Path(snapshot, "region1");
    Path family = new Path(region, "fam");
    Path file1 = new Path(family, "file1");
    fs.create(file1);

    // and another file in the logs directory
    Path logs = TakeSnapshotUtils.getSnapshotHLogsDir(snapshot, "server");
    Path log = new Path(logs, "me.hbase.com%2C58939%2C1350424310315.1350424315552");
    fs.create(log);

    FSUtils.logFileSystemState(fs, rootDir, LOG);

    // then make sure the cache only finds the log files
    assertFalse("Cache found '" + file1 + "', but it shouldn't have.",
        cache.contains(file1.getName()));
    assertTrue("Cache didn't find:" + log, cache.contains(log.getName()));
  }

  /**
   * A cache miss triggers a re-read of a snapshot directory whose contents
   * changed, even when the periodic refresh has not fired.
   */
  @Test
  public void testReloadModifiedDirectory() throws IOException {
    // don't refresh the cache unless we tell it to
    long period = Long.MAX_VALUE;
    Path snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(rootDir);
    SnapshotFileCache cache = new SnapshotFileCache(fs, rootDir, period, 10000000,
        "test-snapshot-file-cache-refresh", null);

    Path snapshot = new Path(snapshotDir, "snapshot");
    Path region = new Path(snapshot, "region1");
    Path family = new Path(region, "fam");
    Path file1 = new Path(family, "file1");
    Path file2 = new Path(family, "file2");

    // create two hfiles under the snapshot
    fs.create(file1);
    fs.create(file2);

    FSUtils.logFileSystemState(fs, rootDir, LOG);

    assertTrue("Cache didn't find " + file1, cache.contains(file1.getName()));

    // now delete the snapshot and add a file with a different name
    fs.delete(snapshot, true);
    Path file3 = new Path(family, "new_file");
    fs.create(file3);

    FSUtils.logFileSystemState(fs, rootDir, LOG);
    assertTrue("Cache didn't find new file:" + file3, cache.contains(file3.getName()));
  }

}
|
|
@ -0,0 +1,89 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.snapshot;
|
||||
|
||||
import static org.junit.Assert.assertFalse;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.SmallTests;
|
||||
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
/**
|
||||
* Test that the snapshot hfile cleaner finds hfiles referenced in a snapshot
|
||||
*/
|
||||
@Category(SmallTests.class)
|
||||
public class TestSnapshotHFileCleaner {
|
||||
|
||||
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
|
||||
@AfterClass
|
||||
public static void cleanup() throws IOException {
|
||||
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
Path rootDir = FSUtils.getRootDir(conf);
|
||||
FileSystem fs = FileSystem.get(conf);
|
||||
// cleanup
|
||||
fs.delete(rootDir, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFindsSnapshotFilesWhenCleaning() throws IOException {
|
||||
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
FSUtils.setRootDir(conf, TEST_UTIL.getDataTestDir());
|
||||
Path rootDir = FSUtils.getRootDir(conf);
|
||||
Path archivedHfileDir = new Path(TEST_UTIL.getDataTestDir(), HConstants.HFILE_ARCHIVE_DIRECTORY);
|
||||
|
||||
FileSystem fs = FileSystem.get(conf);
|
||||
SnapshotHFileCleaner cleaner = new SnapshotHFileCleaner();
|
||||
cleaner.setConf(conf);
|
||||
|
||||
// write an hfile to the snapshot directory
|
||||
String snapshotName = "snapshot";
|
||||
byte[] snapshot = Bytes.toBytes(snapshotName);
|
||||
String table = "table";
|
||||
byte[] tableName = Bytes.toBytes(table);
|
||||
Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
|
||||
HRegionInfo mockRegion = new HRegionInfo(tableName);
|
||||
Path regionSnapshotDir = new Path(snapshotDir, mockRegion.getEncodedName());
|
||||
Path familyDir = new Path(regionSnapshotDir, "family");
|
||||
// create a reference to a supposedly valid hfile
|
||||
String hfile = "fd1e73e8a96c486090c5cec07b4894c4";
|
||||
Path refFile = new Path(familyDir, hfile);
|
||||
|
||||
// make sure the reference file exists
|
||||
fs.create(refFile);
|
||||
|
||||
// create the hfile in the archive
|
||||
fs.mkdirs(archivedHfileDir);
|
||||
fs.createNewFile(new Path(archivedHfileDir, hfile));
|
||||
|
||||
// make sure that the file isn't deletable
|
||||
assertFalse(cleaner.isFileDeletable(new Path(hfile)));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,85 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.snapshot;
|
||||
|
||||
import static org.junit.Assert.assertFalse;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.SmallTests;
|
||||
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
/**
|
||||
* Test that the snapshot log cleaner finds logs referenced in a snapshot
|
||||
*/
|
||||
@Category(SmallTests.class)
|
||||
public class TestSnapshotLogCleaner {
|
||||
|
||||
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
|
||||
@AfterClass
|
||||
public static void cleanup() throws IOException {
|
||||
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
Path rootDir = FSUtils.getRootDir(conf);
|
||||
FileSystem fs = FileSystem.get(conf);
|
||||
// cleanup
|
||||
fs.delete(rootDir, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFindsSnapshotFilesWhenCleaning() throws IOException {
|
||||
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
FSUtils.setRootDir(conf, TEST_UTIL.getDataTestDir());
|
||||
Path rootDir = FSUtils.getRootDir(conf);
|
||||
FileSystem fs = FileSystem.get(conf);
|
||||
SnapshotLogCleaner cleaner = new SnapshotLogCleaner();
|
||||
cleaner.setConf(conf);
|
||||
|
||||
// write an hfile to the snapshot directory
|
||||
String snapshotName = "snapshot";
|
||||
byte[] snapshot = Bytes.toBytes(snapshotName);
|
||||
Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
|
||||
Path snapshotLogDir = new Path(snapshotDir, HConstants.HREGION_LOGDIR_NAME);
|
||||
String timestamp = "1339643343027";
|
||||
String hostFromMaster = "localhost%2C59648%2C1339643336601";
|
||||
|
||||
Path hostSnapshotLogDir = new Path(snapshotLogDir, hostFromMaster);
|
||||
String snapshotlogfile = hostFromMaster + "." + timestamp + ".hbase";
|
||||
|
||||
// add the reference to log in the snapshot
|
||||
fs.create(new Path(hostSnapshotLogDir, snapshotlogfile));
|
||||
|
||||
// now check to see if that log file would get deleted.
|
||||
Path oldlogDir = new Path(rootDir, ".oldlogs");
|
||||
Path logFile = new Path(oldlogDir, snapshotlogfile);
|
||||
fs.create(logFile);
|
||||
|
||||
// make sure that the file isn't deletable
|
||||
assertFalse(cleaner.isFileDeletable(logFile));
|
||||
}
|
||||
}
|
|
@ -21,13 +21,19 @@ import static org.junit.Assert.assertEquals;
|
|||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.PathFilter;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
|
@ -42,6 +48,7 @@ import org.apache.hadoop.hbase.server.snapshot.TakeSnapshotUtils;
|
|||
import org.apache.hadoop.hbase.snapshot.exception.HBaseSnapshotException;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.FSTableDescriptors;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.junit.Assert;
|
||||
|
||||
import com.google.protobuf.ServiceException;
|
||||
|
@ -185,4 +192,65 @@ public class SnapshotTestingUtils {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* List all the HFiles in the given table
|
||||
* @param fs FileSystem where the table lives
|
||||
* @param tableDir directory of the table
|
||||
* @return array of the current HFiles in the table (could be a zero-length array)
|
||||
* @throws IOException on unexecpted error reading the FS
|
||||
*/
|
||||
public static FileStatus[] listHFiles(final FileSystem fs, Path tableDir) throws IOException {
|
||||
// setup the filters we will need based on the filesystem
|
||||
PathFilter regionFilter = new FSUtils.RegionDirFilter(fs);
|
||||
PathFilter familyFilter = new FSUtils.FamilyDirFilter(fs);
|
||||
final PathFilter fileFilter = new PathFilter() {
|
||||
@Override
|
||||
public boolean accept(Path file) {
|
||||
try {
|
||||
return fs.isFile(file);
|
||||
} catch (IOException e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
FileStatus[] regionDirs = FSUtils.listStatus(fs, tableDir, regionFilter);
|
||||
// if no regions, then we are done
|
||||
if (regionDirs == null || regionDirs.length == 0) return new FileStatus[0];
|
||||
|
||||
// go through each of the regions, and add al the hfiles under each family
|
||||
List<FileStatus> regionFiles = new ArrayList<FileStatus>(regionDirs.length);
|
||||
for (FileStatus regionDir : regionDirs) {
|
||||
FileStatus[] fams = FSUtils.listStatus(fs, regionDir.getPath(), familyFilter);
|
||||
// if no families, then we are done again
|
||||
if (fams == null || fams.length == 0) continue;
|
||||
// add all the hfiles under the family
|
||||
regionFiles.addAll(SnapshotTestingUtils.getHFilesInRegion(fams, fs, fileFilter));
|
||||
}
|
||||
FileStatus[] files = new FileStatus[regionFiles.size()];
|
||||
regionFiles.toArray(files);
|
||||
return files;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all the hfiles in the region, under the passed set of families
|
||||
* @param families all the family directories under the region
|
||||
* @param fs filesystem where the families live
|
||||
* @param fileFilter filter to only include files
|
||||
* @return collection of all the hfiles under all the passed in families (non-null)
|
||||
* @throws IOException on unexecpted error reading the FS
|
||||
*/
|
||||
public static Collection<FileStatus> getHFilesInRegion(FileStatus[] families, FileSystem fs,
|
||||
PathFilter fileFilter) throws IOException {
|
||||
Set<FileStatus> files = new TreeSet<FileStatus>();
|
||||
for (FileStatus family : families) {
|
||||
// get all the hfiles in the family
|
||||
FileStatus[] hfiles = FSUtils.listStatus(fs, family.getPath(), fileFilter);
|
||||
// if no hfiles, then we are done with this family
|
||||
if (hfiles == null || hfiles.length == 0) continue;
|
||||
files.addAll(Arrays.asList(hfiles));
|
||||
}
|
||||
return files;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue