HBASE-9208 ReplicationLogCleaner slow at large scale

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1518466 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2013-08-29 04:29:30 +00:00
parent fa40ff3566
commit 20728884b2
10 changed files with 206 additions and 149 deletions

View File

@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.cleaner;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hbase.BaseConfigurable;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
/**
 * Base class for file cleaners which allows subclasses to implement a simple
 * isFileDeletable method (which used to be the FileCleanerDelegate contract).
 */
public abstract class BaseFileCleanerDelegate extends BaseConfigurable
implements FileCleanerDelegate {

  @Override
  public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
    // Wrap the subclass's single-file check in a predicate and filter lazily,
    // so callers iterate the decision one file at a time.
    Predicate<FileStatus> deletableCheck = new Predicate<FileStatus>() {
      @Override
      public boolean apply(FileStatus status) {
        return isFileDeletable(status);
      }
    };
    return Iterables.filter(files, deletableCheck);
  }

  /**
   * Should the master delete the file or keep it?
   * @param fStat file status of the file to check
   * @return <tt>true</tt> if the file is deletable, <tt>false</tt> if not
   */
  protected abstract boolean isFileDeletable(FileStatus fStat);
}

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.master.cleaner;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.BaseConfigurable;
/**
* Base class for the hfile cleaning function inside the master. By default, only the
@ -36,8 +35,7 @@ import org.apache.hadoop.hbase.BaseConfigurable;
* provide a default constructor.
*/
@InterfaceAudience.Private
public abstract class BaseHFileCleanerDelegate extends BaseConfigurable implements
FileCleanerDelegate {
public abstract class BaseHFileCleanerDelegate extends BaseFileCleanerDelegate {
private boolean stopped = false;

View File

@ -19,8 +19,6 @@ package org.apache.hadoop.hbase.master.cleaner;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.BaseConfigurable;
/**
* Base class for the log cleaning function inside the master. By default, two
@ -38,7 +36,7 @@ import org.apache.hadoop.hbase.BaseConfigurable;
* provide a default constructor.
*/
@InterfaceAudience.Private
public abstract class BaseLogCleanerDelegate extends BaseConfigurable implements FileCleanerDelegate {
public abstract class BaseLogCleanerDelegate extends BaseFileCleanerDelegate {
@Override
public boolean isFileDeletable(FileStatus fStat) {

View File

@ -32,6 +32,11 @@ import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.util.FSUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
/**
* Abstract Cleaner that uses a chain of delegates to clean a directory of files
* @param <T> Cleaner delegate class that is dynamically loaded from configuration
@ -97,7 +102,7 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Chore
* @param conf
* @return the new instance
*/
public T newFileCleaner(String className, Configuration conf) {
private T newFileCleaner(String className, Configuration conf) {
try {
Class<? extends FileCleanerDelegate> c = Class.forName(className).asSubclass(
FileCleanerDelegate.class);
@ -115,25 +120,46 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Chore
@Override
protected void chore() {
try {
FileStatus[] files = FSUtils.listStatus(this.fs, this.oldFileDir, null);
// if the path (file or directory) doesn't exist, then we can just return
if (files == null) return;
// loop over the found files and see if they should be deleted
for (FileStatus file : files) {
try {
if (file.isDir()) checkAndDeleteDirectory(file.getPath());
else checkAndDelete(file);
} catch (IOException e) {
e = RemoteExceptionHandler.checkIOException(e);
LOG.warn("Error while cleaning the logs", e);
}
}
FileStatus[] files = FSUtils.listStatus(this.fs, this.oldFileDir);
checkAndDeleteEntries(files);
} catch (IOException e) {
LOG.warn("Failed to get status of: " + oldFileDir);
e = RemoteExceptionHandler.checkIOException(e);
LOG.warn("Error while cleaning the logs", e);
}
}
/**
 * Loop over the given directory entries, and check whether they can be deleted.
 * If an entry is itself a directory it will be recursively checked and deleted itself iff
 * all subentries are deleted (and no new subentries are added in the meantime).
 *
 * @param entries directory entries to check; a null listing is treated as already clean
 * @return true if all entries were successfully deleted
 */
private boolean checkAndDeleteEntries(FileStatus[] entries) {
  // a null listing means there is nothing left to delete
  if (entries == null) {
    return true;
  }
  boolean deletedEverything = true;
  // plain files are batched so the whole set can go through the cleaner chain at once
  List<FileStatus> plainFiles = Lists.newArrayListWithCapacity(entries.length);
  for (FileStatus entry : entries) {
    if (entry.isDir()) {
      // recurse: a subdirectory is only removed once all of its own entries are gone
      deletedEverything &= checkAndDeleteDirectory(entry.getPath());
    } else {
      plainFiles.add(entry);
    }
  }
  // always attempt the file batch, even when some subdirectory could not be removed
  deletedEverything &= checkAndDeleteFiles(plainFiles);
  return deletedEverything;
}
/**
* Attempt to delete a directory and all files under that directory. Each child file is passed
* through the delegates to see if it can be deleted. If the directory has no children when the
@ -141,109 +167,107 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Chore
* <p>
* If new children files are added between checks of the directory, the directory will <b>not</b>
* be deleted.
* @param toCheck directory to check
* @param dir directory to check
* @return <tt>true</tt> if the directory was deleted, <tt>false</tt> otherwise.
* @throws IOException if there is an unexpected filesystem error
*/
public boolean checkAndDeleteDirectory(Path toCheck) throws IOException {
@VisibleForTesting boolean checkAndDeleteDirectory(Path dir) {
if (LOG.isTraceEnabled()) {
LOG.trace("Checking directory: " + toCheck);
LOG.trace("Checking directory: " + dir);
}
FileStatus[] children = FSUtils.listStatus(fs, toCheck);
// if the directory doesn't exist, then we are done
if (children == null) {
try {
return fs.delete(toCheck, false);
} catch (IOException e) {
if (LOG.isTraceEnabled()) {
LOG.trace("Couldn't delete directory: " + toCheck, e);
}
}
// couldn't delete w/o exception, so we can't return success.
try {
FileStatus[] children = FSUtils.listStatus(fs, dir);
boolean allChildrenDeleted = checkAndDeleteEntries(children);
// if the directory still has children, we can't delete it, so we are done
if (!allChildrenDeleted) return false;
} catch (IOException e) {
e = RemoteExceptionHandler.checkIOException(e);
LOG.warn("Error while listing directory: " + dir, e);
// couldn't list directory, so don't try to delete, and don't return success
return false;
}
boolean canDeleteThis = true;
for (FileStatus child : children) {
Path path = child.getPath();
// attempt to delete all the files under the directory
if (child.isDir()) {
if (!checkAndDeleteDirectory(path)) {
canDeleteThis = false;
}
}
// otherwise we can just check the file
else if (!checkAndDelete(child)) {
canDeleteThis = false;
}
}
// if the directory has children, we can't delete it, so we are done
if (!canDeleteThis) return false;
// otherwise, all the children (that we know about) have been deleted, so we should try to
// delete this directory. However, don't do so recursively so we don't delete files that have
// been added since we last checked.
try {
return fs.delete(toCheck, false);
return fs.delete(dir, false);
} catch (IOException e) {
if (LOG.isTraceEnabled()) {
LOG.trace("Couldn't delete directory: " + toCheck, e);
LOG.trace("Couldn't delete directory: " + dir, e);
}
// couldn't delete w/o exception, so we can't return success.
return false;
}
// couldn't delete w/o exception, so we can't return success.
return false;
}
/**
* Run the given file through each of the cleaners to see if it should be deleted, deleting it if
* Run the given files through each of the cleaners to see if it should be deleted, deleting it if
* necessary.
* @param fStat path of the file to check (and possibly delete)
* @throws IOException if cann't delete a file because of a filesystem issue
* @throws IllegalArgumentException if the file is a directory and has children
* @param files List of FileStatus for the files to check (and possibly delete)
* @return true iff successfully deleted all files
*/
private boolean checkAndDelete(FileStatus fStat) throws IOException, IllegalArgumentException {
Path filePath = fStat.getPath();
private boolean checkAndDeleteFiles(List<FileStatus> files) {
// first check to see if the path is valid
if (!validate(filePath)) {
LOG.warn("Found a wrongly formatted file: " + filePath.getName() + " deleting it.");
boolean success = this.fs.delete(filePath, true);
if(!success)
LOG.warn("Attempted to delete: " + filePath
+ ", but couldn't. Run cleaner chain and attempt to delete on next pass.");
return success;
List<FileStatus> validFiles = Lists.newArrayListWithCapacity(files.size());
List<FileStatus> invalidFiles = Lists.newArrayList();
for (FileStatus file : files) {
if (validate(file.getPath())) {
validFiles.add(file);
} else {
LOG.warn("Found a wrongly formatted file: " + file.getPath() + " - will delete it.");
invalidFiles.add(file);
}
}
// check each of the cleaners for the file
Iterable<FileStatus> deletableValidFiles = validFiles;
// check each of the cleaners for the valid files
for (T cleaner : cleanersChain) {
if (cleaner.isStopped() || this.stopper.isStopped()) {
LOG.warn("A file cleaner" + this.getName() + " is stopped, won't delete any file in:"
LOG.warn("A file cleaner" + this.getName() + " is stopped, won't delete any more files in:"
+ this.oldFileDir);
return false;
}
if (!cleaner.isFileDeletable(fStat)) {
// this file is not deletable, then we are done
if (LOG.isTraceEnabled()) {
LOG.trace(filePath + " is not deletable according to:" + cleaner);
Iterable<FileStatus> filteredFiles = cleaner.getDeletableFiles(deletableValidFiles);
// trace which cleaner is holding on to each file
if (LOG.isTraceEnabled()) {
ImmutableSet<FileStatus> filteredFileSet = ImmutableSet.copyOf(filteredFiles);
for (FileStatus file : deletableValidFiles) {
if (!filteredFileSet.contains(file)) {
LOG.trace(file.getPath() + " is not deletable according to:" + cleaner);
}
}
return false;
}
deletableValidFiles = filteredFiles;
}
Iterable<FileStatus> filesToDelete = Iterables.concat(invalidFiles, deletableValidFiles);
int deletedFileCount = 0;
for (FileStatus file : filesToDelete) {
Path filePath = file.getPath();
if (LOG.isTraceEnabled()) {
LOG.trace("Removing: " + filePath + " from archive");
}
try {
boolean success = this.fs.delete(filePath, false);
if (success) {
deletedFileCount++;
} else {
LOG.warn("Attempted to delete:" + filePath
+ ", but couldn't. Run cleaner chain and attempt to delete on next pass.");
}
} catch (IOException e) {
e = RemoteExceptionHandler.checkIOException(e);
LOG.warn("Error while deleting: " + filePath, e);
}
}
// delete this file if it passes all the cleaners
if (LOG.isTraceEnabled()) {
LOG.trace("Removing: " + filePath + " from archive");
}
boolean success = this.fs.delete(filePath, false);
if (!success) {
LOG.warn("Attempted to delete:" + filePath
+ ", but couldn't. Run cleaner chain and attempt to delete on next pass.");
}
return success;
}
return deletedFileCount == files.size();
}
@Override
public void cleanup() {

View File

@ -31,9 +31,9 @@ import org.apache.hadoop.hbase.Stoppable;
public interface FileCleanerDelegate extends Configurable, Stoppable {
/**
* Should the master delete the file or keep it?
* @param fStat file status of the file to check
* @return <tt>true</tt> if the file is deletable, <tt>false</tt> if not
* Determines which of the given files are safe to delete
* @param files files to check for deletion
* @return files that are ok to delete according to this cleaner
*/
boolean isFileDeletable(FileStatus fStat);
Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files);
}

View File

@ -18,8 +18,6 @@
package org.apache.hadoop.hbase.master.cleaner;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -28,11 +26,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
/**
* HFileLink cleaner that determines if a hfile should be deleted.

View File

@ -26,7 +26,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
/**

View File

@ -30,15 +30,18 @@ import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClientZKImpl;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
/**
* Implementation of a log cleaner that checks if a log is still scheduled for
* replication before deleting it when its TTL is over.
@ -48,48 +51,45 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo
private static final Log LOG = LogFactory.getLog(ReplicationLogCleaner.class);
private ZooKeeperWatcher zkw;
private ReplicationQueuesClient replicationQueues;
private final Set<String> hlogs = new HashSet<String>();
private boolean stopped = false;
private boolean aborted;
@Override
public boolean isLogDeletable(FileStatus fStat) {
// all members of this class are null if replication is disabled, and we
// return true since false would render the LogsCleaner useless
public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
// all members of this class are null if replication is disabled,
// so we cannot filter the files
if (this.getConf() == null) {
return true;
return files;
}
String log = fStat.getPath().getName();
// If we saw the hlog previously, let's consider it's still used
// At some point in the future we will refresh the list and it will be gone
if (this.hlogs.contains(log)) {
return false;
}
// Let's see it's still there
// This solution makes every miss very expensive to process since we
// almost completely refresh the cache each time
return !refreshHLogsAndSearch(log);
final Set<String> hlogs = loadHLogsFromQueues();
return Iterables.filter(files, new Predicate<FileStatus>() {
@Override
public boolean apply(FileStatus file) {
String hlog = file.getPath().getName();
boolean logInReplicationQueue = hlogs.contains(hlog);
if (LOG.isDebugEnabled()) {
if (logInReplicationQueue) {
LOG.debug("Found log in ZK, keeping: " + hlog);
} else {
LOG.debug("Didn't find this log in ZK, deleting: " + hlog);
}
}
return !logInReplicationQueue;
}});
}
/**
* Search through all the hlogs we have in ZK to refresh the cache
* If a log is specified and found, then we early out and return true
* @param searchedLog log we are searching for, pass null to cache everything
* that's in zookeeper.
* @return false until a specified log is found.
* Load all hlogs in all replication queues from ZK
*/
private boolean refreshHLogsAndSearch(String searchedLog) {
this.hlogs.clear();
final boolean lookForLog = searchedLog != null;
private Set<String> loadHLogsFromQueues() {
List<String> rss = replicationQueues.getListOfReplicators();
if (rss == null) {
LOG.debug("Didn't find any region server that replicates, deleting: " +
searchedLog);
return false;
LOG.debug("Didn't find any region server that replicates, won't prevent any deletions.");
return ImmutableSet.of();
}
Set<String> hlogs = Sets.newHashSet();
for (String rs: rss) {
List<String> listOfPeers = replicationQueues.getAllQueues(rs);
// if rs just died, this will be null
@ -99,23 +99,18 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo
for (String id : listOfPeers) {
List<String> peersHlogs = replicationQueues.getLogsInQueue(rs, id);
if (peersHlogs != null) {
this.hlogs.addAll(peersHlogs);
}
// early exit if we found the log
if(lookForLog && this.hlogs.contains(searchedLog)) {
LOG.debug("Found log in ZK, keeping: " + searchedLog);
return true;
hlogs.addAll(peersHlogs);
}
}
}
LOG.debug("Didn't find this log in ZK, deleting: " + searchedLog);
return false;
return hlogs;
}
@Override
public void setConf(Configuration config) {
// If replication is disabled, keep all members null
if (!config.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false)) {
LOG.warn("Not configured - allowing all hlogs to be deleted");
return;
}
// Make my own Configuration. Then I'll have my own connection to zk that
@ -131,10 +126,8 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo
} catch (IOException e) {
LOG.error("Error while configuring " + this.getClass().getName(), e);
}
refreshHLogsAndSearch(null);
}
@Override
public void stop(String why) {
if (this.stopped) return;

View File

@ -1575,7 +1575,7 @@ public abstract class FSUtils {
* @param fs file system
* @param dir directory
* @param filter path filter
* @return null if tabledir doesn't exist, otherwise FileStatus array
* @return null if dir is empty or doesn't exist, otherwise FileStatus array
*/
public static FileStatus [] listStatus(final FileSystem fs,
final Path dir, final PathFilter filter) throws IOException {
@ -1598,7 +1598,7 @@ public abstract class FSUtils {
*
* @param fs file system
* @param dir directory
* @return null if tabledir doesn't exist, otherwise FileStatus array
* @return null if dir is empty or doesn't exist, otherwise FileStatus array
*/
public static FileStatus[] listStatus(final FileSystem fs, final Path dir) throws IOException {
return listStatus(fs, dir, null);

View File

@ -329,19 +329,20 @@ public class TestZooKeeperTableArchiveClient {
BaseHFileCleanerDelegate delegateSpy = Mockito.spy(cleaner);
final int[] counter = new int[] { 0 };
final CountDownLatch finished = new CountDownLatch(1);
Mockito.doAnswer(new Answer<Boolean>() {
Mockito.doAnswer(new Answer<Iterable<FileStatus>>() {
@Override
public Boolean answer(InvocationOnMock invocation) throws Throwable {
public Iterable<FileStatus> answer(InvocationOnMock invocation) throws Throwable {
counter[0]++;
LOG.debug(counter[0] + "/ " + expected + ") Wrapping call to isFileDeletable for file: "
LOG.debug(counter[0] + "/ " + expected + ") Wrapping call to getDeletableFiles for files: "
+ invocation.getArguments()[0]);
Boolean ret = (Boolean) invocation.callRealMethod();
@SuppressWarnings("unchecked")
Iterable<FileStatus> ret = (Iterable<FileStatus>) invocation.callRealMethod();
if (counter[0] >= expected) finished.countDown();
return ret;
}
}).when(delegateSpy).isFileDeletable(Mockito.any(FileStatus.class));
}).when(delegateSpy).getDeletableFiles(Mockito.anyListOf(FileStatus.class));
cleaners.set(0, delegateSpy);
return finished;